"""Task management with local caching for quantum computing backends.
This module provides a unified interface for submitting quantum tasks,
managing task lifecycle, and caching results locally. All persistent
storage is delegated to :class:`uniqc.task.store.TaskStore` (SQLite at
``~/.uniqc/cache/tasks.sqlite``).
Environment Variables:
UNIQC_DUMMY: Set to 'true', '1', or 'yes' to enable dummy mode.
When enabled, all task submissions use local simulation instead
of real quantum backends. Useful for development and testing.
Usage::
from uniqc.task_manager import submit_task, query_task, wait_for_result
from uniqc.circuit_builder import Circuit
from uniqc.backend import get_backend
# Create a circuit
circuit = Circuit()
circuit.h(0)
circuit.cnot(0, 1)
circuit.measure(0, 1)
# Submit task (use UNIQC_DUMMY=true for local simulation)
task_id = submit_task(circuit, backend='quafu', shots=1000)
# Wait for result
result = wait_for_result(task_id, backend='quafu', timeout=300)
# Query task status
info = query_task(task_id, backend='quafu')
print(info['status']) # 'running', 'success', or 'failed'
# Explicitly use dummy mode for a single submission
task_id = submit_task(circuit, backend='quafu', dummy=True)
"""
from __future__ import annotations
__all__ = [
# Task submission
"submit_task",
"submit_batch",
# Task query
"query_task",
"wait_for_result",
# Cache management
"save_task",
"get_task",
"list_tasks",
"clear_completed_tasks",
"clear_cache",
# Classes
"TaskInfo",
"TaskStatus",
"TaskManager",
# Dummy mode
"UNIQC_DUMMY",
"is_dummy_mode",
# Storage path (useful for tests / tooling)
"DEFAULT_CACHE_DIR",
]
import os
import time
from pathlib import Path
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from uniqc.circuit_builder.qcircuit import Circuit
from uniqc import backend as backend_module
from uniqc.circuit_adapter import (
CircuitAdapter,
IBMCircuitAdapter,
OriginQCircuitAdapter,
QuafuCircuitAdapter,
)
from uniqc.exceptions import (
AuthenticationError,
BackendNotAvailableError,
BackendNotFoundError,
InsufficientCreditsError,
NetworkError,
QuotaExceededError,
TaskFailedError,
TaskNotFoundError,
TaskTimeoutError,
)
from uniqc.task.adapters.base import (
TASK_STATUS_FAILED,
TASK_STATUS_RUNNING,
TASK_STATUS_SUCCESS,
)
from uniqc.task.store import (
DEFAULT_CACHE_DIR,
TaskInfo,
TaskStatus,
TaskStore,
)
# -----------------------------------------------------------------------------
# Configuration
# -----------------------------------------------------------------------------
# Environment variable for global dummy mode
UNIQC_DUMMY = os.environ.get("UNIQC_DUMMY", "").lower() in ("true", "1", "yes")
[docs]
def is_dummy_mode() -> bool:
"""Check if dummy mode is enabled via environment variable.
Returns:
True if UNIQC_DUMMY is set to 'true', '1', or 'yes'.
"""
return UNIQC_DUMMY
# -----------------------------------------------------------------------------
# Circuit Adapter Mapping
# -----------------------------------------------------------------------------
ADAPTER_MAP: dict[str, type[CircuitAdapter]] = {
"originq": OriginQCircuitAdapter,
"quafu": QuafuCircuitAdapter,
"ibm": IBMCircuitAdapter,
}
def _get_adapter(backend_name: str) -> CircuitAdapter:
"""Get the appropriate circuit adapter for a backend.
Args:
backend_name: The name of the backend.
Returns:
CircuitAdapter instance for the backend.
Raises:
BackendNotFoundError: If no adapter exists for the backend.
"""
if backend_name not in ADAPTER_MAP:
available = ", ".join(ADAPTER_MAP.keys())
raise BackendNotFoundError(
f"No circuit adapter for backend '{backend_name}'. "
f"Available adapters: {available}"
)
return ADAPTER_MAP[backend_name]()
# -----------------------------------------------------------------------------
# Cache Management
# -----------------------------------------------------------------------------
def _store(cache_dir: Path | None = None) -> TaskStore:
"""Return a :class:`TaskStore` bound to ``cache_dir`` (or the default)."""
return TaskStore(cache_dir)
[docs]
def save_task(task_info: TaskInfo, cache_dir: Path | None = None) -> None:
"""Save a task to the local cache.
Args:
task_info: Task information to save.
cache_dir: Optional custom cache directory.
"""
_store(cache_dir).save(task_info)
[docs]
def get_task(task_id: str, cache_dir: Path | None = None) -> TaskInfo | None:
"""Get a task from the local cache.
Args:
task_id: The task identifier.
cache_dir: Optional custom cache directory.
Returns:
TaskInfo if found, None otherwise.
"""
return _store(cache_dir).get(task_id)
[docs]
def list_tasks(
status: str | None = None,
backend: str | None = None,
cache_dir: Path | None = None,
) -> list[TaskInfo]:
"""List tasks from the local cache.
Args:
status: Filter by status (optional).
backend: Filter by backend (optional).
cache_dir: Optional custom cache directory.
Returns:
List of TaskInfo objects matching the filters, newest first.
"""
return _store(cache_dir).list(status=status, backend=backend)
[docs]
def clear_completed_tasks(cache_dir: Path | None = None) -> int:
"""Remove completed tasks from the cache.
Args:
cache_dir: Optional custom cache directory.
Returns:
Number of tasks removed.
"""
return _store(cache_dir).clear_completed()
[docs]
def clear_cache(cache_dir: Path | None = None) -> None:
"""Clear all tasks from the cache by deleting the SQLite file.
Args:
cache_dir: Optional custom cache directory.
"""
_store(cache_dir).clear_all()
# -----------------------------------------------------------------------------
# Error Handling
# -----------------------------------------------------------------------------
def _map_adapter_error(error: Exception, backend_name: str) -> Exception:
"""Map an adapter error to a UnifiedQuantumError.
Args:
error: The original error from the adapter.
backend_name: The name of the backend.
Returns:
A UnifiedQuantumError subclass or the original error.
"""
error_message = str(error).lower()
# Check for authentication errors
if any(keyword in error_message for keyword in ["unauthorized", "invalid token", "authentication", "auth"]):
return AuthenticationError(
f"Authentication failed for backend '{backend_name}'. "
"Please check your API token or credentials.",
details={"original_error": str(error)},
)
# Check for credit/quota errors
if any(keyword in error_message for keyword in ["credit", "balance", "payment", "billing"]):
return InsufficientCreditsError(
f"Insufficient credits for backend '{backend_name}'. "
"Please top up your account.",
details={"original_error": str(error)},
)
if any(keyword in error_message for keyword in ["quota", "limit exceeded", "rate limit"]):
return QuotaExceededError(
f"Quota exceeded for backend '{backend_name}'. "
"Please try again later or upgrade your plan.",
details={"original_error": str(error)},
)
# Check for network errors
if any(keyword in error_message for keyword in ["connection", "timeout", "network", "dns", "refused"]):
return NetworkError(
f"Network error while communicating with backend '{backend_name}'.",
details={"original_error": str(error)},
)
return error
# -----------------------------------------------------------------------------
# Task Submission
# -----------------------------------------------------------------------------
[docs]
def submit_task(
circuit: "Circuit",
backend: str,
shots: int = 1000,
metadata: dict | None = None,
dummy: bool | None = None,
**kwargs: Any,
) -> str:
"""Submit a single circuit to a quantum backend.
This function converts the circuit to the backend's native format,
submits it, and caches the task information locally.
Args:
circuit: The UnifiedQuantum Circuit to submit.
backend: The backend name (e.g., 'originq', 'quafu', 'ibm').
shots: Number of measurement shots.
metadata: Optional metadata to store with the task.
dummy: Override dummy mode. If None, uses UNIQC_DUMMY env var.
When True, uses local simulation instead of real backend.
**kwargs: Additional backend-specific parameters.
- For Quafu: chip_id, auto_mapping
- For OriginQ: backend_name (e.g., 'origin:wuyuan:d5'), circuit_optimize, measurement_amend
Returns:
The task ID assigned by the backend.
Raises:
BackendNotFoundError: If the backend is not recognized.
BackendNotAvailableError: If the backend is not available.
AuthenticationError: If authentication fails.
InsufficientCreditsError: If account has insufficient credits.
QuotaExceededError: If usage quota is exceeded.
NetworkError: If a network error occurs.
Example:
>>> circuit = Circuit()
>>> circuit.h(0)
>>> circuit.measure(0)
>>> task_id = submit_task(circuit, backend='originq', shots=1000, backend_name='origin:wuyuan:d5')
>>> # Use dummy mode for local simulation
>>> task_id = submit_task(circuit, backend='quafu', dummy=True)
"""
# Determine if dummy mode should be used
use_dummy = dummy if dummy is not None else UNIQC_DUMMY
if use_dummy:
# Use dummy adapter for local simulation
return _submit_dummy(circuit, backend, shots, metadata, **kwargs)
# Get backend instance
try:
backend_instance = backend_module.get_backend(backend)
except ValueError as e:
raise BackendNotFoundError(str(e)) from e
# Check backend availability
if not backend_instance.is_available():
raise BackendNotAvailableError(
f"Backend '{backend}' is not available. "
"Please check your configuration and credentials."
)
# Convert circuit using adapter
try:
adapter = _get_adapter(backend)
native_circuit = adapter.adapt(circuit)
except Exception as e:
raise _map_adapter_error(e, backend) from e
# Submit to backend
try:
task_id = backend_instance.submit(native_circuit, shots=shots, **kwargs)
except Exception as e:
mapped_error = _map_adapter_error(e, backend)
raise mapped_error from e
# Create and save task info
task_info = TaskInfo(
task_id=task_id,
backend=backend,
status=TaskStatus.RUNNING,
shots=shots,
metadata=metadata or {},
)
save_task(task_info)
return task_id
def _submit_dummy(
circuit: "Circuit",
backend: str,
shots: int = 1000,
metadata: dict | None = None,
**kwargs: Any,
) -> str:
"""Submit a circuit using the dummy adapter for local simulation.
Args:
circuit: The UnifiedQuantum Circuit to simulate.
backend: The backend name (used for logging/metadata only).
shots: Number of measurement shots.
metadata: Optional metadata.
**kwargs: Additional parameters (passed to dummy adapter).
Returns:
Task ID from the dummy adapter.
"""
from uniqc.task.adapters.dummy_adapter import DummyAdapter
# Create dummy adapter
dummy_adapter = DummyAdapter(
noise_model=kwargs.get("noise_model"),
available_qubits=kwargs.get("available_qubits"),
available_topology=kwargs.get("available_topology"),
)
# Submit to dummy adapter
originir = circuit.originir
task_id = dummy_adapter.submit(originir, shots=shots)
# Get result from dummy adapter
result = dummy_adapter.query(task_id)
adapter_status = result.get("status", TASK_STATUS_RUNNING)
# Map adapter status to TaskStatus
status_map = {
TASK_STATUS_SUCCESS: TaskStatus.SUCCESS,
TASK_STATUS_FAILED: TaskStatus.FAILED,
TASK_STATUS_RUNNING: TaskStatus.RUNNING,
"pending": TaskStatus.PENDING,
"cancelled": TaskStatus.CANCELLED,
}
task_status = status_map.get(adapter_status, TaskStatus.FAILED)
# Create and save task info
task_info = TaskInfo(
task_id=task_id,
backend=f"dummy:{backend}",
status=task_status,
shots=shots,
metadata=metadata or {},
)
# Store result if successful
if adapter_status == TASK_STATUS_SUCCESS:
task_info.result = result.get("result")
save_task(task_info)
return task_id
[docs]
def submit_batch(
circuits: list["Circuit"],
backend: str,
shots: int = 1000,
dummy: bool | None = None,
**kwargs: Any,
) -> list[str]:
"""Submit multiple circuits as a batch to a quantum backend.
Args:
circuits: List of UnifiedQuantum Circuits to submit.
backend: The backend name.
shots: Number of measurement shots per circuit.
dummy: Override dummy mode. If None, uses UNIQC_DUMMY env var.
**kwargs: Additional backend-specific parameters.
- For Quafu: chip_id, auto_mapping, group_name
- For OriginQ: backend_name (e.g., 'origin:wuyuan:d5'), circuit_optimize
Returns:
List of task IDs assigned by the backend.
Raises:
BackendNotFoundError: If the backend is not recognized.
BackendNotAvailableError: If the backend is not available.
AuthenticationError: If authentication fails.
InsufficientCreditsError: If account has insufficient credits.
QuotaExceededError: If usage quota is exceeded.
NetworkError: If a network error occurs.
Example:
>>> circuits = [circuit1, circuit2, circuit3]
>>> task_ids = submit_batch(circuits, backend='quafu', shots=1000, chip_id='ScQ-P10')
"""
# Determine if dummy mode should be used
use_dummy = dummy if dummy is not None else UNIQC_DUMMY
if use_dummy:
# Use dummy adapter for local simulation
return _submit_batch_dummy(circuits, backend, shots, **kwargs)
# Get backend instance
try:
backend_instance = backend_module.get_backend(backend)
except ValueError as e:
raise BackendNotFoundError(str(e)) from e
# Check backend availability
if not backend_instance.is_available():
raise BackendNotAvailableError(
f"Backend '{backend}' is not available. "
"Please check your configuration and credentials."
)
# Convert circuits using adapter
try:
adapter = _get_adapter(backend)
native_circuits = adapter.adapt_batch(circuits)
except Exception as e:
raise _map_adapter_error(e, backend) from e
# Submit batch to backend
try:
result = backend_instance.submit_batch(native_circuits, shots=shots, **kwargs)
# Handle both list of task IDs and single group ID
if isinstance(result, list):
task_ids = result
else:
task_ids = [result]
except Exception as e:
mapped_error = _map_adapter_error(e, backend)
raise mapped_error from e
# Create and save task info for each task
for task_id in task_ids:
task_info = TaskInfo(
task_id=task_id,
backend=backend,
status=TaskStatus.RUNNING,
shots=shots,
metadata={"batch": True, "batch_size": len(circuits)},
)
save_task(task_info)
return task_ids
def _submit_batch_dummy(
circuits: list["Circuit"],
backend: str,
shots: int = 1000,
**kwargs: Any,
) -> list[str]:
"""Submit multiple circuits using the dummy adapter.
Args:
circuits: List of UnifiedQuantum Circuits to simulate.
backend: The backend name (used for logging/metadata only).
shots: Number of measurement shots per circuit.
**kwargs: Additional parameters.
Returns:
List of task IDs from the dummy adapter.
"""
from uniqc.task.adapters.dummy_adapter import DummyAdapter
# Create dummy adapter
dummy_adapter = DummyAdapter(
noise_model=kwargs.get("noise_model"),
available_qubits=kwargs.get("available_qubits"),
available_topology=kwargs.get("available_topology"),
)
# Submit all circuits
originir_circuits = [c.originir for c in circuits]
task_ids = dummy_adapter.submit_batch(originir_circuits, shots=shots)
# Create and save task info for each
for task_id in task_ids:
result = dummy_adapter.query(task_id)
adapter_status = result.get("status", TASK_STATUS_RUNNING)
# Map adapter status to TaskStatus
status_map = {
TASK_STATUS_SUCCESS: TaskStatus.SUCCESS,
TASK_STATUS_FAILED: TaskStatus.FAILED,
TASK_STATUS_RUNNING: TaskStatus.RUNNING,
}
task_status = status_map.get(adapter_status, TaskStatus.FAILED)
task_info = TaskInfo(
task_id=task_id,
backend=f"dummy:{backend}",
status=task_status,
shots=shots,
metadata={"batch": True, "batch_size": len(circuits)},
)
if adapter_status == TASK_STATUS_SUCCESS:
task_info.result = result.get("result")
save_task(task_info)
return task_ids
# -----------------------------------------------------------------------------
# Task Query
# -----------------------------------------------------------------------------
[docs]
def query_task(task_id: str, backend: str | None = None) -> TaskInfo:
"""Query the status of a task.
This function queries the backend for the current status of a task
and updates the local cache.
Args:
task_id: The task identifier.
backend: The backend name. If None, attempts to look up from cache.
Prefer using None to let the system auto-detect the correct backend.
Returns:
TaskInfo with current status and result if available.
Raises:
TaskNotFoundError: If the task is not found locally or remotely.
BackendNotFoundError: If the backend is not recognized.
NetworkError: If a network error occurs.
Example:
>>> info = query_task('task-123', backend='quafu')
>>> print(info.status)
'success'
"""
# Always prefer cached backend info to handle dummy mode correctly
cached_task = get_task(task_id)
if cached_task is not None:
# Use cached backend (e.g., 'dummy:originq' for dummy mode)
backend = cached_task.backend
# For dummy tasks, results are already stored - return cached info directly
if backend.startswith("dummy:"):
return cached_task
if backend is None:
raise TaskNotFoundError(
f"Task '{task_id}' not found in local cache. "
"Please provide the backend parameter."
)
# Get backend instance (strip 'dummy:' prefix if present)
actual_backend = backend.split(":", 1)[-1] if backend.startswith("dummy:") else backend
try:
backend_instance = backend_module.get_backend(actual_backend)
except ValueError as e:
raise BackendNotFoundError(str(e)) from e
# Query backend
try:
result = backend_instance.query(task_id)
except Exception as e:
mapped_error = _map_adapter_error(e, backend)
if isinstance(mapped_error, NetworkError):
raise mapped_error from e
# For other errors, try to use cached info
cached_task = get_task(task_id)
if cached_task is not None:
return cached_task
raise TaskNotFoundError(
f"Task '{task_id}' not found: {e}",
task_id=task_id,
) from e
# Map adapter status to TaskStatus
adapter_status = result.get("status", TASK_STATUS_RUNNING)
status_map = {
TASK_STATUS_SUCCESS: TaskStatus.SUCCESS,
TASK_STATUS_FAILED: TaskStatus.FAILED,
TASK_STATUS_RUNNING: TaskStatus.RUNNING,
"pending": TaskStatus.PENDING,
"cancelled": TaskStatus.CANCELLED,
}
task_status = status_map.get(adapter_status, TaskStatus.PENDING)
# Update task info
task_info = TaskInfo(
task_id=task_id,
backend=backend,
status=task_status,
result=result.get("result") if task_status == TaskStatus.SUCCESS else None,
)
# Merge with existing metadata if available
cached_task = get_task(task_id)
if cached_task is not None:
task_info.submit_time = cached_task.submit_time
task_info.shots = cached_task.shots
task_info.metadata = cached_task.metadata
save_task(task_info)
return task_info
[docs]
def wait_for_result(
task_id: str,
backend: str | None = None,
timeout: float = 300.0,
poll_interval: float = 5.0,
raise_on_failure: bool = True,
) -> dict | None:
"""Wait for a task to complete and return its result.
This function polls the task status until it completes, fails, or
the timeout is reached.
Args:
task_id: The task identifier.
backend: The backend name. If None, attempts to look up from cache.
timeout: Maximum time to wait in seconds.
poll_interval: Time between status checks in seconds.
raise_on_failure: If True, raises TaskFailedError on task failure.
Returns:
The task result dictionary if successful, None if timed out.
Raises:
TaskTimeoutError: If the timeout is reached before completion.
TaskFailedError: If the task fails and raise_on_failure is True.
TaskNotFoundError: If the task is not found.
NetworkError: If a network error occurs.
Example:
>>> result = wait_for_result('task-123', backend='quafu', timeout=300)
>>> print(result['counts'])
{'00': 512, '11': 488}
"""
start_time = time.time()
while True:
# Query current status
task_info = query_task(task_id, backend)
# Check if completed
if task_info.status == TaskStatus.SUCCESS:
return task_info.result
# Check if failed
if task_info.status == TaskStatus.FAILED:
if raise_on_failure:
raise TaskFailedError(
f"Task '{task_id}' failed on backend '{task_info.backend}'.",
task_id=task_id,
backend=task_info.backend,
)
return None
# Check timeout
elapsed = time.time() - start_time
if elapsed >= timeout:
raise TaskTimeoutError(
f"Timeout waiting for task '{task_id}' to complete.",
task_id=task_id,
timeout=timeout,
)
# Wait before next poll
time.sleep(poll_interval)
# -----------------------------------------------------------------------------
# TaskManager Class
# -----------------------------------------------------------------------------
[docs]
class TaskManager:
"""High-level task manager for quantum computing workflows.
This class provides a convenient interface for managing quantum tasks
with persistent caching and batch operations.
Example:
>>> manager = TaskManager()
>>> task_id = manager.submit(circuit, backend='quafu', shots=1000)
>>> result = manager.wait_for_result(task_id)
>>> print(result)
"""
def __init__(self, cache_dir: Path | str | None = None) -> None:
"""Initialize the TaskManager.
Args:
cache_dir: Optional custom cache directory.
"""
self._cache_dir = Path(cache_dir) if cache_dir else None
[docs]
def submit(
self,
circuit: "Circuit",
backend: str,
shots: int = 1000,
metadata: dict | None = None,
**kwargs: Any,
) -> str:
"""Submit a single circuit."""
return submit_task(
circuit,
backend,
shots=shots,
metadata=metadata,
**kwargs,
)
[docs]
def submit_batch(
self,
circuits: list["Circuit"],
backend: str,
shots: int = 1000,
**kwargs: Any,
) -> list[str]:
"""Submit multiple circuits as a batch."""
return submit_batch(
circuits,
backend,
shots=shots,
**kwargs,
)
[docs]
def query(self, task_id: str, backend: str | None = None) -> TaskInfo:
"""Query a task's status."""
return query_task(task_id, backend)
[docs]
def wait_for_result(
self,
task_id: str,
backend: str | None = None,
timeout: float = 300.0,
poll_interval: float = 5.0,
raise_on_failure: bool = True,
) -> dict | None:
"""Wait for a task to complete."""
return wait_for_result(
task_id,
backend,
timeout=timeout,
poll_interval=poll_interval,
raise_on_failure=raise_on_failure,
)
[docs]
def list_tasks(
self,
status: str | None = None,
backend: str | None = None,
) -> list[TaskInfo]:
"""List tasks from cache."""
return list_tasks(status, backend, cache_dir=self._cache_dir)
[docs]
def clear_completed(self) -> int:
"""Clear completed tasks from cache."""
return clear_completed_tasks(cache_dir=self._cache_dir)
[docs]
def clear_cache(self) -> None:
"""Clear all tasks from cache."""
clear_cache(cache_dir=self._cache_dir)