- Engine: Add ppa_revenue_cr, mcp_revenue_cr, tariff, units to PnLRow - Engine: Split PPA vs MCP revenue in P&L computation - Web: Collapsible rows for PPA/MCP Revenue and Opex - Web: Highlighted rows (Total Revenue, EBITDA, EBIT, PBT, PAT) - Web: Units above Tariff in breakdown, bg-blue-50 highlight - Fix sticky column z-index for horizontal scroll - CLAUDE.md: Add project documentation Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
164 lines
5.3 KiB
Python
164 lines
5.3 KiB
Python
"""Scenario API integration tests (S5-T10)."""
|
|
|
|
import json
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
import pytest
|
|
from httpx import AsyncClient
|
|
|
|
|
|
async def test_list_scenarios_empty(client: AsyncClient) -> None:
|
|
resp = await client.get("/api/scenarios")
|
|
assert resp.status_code == 200
|
|
assert resp.json() == []
|
|
|
|
|
|
async def test_create_and_get_scenario(client: AsyncClient) -> None:
|
|
mock_pool = AsyncMock()
|
|
mock_pool.enqueue_job = AsyncMock()
|
|
mock_pool.aclose = AsyncMock()
|
|
|
|
with patch("remodel_api.routers.scenarios.arq.create_pool", return_value=mock_pool):
|
|
resp = await client.post("/api/scenarios", json={"name": "Test Scenario"})
|
|
|
|
assert resp.status_code == 201
|
|
data = resp.json()
|
|
assert data["name"] == "Test Scenario"
|
|
assert data["status"] == "queued"
|
|
scenario_id = data["id"]
|
|
|
|
resp2 = await client.get(f"/api/scenarios/{scenario_id}")
|
|
assert resp2.status_code == 200
|
|
assert resp2.json()["id"] == scenario_id
|
|
|
|
|
|
async def test_get_scenario_not_found(client: AsyncClient) -> None:
|
|
resp = await client.get("/api/scenarios/nonexistent-id")
|
|
assert resp.status_code == 404
|
|
|
|
|
|
async def test_list_scenarios_after_create(client: AsyncClient) -> None:
|
|
mock_pool = AsyncMock()
|
|
mock_pool.enqueue_job = AsyncMock()
|
|
mock_pool.aclose = AsyncMock()
|
|
|
|
with patch("remodel_api.routers.scenarios.arq.create_pool", return_value=mock_pool):
|
|
await client.post("/api/scenarios", json={"name": "S1"})
|
|
await client.post("/api/scenarios", json={"name": "S2"})
|
|
|
|
resp = await client.get("/api/scenarios")
|
|
assert resp.status_code == 200
|
|
assert len(resp.json()) == 2
|
|
|
|
|
|
async def test_create_scenario_with_inputs(client: AsyncClient) -> None:
|
|
mock_pool = AsyncMock()
|
|
mock_pool.enqueue_job = AsyncMock()
|
|
mock_pool.aclose = AsyncMock()
|
|
|
|
payload = {
|
|
"name": "Solar 10MW",
|
|
"inputs": {
|
|
"solar": {"location_id": "RJ", "capacity_dc_mwp": 10.0, "capacity_ac_mw": 8.0}
|
|
},
|
|
}
|
|
with patch("remodel_api.routers.scenarios.arq.create_pool", return_value=mock_pool):
|
|
resp = await client.post("/api/scenarios", json=payload)
|
|
|
|
assert resp.status_code == 201
|
|
data = resp.json()
|
|
assert data["name"] == "Solar 10MW"
|
|
mock_pool.enqueue_job.assert_awaited_once()
|
|
|
|
|
|
async def test_get_kpis_not_success(client: AsyncClient) -> None:
|
|
mock_pool = AsyncMock()
|
|
mock_pool.enqueue_job = AsyncMock()
|
|
mock_pool.aclose = AsyncMock()
|
|
|
|
with patch("remodel_api.routers.scenarios.arq.create_pool", return_value=mock_pool):
|
|
resp = await client.post("/api/scenarios", json={"name": "Pending"})
|
|
|
|
scenario_id = resp.json()["id"]
|
|
resp2 = await client.get(f"/api/scenarios/{scenario_id}/kpis")
|
|
assert resp2.status_code == 409
|
|
|
|
|
|
async def test_get_statements_not_success(client: AsyncClient) -> None:
|
|
mock_pool = AsyncMock()
|
|
mock_pool.enqueue_job = AsyncMock()
|
|
mock_pool.aclose = AsyncMock()
|
|
|
|
with patch("remodel_api.routers.scenarios.arq.create_pool", return_value=mock_pool):
|
|
resp = await client.post("/api/scenarios", json={"name": "Pending"})
|
|
|
|
scenario_id = resp.json()["id"]
|
|
resp2 = await client.get(f"/api/scenarios/{scenario_id}/statements")
|
|
assert resp2.status_code == 409
|
|
|
|
|
|
async def test_archive_scenario(client: AsyncClient) -> None:
|
|
mock_pool = AsyncMock()
|
|
mock_pool.enqueue_job = AsyncMock()
|
|
mock_pool.aclose = AsyncMock()
|
|
|
|
with patch("remodel_api.routers.scenarios.arq.create_pool", return_value=mock_pool):
|
|
resp = await client.post("/api/scenarios", json={"name": "ToArchive"})
|
|
|
|
scenario_id = resp.json()["id"]
|
|
del_resp = await client.delete(f"/api/scenarios/{scenario_id}")
|
|
assert del_resp.status_code == 200
|
|
|
|
list_resp = await client.get("/api/scenarios")
|
|
assert not any(s["id"] == scenario_id for s in list_resp.json())
|
|
|
|
|
|
async def test_get_kpis_success(client: AsyncClient, db_session: object) -> None:
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from remodel_api.db.models import Scenario
|
|
|
|
session = db_session # already the overridden session
|
|
assert isinstance(session, AsyncSession)
|
|
kpis = {"equity_irr": 0.15, "total_capex_cr": 100.0}
|
|
scenario = Scenario(
|
|
name="Done",
|
|
status="success",
|
|
inputs_json="{}",
|
|
kpis_json=json.dumps(kpis),
|
|
)
|
|
session.add(scenario)
|
|
await session.commit()
|
|
await session.refresh(scenario)
|
|
scenario_id = scenario.id
|
|
|
|
resp = await client.get(f"/api/scenarios/{scenario_id}/kpis")
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["equity_irr"] == pytest.approx(0.15)
|
|
|
|
|
|
async def test_get_statements_success(client: AsyncClient, db_session: object) -> None:
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from remodel_api.db.models import Scenario
|
|
|
|
session = db_session
|
|
assert isinstance(session, AsyncSession)
|
|
stmts = {"pnl": [{"year": 1, "revenue_cr": 50.0}], "cfs": [], "bs": []}
|
|
scenario = Scenario(
|
|
name="Done2",
|
|
status="success",
|
|
inputs_json="{}",
|
|
statements_json=json.dumps(stmts),
|
|
)
|
|
session.add(scenario)
|
|
await session.commit()
|
|
await session.refresh(scenario)
|
|
scenario_id = scenario.id
|
|
|
|
resp = await client.get(f"/api/scenarios/{scenario_id}/statements")
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert "pnl" in data
|
|
assert len(data["pnl"]) == 1
|