uniqc.task_manager module#

Task management with local caching for quantum computing backends.

This module provides a unified interface for submitting quantum tasks, managing task lifecycle, and caching results locally. All persistent storage is delegated to uniqc.task.store.TaskStore (SQLite at ~/.uniqc/cache/tasks.sqlite).

Environment Variables:
UNIQC_DUMMY: Set to ‘true’, ‘1’, or ‘yes’ to enable dummy mode.

When enabled, all task submissions use local simulation instead of real quantum backends. Useful for development and testing.

Usage:

from uniqc.task_manager import submit_task, query_task, wait_for_result
from uniqc.circuit_builder import Circuit
from uniqc.backend import get_backend

# Create a circuit
circuit = Circuit()
circuit.h(0)
circuit.cnot(0, 1)
circuit.measure(0, 1)

# Submit task (use UNIQC_DUMMY=true for local simulation)
task_id = submit_task(circuit, backend='quafu', shots=1000)

# Wait for result
result = wait_for_result(task_id, backend='quafu', timeout=300)

# Query task status
info = query_task(task_id, backend='quafu')
print(info['status'])  # 'running', 'success', or 'failed'

# Explicitly use dummy mode for a single submission
task_id = submit_task(circuit, backend='quafu', dummy=True)
class uniqc.task_manager.TaskManager(cache_dir=None)[source]#

Bases: object

High-level task manager for quantum computing workflows.

This class provides a convenient interface for managing quantum tasks with persistent caching and batch operations.

Example

>>> manager = TaskManager()
>>> task_id = manager.submit(circuit, backend='quafu', shots=1000)
>>> result = manager.wait_for_result(task_id)
>>> print(result)
Parameters:

cache_dir (Path | str | None)

clear_cache()[source]#

Clear all tasks from cache.

Return type:

None

clear_completed()[source]#

Clear completed tasks from cache.

Return type:

int

list_tasks(status=None, backend=None)[source]#

List tasks from cache.

Parameters:
  • status (str | None)

  • backend (str | None)

Return type:

list[TaskInfo]

query(task_id, backend=None)[source]#

Query a task’s status.

Parameters:
  • task_id (str)

  • backend (str | None)

Return type:

TaskInfo

submit(circuit, backend, shots=1000, metadata=None, **kwargs)[source]#

Submit a single circuit.

Parameters:
Return type:

str

submit_batch(circuits, backend, shots=1000, **kwargs)[source]#

Submit multiple circuits as a batch.

Parameters:
  • circuits (list['Circuit'])

  • backend (str)

  • shots (int)

  • kwargs (Any)

Return type:

list[str]

wait_for_result(task_id, backend=None, timeout=300.0, poll_interval=5.0, raise_on_failure=True)[source]#

Wait for a task to complete.

Parameters:
Return type:

dict | None

uniqc.task_manager.clear_cache(cache_dir=None)[source]#

Clear all tasks from the cache by deleting the SQLite file.

Parameters:

cache_dir (Path | None) – Optional custom cache directory.

Return type:

None

uniqc.task_manager.clear_completed_tasks(cache_dir=None)[source]#

Remove completed tasks from the cache.

Parameters:

cache_dir (Path | None) – Optional custom cache directory.

Returns:

Number of tasks removed.

Return type:

int

uniqc.task_manager.get_task(task_id, cache_dir=None)[source]#

Get a task from the local cache.

Parameters:
  • task_id (str) – The task identifier.

  • cache_dir (Path | None) – Optional custom cache directory.

Returns:

TaskInfo if found, None otherwise.

Return type:

TaskInfo | None

uniqc.task_manager.is_dummy_mode()[source]#

Check if dummy mode is enabled via environment variable.

Returns:

True if UNIQC_DUMMY is set to ‘true’, ‘1’, or ‘yes’.

Return type:

bool

uniqc.task_manager.list_tasks(status=None, backend=None, cache_dir=None)[source]#

List tasks from the local cache.

Parameters:
  • status (str | None) – Filter by status (optional).

  • backend (str | None) – Filter by backend (optional).

  • cache_dir (Path | None) – Optional custom cache directory.

