"""Arq worker tasks — run_scenario wraps the real engine in a thread.""" from __future__ import annotations import asyncio import json import os from concurrent.futures import ThreadPoolExecutor from typing import Any import redis.asyncio as aioredis from remodel_api.config import settings from remodel_api.db.models import Scenario from remodel_api.db.session import AsyncSessionLocal _executor = ThreadPoolExecutor(max_workers=4) def _safe_dumps(obj: Any) -> str: """json.dumps that converts non-finite floats to null instead of raising.""" import math def _default(o: Any) -> Any: if isinstance(o, float) and not math.isfinite(o): return None raise TypeError(f"Object of type {type(o)} is not JSON serializable") return json.dumps(obj, default=_default) async def _publish(r: Any, channel: str, stage: str, pct: int) -> None: payload = json.dumps({"stage": stage, "pct": pct}) await r.publish(channel, payload) def _run_engine(inputs_json: str) -> dict[str, Any]: """CPU-bound: parse inputs and run the scenario engine.""" # Force reload engine modules to pick up code changes import sys for mod in list(sys.modules.keys()): if 'remodel_engine' in mod: del sys.modules[mod] from remodel_engine.scenarios.runner import run_scenario from remodel_engine.schemas.scenario import ScenarioInput inputs = ScenarioInput.model_validate_json(inputs_json) result = run_scenario(inputs) return { "status": result.status, "solved_tariff": result.solved_tariff, "kpis": result.kpis.model_dump(), "statements": { "pnl": [r.model_dump() for r in result.financials.pnl] if result.financials else [], "cfs": [r.model_dump() for r in result.financials.cfs] if result.financials else [], "bs": [r.model_dump() for r in result.financials.bs] if result.financials else [], "generation": result.generation_by_year, "idc_phasing": result.idc_phasing, # Hourly generation: 25 years × 8760 hours "solar_hourly": result.solar_hourly, "wind_hourly": result.wind_hourly, }, "debt_schedule": [r.model_dump() for r in result.debt_schedule], "irr_metrics": result.irr_metrics.model_dump(), "runtime_s": result.runtime_s, "warnings": result.warnings, } async def run_scenario_task(ctx: dict[str, Any], scenario_id: str) -> dict[str, Any]: """Arq task: run the full scenario pipeline.""" r = aioredis.from_url(settings.redis_url) # type: ignore[no-untyped-call] channel = f"scenario:{scenario_id}:events" async with AsyncSessionLocal() as db: scenario = await db.get(Scenario, scenario_id) if scenario is None: await r.aclose() return {"error": "not found"} inputs_json = scenario.inputs_json or "{}" scenario.status = "running" await db.commit() await _publish(r, channel, "starting", 5) loop = asyncio.get_event_loop() try: await _publish(r, channel, "computing", 20) engine_result = await loop.run_in_executor( _executor, _run_engine, inputs_json ) await _publish(r, channel, "finishing", 90) timeseries_path: str | None = None timeseries_dir = os.path.join("data", "scenarios", scenario_id) os.makedirs(timeseries_dir, exist_ok=True) async with AsyncSessionLocal() as db: scenario = await db.get(Scenario, scenario_id) if scenario is not None: scenario.status = engine_result.get("status", "success") scenario.kpis_json = _safe_dumps(engine_result.get("kpis", {})) scenario.statements_json = _safe_dumps(engine_result.get("statements", {})) scenario.debt_schedule_json = _safe_dumps(engine_result.get("debt_schedule", [])) scenario.runtime_s = engine_result.get("runtime_s") scenario.timeseries_path = timeseries_path await db.commit() await _publish(r, channel, "done", 100) await r.aclose() return engine_result except Exception as e: async with AsyncSessionLocal() as db: scenario = await db.get(Scenario, scenario_id) if scenario is not None: scenario.status = "failed" scenario.error_message = str(e) await db.commit() await _publish(r, channel, "error", 100) await r.aclose() raise async def run_dummy_scenario(ctx: dict[str, Any], scenario_id: str) -> dict[str, Any]: """Legacy dummy task kept for backward compatibility.""" return await run_scenario_task(ctx, scenario_id)