Source code for uniqc.gateway.api.archive
"""Archive management API — /api/archive and /api/tasks/{id}/archive."""
from __future__ import annotations
from typing import Any
from fastapi import APIRouter, HTTPException
from uniqc.backend_adapter.task.store import TaskStore
from uniqc.gateway.db.archive_store import ArchiveStore
router = APIRouter()
def _info_to_dict(t: Any) -> dict[str, Any]:
return {
"task_id": t.task_id,
"backend": t.backend,
"status": t.status,
"shots": t.shots,
"submit_time": t.submit_time,
"update_time": t.update_time,
"has_result": t.result is not None,
"metadata": t.metadata,
"archived_at": t.archived_at,
}
# ---------------------------------------------------------------------------
# /api/tasks/{task_id}/archive — archive a task
# ---------------------------------------------------------------------------
_archive_move_router = APIRouter(prefix="/tasks", tags=["tasks"])
[docs]
@_archive_move_router.post("/{task_id}/archive")
def archive_task(task_id: str) -> dict[str, str]:
"""Move a task from the active ``tasks`` table into the archive."""
store = TaskStore()
task = store.get(task_id)
if task is None:
raise HTTPException(status_code=404, detail=f"Task '{task_id}' not found")
archive = ArchiveStore()
success = archive.archive_task(task_id)
if not success:
raise HTTPException(status_code=500, detail=f"Failed to archive task '{task_id}'")
return {"archived": task_id}
# ---------------------------------------------------------------------------
# /api/archive — browse / manage archived tasks
# ---------------------------------------------------------------------------
_archive_router = APIRouter(prefix="/archive", tags=["archive"])
[docs]
@_archive_router.get("")
def list_archived(
status: str | None = None,
backend: str | None = None,
limit: int | None = 100,
offset: int | None = 0,
) -> dict[str, Any]:
"""List archived tasks."""
archive = ArchiveStore()
tasks = archive.list_archived(status=status, backend=backend, limit=limit, offset=offset)
return {
"tasks": [_info_to_dict(t) for t in tasks],
"total": archive.count_archived(status=status, backend=backend),
"count": archive.count_archived(),
"limit": limit,
"offset": offset or 0,
}
[docs]
@_archive_router.get("/{task_id}")
def get_archived(task_id: str) -> dict[str, Any]:
"""Return full details of an archived task."""
archive = ArchiveStore()
task = archive.get_archived(task_id)
if task is None:
raise HTTPException(status_code=404, detail=f"Archived task '{task_id}' not found")
return {
"task_id": task.task_id,
"backend": task.backend,
"status": task.status,
"shots": task.shots,
"submit_time": task.submit_time,
"update_time": task.update_time,
"result": task.result,
"metadata": task.metadata,
"archived_at": task.archived_at,
}
[docs]
@_archive_router.delete("/{task_id}")
def delete_archived(task_id: str) -> dict[str, str]:
"""Permanently delete an archived task."""
archive = ArchiveStore()
deleted = archive.delete_archived(task_id)
if not deleted:
raise HTTPException(status_code=404, detail=f"Archived task '{task_id}' not found")
return {"deleted": task_id}
[docs]
@_archive_router.post("/restore/{task_id}")
def restore_task(task_id: str) -> dict[str, str]:
"""Restore an archived task back to the active task list."""
archive = ArchiveStore()
restored = archive.restore_task(task_id)
if not restored:
raise HTTPException(status_code=404, detail=f"Archived task '{task_id}' not found")
return {"restored": task_id}