Returns:

List of TaskInfo objects matching the filters, newest first.

Return type:

list[TaskInfo]

uniqc.task_manager.query_task(task_id, backend=None)[source]#

Query the status of a task.

This function queries the backend for the current status of a task and updates the local cache.

Parameters:
  • task_id (str) – The task identifier.

  • backend (str | None) – The backend name. If None, attempts to look up from cache. Prefer using None to let the system auto-detect the correct backend.

Returns:

TaskInfo with current status and result if available.

Raises:
Return type:

TaskInfo

Example

>>> info = query_task('task-123', backend='quafu')
>>> print(info.status)
'success'
uniqc.task_manager.save_task(task_info, cache_dir=None)[source]#

Save a task to the local cache.

Parameters:
  • task_info (TaskInfo) – Task information to save.

  • cache_dir (Path | None) – Optional custom cache directory.

Return type:

None

uniqc.task_manager.submit_batch(circuits, backend, shots=1000, dummy=None, **kwargs)[source]#

Submit multiple circuits as a batch to a quantum backend.

Parameters:
  • circuits (list['Circuit']) – List of UnifiedQuantum Circuits to submit.

  • backend (str) – The backend name.

  • shots (int) – Number of measurement shots per circuit.

  • dummy (bool | None) – Override dummy mode. If None, uses UNIQC_DUMMY env var.

  • **kwargs (Any) – Additional backend-specific parameters. - For Quafu: chip_id, auto_mapping, group_name - For OriginQ: backend_name (e.g., ‘origin:wuyuan:d5’), circuit_optimize

Returns:

List of task IDs assigned by the backend.

Raises:
Return type:

list[str]

Example

>>> circuits = [circuit1, circuit2, circuit3]
>>> task_ids = submit_batch(circuits, backend='quafu', shots=1000, chip_id='ScQ-P10')
uniqc.task_manager.submit_task(circuit, backend, shots=1000, metadata=None, dummy=None, **kwargs)[source]#

Submit a single circuit to a quantum backend.

This function converts the circuit to the backend’s native format, submits it, and caches the task information locally.

Parameters:
  • circuit (Circuit) – The UnifiedQuantum Circuit to submit.

  • backend (str) – The backend name (e.g., ‘originq’, ‘quafu’, ‘ibm’).

  • shots (int) – Number of measurement shots.

  • metadata (dict | None) – Optional metadata to store with the task.

  • dummy (bool | None) – Override dummy mode. If None, uses UNIQC_DUMMY env var. When True, uses local simulation instead of real backend.

  • **kwargs (Any) – Additional backend-specific parameters. - For Quafu: chip_id, auto_mapping - For OriginQ: backend_name (e.g., ‘origin:wuyuan:d5’), circuit_optimize, measurement_amend

Returns:

The task ID assigned by the backend.

Raises:
Return type:

str

Example

>>> circuit = Circuit()
>>> circuit.h(0)
>>> circuit.measure(0)
>>> task_id = submit_task(circuit, backend='originq', shots=1000, backend_name='origin:wuyuan:d5')
>>> # Use dummy mode for local simulation
>>> task_id = submit_task(circuit, backend='quafu', dummy=True)
uniqc.task_manager.wait_for_result(task_id, backend=None, timeout=300.0, poll_interval=5.0, raise_on_failure=True)[source]#

Wait for a task to complete and return its result.

This function polls the task status until it completes, fails, or the timeout is reached.

Parameters:
  • task_id (str) – The task identifier.

  • backend (str | None) – The backend name. If None, attempts to look up from cache.

  • timeout (float) – Maximum time to wait in seconds.

  • poll_interval (float) – Time between status checks in seconds.

  • raise_on_failure (bool) – If True, raises TaskFailedError on task failure.

Returns:

The task result dictionary if successful, None if timed out.

Raises:
Return type:

dict | None

Example

>>> result = wait_for_result('task-123', backend='quafu', timeout=300)
>>> print(result['counts'])
{'00': 512, '11': 488}