blackopsrepl's picture
Upload 33 files
177c40c verified
"""
REST API endpoints.
Provides a standard API for:
- Listing and fetching demo data
- Starting/stopping solving
- Getting solution status
- Analyzing scores
"""
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from uuid import uuid4
from dataclasses import replace
from typing import Dict, List
import os
from .domain import Schedule, ScheduleModel, Resource, Task, ConstraintWeightsModel
from .demo_data import DemoData, generate_demo_data
from .solver import solver_manager, solution_manager
from . import constraints # For setting global weights
app = FastAPI(
title="SolverForge Quickstart",
description="Constraint optimization API",
docs_url='/q/swagger-ui'
)
# In-memory storage for solving jobs
data_sets: dict[str, Schedule] = {}
# =============================================================================
# DEMO DATA ENDPOINTS
# =============================================================================
@app.get("/demo-data")
async def demo_data_list() -> list[DemoData]:
"""List available demo datasets."""
return [e for e in DemoData]
@app.get("/demo-data/{dataset_id}", response_model_exclude_none=True)
async def get_demo_data(dataset_id: str) -> ScheduleModel:
"""Get a specific demo dataset."""
demo_data = getattr(DemoData, dataset_id)
domain_schedule = generate_demo_data(demo_data)
return _schedule_to_model(domain_schedule)
# =============================================================================
# SOLVING ENDPOINTS
# =============================================================================
@app.post("/schedules")
async def solve(schedule_model: ScheduleModel) -> str:
"""
Start solving a schedule.
Returns a job ID that can be used to check progress and get results.
Accepts optional constraint_weights to adjust constraint penalties.
"""
job_id = str(uuid4())
# Set constraint weights globally before solving
if schedule_model.constraint_weights:
weights = schedule_model.constraint_weights
constraints.CONSTRAINT_WEIGHTS = {
'required_skill': weights.required_skill,
'resource_capacity': weights.resource_capacity,
'minimize_duration': weights.minimize_duration,
'balance_load': weights.balance_load,
}
else:
# Reset to defaults
constraints.CONSTRAINT_WEIGHTS = {
'required_skill': 100,
'resource_capacity': 100,
'minimize_duration': 50,
'balance_load': 50,
}
schedule = _model_to_schedule(schedule_model)
data_sets[job_id] = schedule
solver_manager.solve_and_listen(
job_id,
schedule,
lambda solution: _update_schedule(job_id, solution)
)
return job_id
@app.get("/schedules")
async def list_schedules() -> List[str]:
"""List all job IDs."""
return list(data_sets.keys())
@app.get("/schedules/{job_id}", response_model_exclude_none=True)
async def get_schedule(job_id: str) -> ScheduleModel:
"""Get the current solution for a job."""
if job_id not in data_sets:
raise ValueError(f"No schedule found with ID {job_id}")
schedule = data_sets[job_id]
updated = replace(schedule, solver_status=solver_manager.get_solver_status(job_id))
return _schedule_to_model(updated)
@app.get("/schedules/{job_id}/status")
async def get_status(job_id: str) -> Dict:
"""Get solving status and score."""
if job_id not in data_sets:
raise ValueError(f"No schedule found with ID {job_id}")
schedule = data_sets[job_id]
solver_status = solver_manager.get_solver_status(job_id)
return {
"score": {
"hardScore": schedule.score.hard_score if schedule.score else 0,
"softScore": schedule.score.soft_score if schedule.score else 0,
},
"solverStatus": solver_status.name,
}
@app.delete("/schedules/{job_id}")
async def stop_solving(job_id: str) -> ScheduleModel:
"""Stop solving and return current solution."""
if job_id not in data_sets:
raise ValueError(f"No schedule found with ID {job_id}")
try:
solver_manager.terminate_early(job_id)
except Exception as e:
print(f"Warning: terminate_early failed for {job_id}: {e}")
return await get_schedule(job_id)
@app.put("/schedules/analyze")
async def analyze(schedule_model: ScheduleModel) -> Dict:
"""Analyze a schedule's score breakdown."""
schedule = _model_to_schedule(schedule_model)
analysis = solution_manager.analyze(schedule)
constraints = []
for constraint in getattr(analysis, 'constraint_analyses', []) or []:
matches = [
{
"name": str(getattr(getattr(match, 'constraint_ref', None), 'constraint_name', "")),
"score": str(getattr(match, 'score', "0hard/0soft")),
"justification": str(getattr(match, 'justification', "")),
}
for match in getattr(constraint, 'matches', []) or []
]
constraints.append({
"name": str(getattr(constraint, 'constraint_name', "")),
"weight": str(getattr(constraint, 'weight', "0hard/0soft")),
"score": str(getattr(constraint, 'score', "0hard/0soft")),
"matches": matches,
})
return {"constraints": constraints}
# =============================================================================
# HELPER FUNCTIONS
# =============================================================================
def _update_schedule(job_id: str, schedule: Schedule):
"""Callback for solver updates."""
global data_sets
data_sets[job_id] = schedule
def _schedule_to_model(schedule: Schedule) -> ScheduleModel:
"""Convert domain Schedule to Pydantic model."""
from .domain import ResourceModel, TaskModel, ScheduleModel
resources = [
ResourceModel(
name=r.name,
capacity=r.capacity,
skills=list(r.skills),
)
for r in schedule.resources
]
tasks = [
TaskModel(
id=t.id,
name=t.name,
duration=t.duration,
requiredSkill=t.required_skill,
resource=t.resource.name if t.resource else None,
)
for t in schedule.tasks
]
return ScheduleModel(
resources=resources,
tasks=tasks,
score=str(schedule.score) if schedule.score else None,
solverStatus=schedule.solver_status.name if schedule.solver_status else None,
)
def _model_to_schedule(model: ScheduleModel) -> Schedule:
"""Convert Pydantic model to domain Schedule."""
resources = {
r.name: Resource(
name=r.name,
capacity=r.capacity,
skills=set(r.skills),
)
for r in model.resources
}
tasks = [
Task(
id=t.id,
name=t.name,
duration=t.duration,
required_skill=t.required_skill or "",
resource=resources.get(t.resource) if t.resource else None,
)
for t in model.tasks
]
return Schedule(
resources=list(resources.values()),
tasks=tasks,
)
# =============================================================================
# SOURCE CODE VIEWER ENDPOINTS
# =============================================================================
# Whitelist of files that can be viewed
SOURCE_FILES = {
'domain.py': 'src/my_quickstart/domain.py',
'constraints.py': 'src/my_quickstart/constraints.py',
'solver.py': 'src/my_quickstart/solver.py',
'rest_api.py': 'src/my_quickstart/rest_api.py',
'demo_data.py': 'src/my_quickstart/demo_data.py',
'index.html': 'static/index.html',
'app.js': 'static/app.js',
'app.css': 'static/app.css',
}
@app.get("/source-code")
async def list_source_files() -> List[str]:
"""List available source files for the code viewer."""
return list(SOURCE_FILES.keys())
@app.get("/source-code/{filename}")
async def get_source_code(filename: str) -> Dict:
"""Get the contents of a source file."""
if filename not in SOURCE_FILES:
raise ValueError(f"File not available: {filename}")
filepath = SOURCE_FILES[filename]
if not os.path.exists(filepath):
raise ValueError(f"File not found: {filepath}")
with open(filepath, 'r') as f:
content = f.read()
return {"filename": filename, "content": content}
# =============================================================================
# STATIC FILES (optional web UI)
# =============================================================================
# Mount static files if directory exists
if os.path.exists("static"):
app.mount("/", StaticFiles(directory="static", html=True), name="static")