Source code for uniqc.gateway.db.archive_store

"""Archive store — moves tasks in/out of the ``archived_tasks`` table.

The archive lives in the same ``tasks.sqlite`` file as the main task store.
This module provides an ``ArchiveStore`` class that mirrors ``TaskStore``'s
interface but operates on the ``archived_tasks`` table.
"""

from __future__ import annotations

from datetime import datetime, timezone
from typing import Any

from uniqc.backend_adapter.task.store import TaskInfo, TaskStore


[docs] class ArchiveStore: """Persistent archive for completed / failed tasks. Tasks are moved atomically from ``tasks`` → ``archived_tasks`` so that the hot-path query on ``tasks`` stays fast even when many historical tasks have accumulated. """ def __init__(self) -> None: self._store = TaskStore()
[docs] def archive_task(self, task_id: str) -> bool: """Move a task from ``tasks`` to ``archived_tasks``. Cascades shards to ``archived_task_shards`` first; the task row's ``ON DELETE CASCADE`` would otherwise wipe them silently. Returns ``True`` if the task existed and was moved. """ with self._store._tx() as conn: row = conn.execute("SELECT * FROM tasks WHERE task_id = ?", (task_id,)).fetchone() if not row: return False archived_at = datetime.now(timezone.utc).isoformat() # Move shards first so we don't lose them via FK cascade. shard_rows = conn.execute( "SELECT * FROM task_shards WHERE uniqc_task_id = ?", (task_id,), ).fetchall() for srow in shard_rows: scols = list(srow.keys()) svalues = [srow[c] for c in scols] splaceholders = ",".join("?" * len(scols)) conn.execute( f"INSERT INTO archived_task_shards ({','.join(scols)}, archived_at) VALUES ({splaceholders}, ?)", svalues + [archived_at], ) cols = list(row.keys()) values = [row[c] for c in cols] conn.execute("DELETE FROM tasks WHERE task_id = ?", (task_id,)) placeholders = ",".join("?" * len(cols)) conn.execute( f"INSERT INTO archived_tasks ({','.join(cols)}, archived_at) VALUES ({placeholders}, ?)", values + [archived_at], ) return True
[docs] def restore_task(self, task_id: str) -> bool: """Move a task from ``archived_tasks`` back to ``tasks``. Cascades archived shards back to ``task_shards``. Returns ``True`` if the task existed and was restored. """ with self._store._tx() as conn: row = conn.execute("SELECT * FROM archived_tasks WHERE task_id = ?", (task_id,)).fetchone() if not row: return False cols = [c for c in row.keys() if c != "archived_at"] values = [row[c] for c in cols] conn.execute("DELETE FROM archived_tasks WHERE task_id = ?", (task_id,)) placeholders = ",".join("?" * len(cols)) conn.execute( f"INSERT INTO tasks ({','.join(cols)}) VALUES ({placeholders})", values, ) # Restore any archived shards. srows = conn.execute( "SELECT * FROM archived_task_shards WHERE uniqc_task_id = ?", (task_id,), ).fetchall() for srow in srows: scols = [c for c in srow.keys() if c != "archived_at"] svalues = [srow[c] for c in scols] splaceholders = ",".join("?" * len(scols)) conn.execute( f"INSERT INTO task_shards ({','.join(scols)}) VALUES ({splaceholders})", svalues, ) conn.execute( "DELETE FROM archived_task_shards WHERE uniqc_task_id = ?", (task_id,), ) return True
[docs] def list_archived( self, *, status: str | None = None, backend: str | None = None, limit: int | None = None, offset: int | None = None, ) -> list[TaskInfo]: """List archived tasks, newest-first by ``submit_time``.""" from uniqc.backend_adapter.task.store import _row_to_info conditions: list[str] = [] params: list[Any] = [] if status is not None: conditions.append("status = ?") params.append(status) if backend is not None: conditions.append("backend = ?") params.append(backend) sql = "SELECT * FROM archived_tasks" if conditions: sql += " WHERE " + " AND ".join(conditions) sql += " ORDER BY submit_time DESC, rowid DESC" if limit is not None: sql += " LIMIT ?" params.append(int(limit)) if offset is not None: if limit is None: sql += " LIMIT -1" sql += " OFFSET ?" params.append(int(offset)) with self._store._tx() as conn: rows = conn.execute(sql, params).fetchall() return [_row_to_info(row) for row in rows]
[docs] def get_archived(self, task_id: str) -> TaskInfo | None: """Return an archived task by id, or ``None``.""" from uniqc.backend_adapter.task.store import _row_to_info with self._store._tx() as conn: row = conn.execute("SELECT * FROM archived_tasks WHERE task_id = ?", (task_id,)).fetchone() return _row_to_info(row) if row is not None else None
[docs] def delete_archived(self, task_id: str) -> bool: """Permanently remove an archived task. Returns ``True`` if it existed.""" with self._store._tx() as conn: conn.execute( "DELETE FROM archived_task_shards WHERE uniqc_task_id = ?", (task_id,), ) cur = conn.execute("DELETE FROM archived_tasks WHERE task_id = ?", (task_id,)) return cur.rowcount > 0
[docs] def count_archived(self, *, status: str | None = None, backend: str | None = None) -> int: """Return total number of archived tasks.""" conditions: list[str] = [] params: list[Any] = [] if status is not None: conditions.append("status = ?") params.append(status) if backend is not None: conditions.append("backend = ?") params.append(backend) sql = "SELECT COUNT(*) AS c FROM archived_tasks" if conditions: sql += " WHERE " + " AND ".join(conditions) with self._store._tx() as conn: row = conn.execute(sql, params).fetchone() return int(row["c"]) if row is not None else 0