uniqc.task_manager module
Task management with local caching for quantum computing backends.
This module provides a unified interface for submitting quantum tasks,
managing task lifecycle, and caching results locally. All persistent
storage is delegated to uniqc.task.store.TaskStore (SQLite at
~/.uniqc/cache/tasks.sqlite).
- Environment Variables:
- UNIQC_DUMMY: Set to ‘true’, ‘1’, or ‘yes’ to enable dummy mode. When enabled, all task submissions use local simulation instead of real quantum backends. Useful for development and testing.
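The flag check described above can be sketched in a few lines. This is an illustration only, not the module's actual implementation: the helper name `env_flag_enabled` is hypothetical, and ignoring letter case and surrounding whitespace is an assumption the documentation does not confirm.

```python
import os

# Values the documentation lists as enabling dummy mode.
_TRUTHY = {"true", "1", "yes"}


def env_flag_enabled(name: str = "UNIQC_DUMMY") -> bool:
    """Return True when the variable holds one of the documented truthy values.

    Assumption: letter case and surrounding whitespace are ignored.
    """
    return os.environ.get(name, "").strip().lower() in _TRUTHY
```

An unset variable is treated as disabled, which matches the description of dummy mode as opt-in.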
Usage:
from uniqc.task_manager import submit_task, query_task, wait_for_result
from uniqc.circuit_builder import Circuit
from uniqc.backend import get_backend
# Create a circuit
circuit = Circuit()
circuit.h(0)
circuit.cnot(0, 1)
circuit.measure(0, 1)
# Submit task (use UNIQC_DUMMY=true for local simulation)
task_id = submit_task(circuit, backend='quafu', shots=1000)
# Wait for result
result = wait_for_result(task_id, backend='quafu', timeout=300)
# Query task status
info = query_task(task_id, backend='quafu')
print(info.status)  # 'running', 'success', or 'failed'
# Explicitly use dummy mode for a single submission
task_id = submit_task(circuit, backend='quafu', dummy=True)
- class uniqc.task_manager.TaskManager(cache_dir=None)
Bases: object
High-level task manager for quantum computing workflows.
This class provides a convenient interface for managing quantum tasks with persistent caching and batch operations.
Example
>>> manager = TaskManager()
>>> task_id = manager.submit(circuit, backend='quafu', shots=1000)
>>> result = manager.wait_for_result(task_id)
>>> print(result)
- Parameters:
cache_dir (Path | str | None)
- uniqc.task_manager.clear_cache(cache_dir=None)
Clear all tasks from the cache by deleting the SQLite file.
- Parameters:
cache_dir (Path | None) – Optional custom cache directory.
- Return type:
None
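Clearing a cache by deleting its database file can be sketched as follows. The helper `remove_cache_file` is hypothetical and runs against a temporary directory; the default filename `tasks.sqlite` is taken from the module description, but how the real function resolves `cache_dir` is an assumption.

```python
import tempfile
from pathlib import Path


def remove_cache_file(cache_dir: Path, filename: str = "tasks.sqlite") -> bool:
    """Delete the cache database if present; return True if a file was removed."""
    db_path = Path(cache_dir) / filename
    if db_path.exists():
        db_path.unlink()
        return True
    return False


# Demonstrate on a throwaway directory rather than the real ~/.uniqc/cache.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "tasks.sqlite").write_bytes(b"")
    first = remove_cache_file(Path(tmp))   # file existed and was deleted
    second = remove_cache_file(Path(tmp))  # nothing left to delete
```

Returning a boolean instead of raising on a missing file keeps the operation safe to call repeatedly.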
- uniqc.task_manager.clear_completed_tasks(cache_dir=None)
Remove completed tasks from the cache.
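As a rough illustration of pruning finished tasks from a SQLite cache, the sketch below uses an in-memory database. The table layout (`tasks` with `task_id`, `backend`, `status`) is hypothetical, and treating both 'success' and 'failed' as completed is an assumption; the real store may define completion differently.

```python
import sqlite3

# Hypothetical schema; the real TaskStore layout is not documented here.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (task_id TEXT PRIMARY KEY, backend TEXT, status TEXT)")
conn.executemany(
    "INSERT INTO tasks VALUES (?, ?, ?)",
    [("t1", "quafu", "success"), ("t2", "quafu", "running"), ("t3", "originq", "failed")],
)

# Assumption: a task counts as completed once it reaches a terminal status.
TERMINAL = ("success", "failed")
placeholders = ", ".join("?" for _ in TERMINAL)
cur = conn.execute(f"DELETE FROM tasks WHERE status IN ({placeholders})", TERMINAL)

removed = cur.rowcount  # number of rows deleted
remaining = [row[0] for row in conn.execute("SELECT task_id FROM tasks")]
```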
- uniqc.task_manager.is_dummy_mode()
Check if dummy mode is enabled via environment variable.
- Returns:
True if UNIQC_DUMMY is set to ‘true’, ‘1’, or ‘yes’.
- Return type:
bool
- uniqc.task_manager.list_tasks(status=None, backend=None, cache_dir=None)
List tasks from the local cache.
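One way optional `status` and `backend` filters could map onto a SQLite query is sketched below. The function `select_tasks` and the table layout are hypothetical; only the idea of skipping a filter when its argument is None comes from the signature.

```python
import sqlite3


def select_tasks(conn, status=None, backend=None):
    """Return (task_id, backend, status) rows, applying only the filters given."""
    clauses, params = [], []
    if status is not None:
        clauses.append("status = ?")
        params.append(status)
    if backend is not None:
        clauses.append("backend = ?")
        params.append(backend)
    sql = "SELECT task_id, backend, status FROM tasks"
    if clauses:
        sql += " WHERE " + " AND ".join(clauses)
    sql += " ORDER BY task_id"
    return conn.execute(sql, params).fetchall()


# Hypothetical schema and sample rows for the demonstration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (task_id TEXT PRIMARY KEY, backend TEXT, status TEXT)")
conn.executemany(
    "INSERT INTO tasks VALUES (?, ?, ?)",
    [("t1", "quafu", "success"), ("t2", "quafu", "running"), ("t3", "originq", "success")],
)
all_rows = select_tasks(conn)
quafu_done = select_tasks(conn, status="success", backend="quafu")
```

Bound parameters keep the filter values out of the SQL text, so only the fixed clause strings are ever concatenated.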
- uniqc.task_manager.query_task(task_id, backend=None)
Query the status of a task.
This function queries the backend for the current status of a task and updates the local cache.
- Parameters:
task_id (str) – The task identifier.
backend (str | None) – The backend name.
- Returns:
TaskInfo with current status and result if available.
- Raises:
TaskNotFoundError – If the task is not found locally or remotely.
BackendNotFoundError – If the backend is not recognized.
NetworkError – If a network error occurs.
- Return type:
TaskInfo
Example
>>> info = query_task('task-123', backend='quafu')
>>> print(info.status)
'success'
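The "query the backend, then update the local cache" pattern can be sketched as a write-through refresh. Everything here is illustrative: `refresh_status`, the `fetch_remote` callable, and the two-column table are hypothetical stand-ins, not the module's internals.

```python
import sqlite3


def refresh_status(conn, task_id, fetch_remote):
    """Fetch the latest status from a remote source and store it in the cache."""
    status = fetch_remote(task_id)
    conn.execute(
        "INSERT OR REPLACE INTO tasks (task_id, status) VALUES (?, ?)",
        (task_id, status),
    )
    conn.commit()
    return status


# Hypothetical cache table holding a stale 'running' entry.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (task_id TEXT PRIMARY KEY, status TEXT)")
conn.execute("INSERT INTO tasks VALUES ('task-123', 'running')")

# The lambda stands in for a real backend call.
latest = refresh_status(conn, "task-123", lambda _id: "success")
cached = conn.execute(
    "SELECT status FROM tasks WHERE task_id = ?", ("task-123",)
).fetchone()[0]
```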
- uniqc.task_manager.submit_batch(circuits, backend, shots=1000, dummy=None, **kwargs)
Submit multiple circuits as a batch to a quantum backend.
- Parameters:
circuits (list['Circuit']) – List of UnifiedQuantum Circuits to submit.
backend (str) – The backend name.
shots (int) – Number of measurement shots per circuit.
dummy (bool | None) – Override dummy mode. If None, uses UNIQC_DUMMY env var.
**kwargs (Any) – Additional backend-specific parameters.
- For Quafu: chip_id, auto_mapping, group_name
- For OriginQ: backend_name (e.g., ‘origin:wuyuan:d5’), circuit_optimize
- Returns:
List of task IDs assigned by the backend.
- Raises:
BackendNotFoundError – If the backend is not recognized.
BackendNotAvailableError – If the backend is not available.
AuthenticationError – If authentication fails.
InsufficientCreditsError – If account has insufficient credits.
QuotaExceededError – If usage quota is exceeded.
NetworkError – If a network error occurs.
- Return type:
list[str]
Example
>>> circuits = [circuit1, circuit2, circuit3]
>>> task_ids = submit_batch(circuits, backend='quafu', shots=1000, chip_id='ScQ-P10')
- uniqc.task_manager.submit_task(circuit, backend, shots=1000, metadata=None, dummy=None, **kwargs)
Submit a single circuit to a quantum backend.
This function converts the circuit to the backend’s native format, submits it, and caches the task information locally.
- Parameters:
circuit (Circuit) – The UnifiedQuantum Circuit to submit.
backend (str) – The backend name (e.g., ‘originq’, ‘quafu’, ‘ibm’).
shots (int) – Number of measurement shots.
metadata (dict | None) – Optional metadata to store with the task.
dummy (bool | None) – Override dummy mode. If None, uses UNIQC_DUMMY env var. When True, uses local simulation instead of real backend.
**kwargs (Any) – Additional backend-specific parameters.
- For Quafu: chip_id, auto_mapping
- For OriginQ: backend_name (e.g., ‘origin:wuyuan:d5’), circuit_optimize, measurement_amend
- Returns:
The task ID assigned by the backend.
- Raises:
BackendNotFoundError – If the backend is not recognized.
BackendNotAvailableError – If the backend is not available.
AuthenticationError – If authentication fails.
InsufficientCreditsError – If account has insufficient credits.
QuotaExceededError – If usage quota is exceeded.
NetworkError – If a network error occurs.
- Return type:
str
Example
>>> circuit = Circuit()
>>> circuit.h(0)
>>> circuit.measure(0)
>>> task_id = submit_task(circuit, backend='originq', shots=1000, backend_name='origin:wuyuan:d5')
>>> # Use dummy mode for local simulation
>>> task_id = submit_task(circuit, backend='quafu', dummy=True)
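The precedence between the `dummy` argument and the UNIQC_DUMMY variable can be sketched as below. The helper `resolve_dummy` is hypothetical; it only illustrates the documented rule that an explicit True or False wins and that None defers to the environment. Case-insensitive matching of the variable is an assumption.

```python
import os
from typing import Optional


def resolve_dummy(dummy: Optional[bool] = None) -> bool:
    """Decide whether to simulate locally.

    An explicit True/False overrides the environment; None falls back to it.
    """
    if dummy is not None:
        return dummy
    # Assumption: the variable is compared case-insensitively.
    return os.environ.get("UNIQC_DUMMY", "").strip().lower() in {"true", "1", "yes"}
```

Checking `is not None` rather than truthiness matters here: `dummy=False` must be able to force a real submission even when the variable is set.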
- uniqc.task_manager.wait_for_result(task_id, backend=None, timeout=300.0, poll_interval=5.0, raise_on_failure=True)
Wait for a task to complete and return its result.
This function polls the task status until it completes, fails, or the timeout is reached.
- Parameters:
task_id (str) – The task identifier.
backend (str | None) – The backend name. If None, attempts to look up from cache.
timeout (float) – Maximum time to wait in seconds.
poll_interval (float) – Time between status checks in seconds.
raise_on_failure (bool) – If True, raises TaskFailedError on task failure.
- Returns:
The task result dictionary if successful, None if timed out.
- Raises:
TaskTimeoutError – If the timeout is reached before completion.
TaskFailedError – If the task fails and raise_on_failure is True.
TaskNotFoundError – If the task is not found.
NetworkError – If a network error occurs.
- Return type:
dict | None
Example
>>> result = wait_for_result('task-123', backend='quafu', timeout=300)
>>> print(result['counts'])
{'00': 512, '11': 488}
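A polling loop with a deadline, in the spirit of the `timeout`, `poll_interval`, and `raise_on_failure` parameters, might look like the sketch below. The names `poll_until_done`, `PollTimeout`, and `PollFailed` are hypothetical stand-ins, the status payload is assumed to be a dict, and this sketch chooses to raise on timeout; it is not the library's implementation.

```python
import time


class PollTimeout(Exception):
    """Stand-in for a timeout error (hypothetical name)."""


class PollFailed(Exception):
    """Stand-in for a task-failure error (hypothetical name)."""


def poll_until_done(fetch_status, timeout=300.0, poll_interval=5.0, raise_on_failure=True):
    """Call fetch_status() until it reports success or failure, or time runs out."""
    deadline = time.monotonic() + timeout
    while True:
        info = fetch_status()
        if info["status"] == "success":
            return info.get("result")
        if info["status"] == "failed":
            if raise_on_failure:
                raise PollFailed("task failed")
            return None
        if time.monotonic() >= deadline:
            raise PollTimeout("task did not finish in time")
        time.sleep(poll_interval)


# Simulated backend: two 'running' responses, then a successful result.
responses = iter([
    {"status": "running"},
    {"status": "running"},
    {"status": "success", "result": {"counts": {"00": 512, "11": 488}}},
])
result = poll_until_done(lambda: next(responses), timeout=1.0, poll_interval=0.01)
```

`time.monotonic()` is used for the deadline so that wall-clock adjustments cannot shorten or extend the wait.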