uniqc.backend_adapter.task_manager module

Task management with local caching for quantum computing backends.

This module provides a unified interface for submitting quantum tasks, managing task lifecycle, and caching results locally. All persistent storage is delegated to uniqc.backend_adapter.task.store.TaskStore (SQLite at ~/.uniqc/cache/tasks.sqlite).

Usage:

from uniqc.backend_adapter.task_manager import submit_task, query_task, wait_for_result, dry_run_task
from uniqc.circuit_builder import Circuit

# Create a circuit
circuit = Circuit()
circuit.h(0)
circuit.cnot(0, 1)
circuit.measure(0, 1)

# Dry-run: validate circuit offline before submitting
result = dry_run_task(circuit, backend='quafu:ScQ-P18', shots=1000)
if not result.success:
    print(f"Validation failed: {result.error}")

# Submit task
task_id = submit_task(circuit, backend='quafu:ScQ-P18', shots=1000)

# Wait for result (backend is auto-resolved from the cached TaskInfo)
result = wait_for_result(task_id, timeout=300)

# Query task status
info = query_task(task_id)
print(info.status)  # e.g. TaskStatus.RUNNING, TaskStatus.SUCCESS, or TaskStatus.FAILED

# Use dummy backend for local simulation
task_id = submit_task(circuit, backend='dummy:local:simulator', shots=1000)

Note

Any dry-run success followed by actual submission failure is a critical bug. Please report it at the UnifiedQuantum issue tracker.

class uniqc.backend_adapter.task_manager.TaskManager(cache_dir=None)

Bases: object

High-level task manager for quantum computing workflows.

This class provides a convenient interface for managing quantum tasks with persistent caching and batch operations.

Example

>>> manager = TaskManager()
>>> task_id = manager.submit(circuit, backend='quafu:ScQ-P18', shots=1000)
>>> result = manager.wait_for_result(task_id)
>>> print(result)

Parameters:

cache_dir (Path | str | None)

clear_cache()

Clear all tasks from cache.

Return type:

None

clear_completed()

Clear completed tasks from cache.

Return type:

int

list_tasks(status=None, backend=None, *, limit=None, offset=None)

List tasks from cache.

Parameters:
  • status (str | None)

  • backend (str | None)

  • limit (int | None)

  • offset (int | None)

Return type:

list[TaskInfo]

query(task_id, backend=None)

Query a task’s status.

Parameters:
  • task_id (str)

  • backend (str | None)

Return type:

TaskInfo

submit(circuit, backend, shots=1000, metadata=None, **kwargs)

Submit a single circuit.

Parameters:
Return type:

str

submit_batch(circuits, backend, shots=1000, **kwargs)

Submit multiple circuits as a batch.

Parameters:
Return type:

list[str]

wait_for_result(task_id, timeout=300.0, poll_interval=5.0, raise_on_failure=True)

Wait for a task to complete. See wait_for_result().

Parameters:
Return type:

UnifiedResult | list[UnifiedResult] | None

uniqc.backend_adapter.task_manager.clear_cache(cache_dir=None)

Clear all tasks from the cache by deleting the SQLite file.

Parameters:

cache_dir (Path | None) – Optional custom cache directory.

Return type:

None

uniqc.backend_adapter.task_manager.clear_completed_tasks(cache_dir=None, status=None)

Remove completed tasks from the cache.

Parameters:
  • cache_dir (Path | None) – Optional custom cache directory.

  • status (str | None) – If given, only remove tasks with this status (case-insensitive, same normalization as list_tasks). When None, removes all tasks in any terminal status (success, failed, cancelled, …).

Returns:

Number of tasks removed.

Return type:

int
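
The status filter described for clear_completed_tasks() can be pictured with a small in-memory stand-in. This is only a sketch, not the library's code: the cache is a list of plain dicts rather than TaskInfo rows, remove_completed is a hypothetical helper, and TERMINAL holds only the three terminal statuses the text names (the documented list is open-ended).

```python
# Stand-in cache rows: plain dicts instead of TaskInfo objects (assumption).
TERMINAL = {"success", "failed", "cancelled"}  # only the statuses named in the text


def remove_completed(tasks, status=None):
    """Drop matching tasks in place and return how many were removed."""
    if status is None:
        wanted = TERMINAL                      # every known terminal status
    else:
        wanted = {status.strip().lower()}      # case-insensitive single status
    kept = [t for t in tasks if t["status"].lower() not in wanted]
    removed = len(tasks) - len(kept)
    tasks[:] = kept
    return removed


cache = [
    {"id": "uqt_a", "status": "success"},
    {"id": "uqt_b", "status": "running"},
    {"id": "uqt_c", "status": "FAILED"},
]
removed_failed = remove_completed(cache, status="Failed")  # removes uqt_c only
removed_rest = remove_completed(cache)                     # removes uqt_a, keeps uqt_b
```

In this sketch the running task survives both calls, which mirrors the documented intent that only terminal tasks are cleared when status is None.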

uniqc.backend_adapter.task_manager.dry_run_batch(circuits, backend, shots=1000, **kwargs)

Validate multiple circuits against a backend without making network calls.

Runs dry_run_task() on each circuit in sequence and returns a list of results, one per circuit in input order.

Parameters:
  • circuits (list[Circuit]) – List of UnifiedQuantum Circuits to validate.

  • backend (str) – The backend name.

  • shots (int) – Number of measurement shots per circuit.

  • **kwargs (Any) – Additional backend-specific parameters.

Returns:

List of DryRunResult, one per circuit in input order.

Return type:

list[DryRunResult]
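
The "validate each circuit in sequence, keep input order" behaviour can be sketched without any backend. Everything below is illustrative: FakeDryRunResult is a stand-in that carries only the success / error fields used in this page's examples plus a warnings list, and validate_all and toy_validator are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class FakeDryRunResult:
    """Stand-in result type; not uniqc's DryRunResult."""
    success: bool
    error: Optional[str] = None
    warnings: list = field(default_factory=list)


def validate_all(circuits, validate_one: Callable):
    """Validate sequentially; the i-th result belongs to the i-th circuit."""
    return [validate_one(c) for c in circuits]


def toy_validator(circuit):
    # Hypothetical rule: a "circuit" is a list of gate names and "bad" is rejected.
    if "bad" in circuit:
        return FakeDryRunResult(False, error="unsupported gate: bad")
    return FakeDryRunResult(True)


results = validate_all([["h", "cnot"], ["bad"], ["h"]], toy_validator)
failed_indices = [i for i, r in enumerate(results) if not r.success]
```

Because the results line up with the inputs, the indices of failing circuits can be read off directly before deciding whether to submit the batch.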

Note

Any dry-run success followed by actual submission failure is a critical bug. Please report it at the UnifiedQuantum issue tracker.

uniqc.backend_adapter.task_manager.dry_run_task(circuit, backend, shots=1000, **kwargs)

Validate a circuit against a backend without making network calls.

This performs a dry-run validation that checks:

  1. The circuit can be successfully translated to the platform’s native format.

  2. The resulting native circuit object is structurally valid.

  3. The qubit count fits within the backend’s limits (where determinable offline).

  4. Gate basis compatibility is confirmed (where determinable offline).

This function makes NO cloud API calls.

Parameters:
  • circuit (Circuit) – The UnifiedQuantum Circuit to validate.

  • backend (str) – The backend identifier. Accepts the same forms as submit_task(), including '<platform>:<chip>' (e.g. 'originq:WK_C180', 'dummy:originq:WK_C180').

  • shots (int) – Number of measurement shots for validation.

  • **kwargs (Any) –

    Additional backend-specific parameters.

    • For IBM: chip_id (required for full validation)

    • For Quafu: chip_id (required for full validation)

    • For OriginQ: backend_name (e.g. 'WK_C180'). When backend already contains the chip suffix ('originq:WK_C180'), the chip is extracted automatically and forwarded as backend_name.

Returns:

DryRunResult indicating success or failure with details and warnings.

Return type:

DryRunResult

Example

>>> from uniqc.circuit_builder import Circuit
>>> circuit = Circuit()
>>> circuit.h(0)
>>> circuit.measure(0)
>>> result = dry_run_task(circuit, backend='quafu:ScQ-P18', shots=1000)
>>> if result.success:
...     print("Circuit is valid for submission")
... else:
...     print(f"Validation failed: {result.error}")

Note

Any dry-run success followed by actual submission failure is a critical bug. Please report it at the UnifiedQuantum issue tracker.

uniqc.backend_adapter.task_manager.get_platform_task_ids(task_id)

Return the shard mapping for a uniqc task id.

Each TaskShard records:

  • platform_task_id — the id assigned by the cloud platform

  • shard_index — 0-based ordering within the parent

  • circuit_count — number of circuits packed into this shard

  • sub_index_offset — index of this shard’s first circuit in the original submit_batch() list

  • status / error_message — per-shard liveness

Parameters:

task_id (str) – A uniqc task id (uqt_*). For backwards compatibility this will also accept a platform task id and emit a DeprecationWarning while resolving the parent.

Returns:

Shards in submission order. Empty list when the task has no shards yet (e.g. submission failed before any shard was persisted) or when task_id is unknown.

Return type:

list[TaskShard]
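
To illustrate how sub_index_offset and circuit_count tie a shard back to the original batch, here is a minimal sketch that finds which shard holds a given circuit. The Shard dataclass is a stand-in with a subset of the fields listed above, locate is a hypothetical helper, and the platform ids are made up.

```python
from bisect import bisect_right
from dataclasses import dataclass


@dataclass
class Shard:
    """Stand-in carrying a subset of the TaskShard fields."""
    platform_task_id: str
    shard_index: int
    circuit_count: int
    sub_index_offset: int


def locate(shards, circuit_index):
    """Return (shard, position inside that shard) for an index in the batch."""
    offsets = [s.sub_index_offset for s in shards]  # ascending in submission order
    pos = bisect_right(offsets, circuit_index) - 1
    if pos < 0:
        raise IndexError(circuit_index)
    shard = shards[pos]
    local = circuit_index - shard.sub_index_offset
    if local >= shard.circuit_count:
        raise IndexError(circuit_index)             # past the end of the last shard
    return shard, local


shards = [
    Shard("job-A", 0, 200, 0),      # circuits 0..199
    Shard("job-B", 1, 200, 200),    # circuits 200..399
]
shard, local = locate(shards, 250)  # circuit 250 is the 51st circuit of job-B
```

The same lookup is handy when a single shard reports an error_message and you want to know which circuits of the batch it covered.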

uniqc.backend_adapter.task_manager.get_result(task_id, timeout=300.0, poll_interval=5.0, raise_on_failure=True)

Blocking retrieval: wait until task completes or timeout.

This is a convenience alias for wait_for_result(). The name get_result emphasises the “I want the answer” pattern, while wait_for_result emphasises the blocking behaviour.

Parameters:
  • task_id (str) – The task identifier.

  • timeout (float) – Maximum time to wait in seconds (default 300).

  • poll_interval (float) – Seconds between status checks (default 5).

  • raise_on_failure (bool) – If True (default), raise TaskFailedError when the task fails. If False, return None on failure.

Returns:

UnifiedResult for single-circuit tasks, or list[UnifiedResult] for native batch submissions. Returns None if the task failed and raise_on_failure is False.

Raises:

TaskFailedError – If the task fails and raise_on_failure is True.

Return type:

UnifiedResult | list[UnifiedResult] | None

uniqc.backend_adapter.task_manager.get_task(task_id, cache_dir=None)

Get a task from the local cache.

Parameters:
  • task_id (str) – The task identifier.

  • cache_dir (Path | None) – Optional custom cache directory.

Returns:

TaskInfo if found, None otherwise.

Return type:

TaskInfo | None

uniqc.backend_adapter.task_manager.list_tasks(status=None, backend=None, cache_dir=None, *, limit=None, offset=None)

List tasks from the local cache.

Parameters:
  • status (str | None) – Filter by status (optional).

  • backend (str | None) – Filter by backend (optional).

  • cache_dir (Path | None) – Optional custom cache directory.

  • limit (int | None) – If set, push a SQL LIMIT to the underlying store so very large caches (thousands of tasks, gigabytes of result blobs) do not have to be fully materialised in memory before being sliced.

  • offset (int | None) – SQL OFFSET for paginated reads. Implies limit.

Returns:

List of TaskInfo objects matching the filters, newest first.

Return type:

list[TaskInfo]
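
The benefit of pushing limit and offset into SQL can be seen with the standard-library sqlite3 module. This is a sketch under assumptions: the table layout and the list_page helper are hypothetical and do not reflect the real TaskStore schema. It also shows one reading of why an offset needs a limit: SQLite accepts OFFSET only together with LIMIT, and a negative LIMIT means "no upper bound".

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical schema; the real TaskStore layout is not shown in this page.
conn.execute("CREATE TABLE tasks (task_id TEXT, status TEXT, submit_time INTEGER)")
conn.executemany(
    "INSERT INTO tasks VALUES (?, ?, ?)",
    [(f"uqt_{i}", "success" if i % 2 else "running", i) for i in range(10)],
)


def list_page(status=None, limit=None, offset=None):
    """Newest-first page; filtering and slicing happen inside SQLite."""
    sql = "SELECT task_id FROM tasks"
    params = []
    if status is not None:
        sql += " WHERE status = ?"
        params.append(status)
    sql += " ORDER BY submit_time DESC"
    if limit is not None or offset is not None:
        # OFFSET must accompany LIMIT in SQLite; LIMIT -1 disables the cap.
        sql += " LIMIT ? OFFSET ?"
        params += [-1 if limit is None else limit, offset or 0]
    return [row[0] for row in conn.execute(sql, params)]


first_page = list_page(limit=3)                   # three newest tasks
second_page = list_page(limit=3, offset=3)        # the next three
odd_tail = list_page(status="success", offset=3)  # skip three, no cap
```

Only the requested rows ever leave the database, which is the point of the limit parameter for caches with large result blobs.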

uniqc.backend_adapter.task_manager.poll_result(task_id)

Non-blocking status check: return current task status/result without waiting.

Unlike wait_for_result(), this returns immediately with the latest cached status. Call it in a loop if you want to poll without blocking.

Parameters:

task_id (str) – The task identifier (uqt_* or platform id).

Returns:

TaskInfo with current status. Check .status for TaskStatus.SUCCESS, TaskStatus.FAILED, TaskStatus.RUNNING, etc. If the task has completed, .result will be populated.

Return type:

TaskInfo

Example

>>> import time
>>> task_id = submit_task(circuit, backend='originq:WK_C180')
>>> while True:
...     info = poll_result(task_id)
...     if info.status in (TaskStatus.SUCCESS, TaskStatus.FAILED):
...         break
...     time.sleep(2)

uniqc.backend_adapter.task_manager.query_task(task_id, backend=None)

Query the status of a task.

For uniqc-managed task ids, refreshes each shard’s status from its backend (best-effort), then denormalises the aggregate status onto the parent tasks row. The returned TaskInfo therefore reflects the freshest known state.

For legacy platform task ids (pre-v4 cache rows or rows without shards), falls back to the historical direct-query path.

Parameters:
  • task_id (str) – The task identifier. Accepts a uniqc id (uqt_*) or, with a deprecation warning, a raw platform id.

  • backend (str | None) – Ignored when the task is found in cache (the backend is resolved from the stored shards). Only used for legacy direct-query fallback.

Returns:

TaskInfo with current aggregated status.

Return type:

TaskInfo
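
This page does not spell out how per-shard statuses are rolled up into the parent status. As a rough sketch of what such a denormalisation step could look like, the hypothetical aggregate_status below applies one plausible rule (any failed shard fails the parent, all-success succeeds, anything else counts as still running). uniqc's actual rule may differ.

```python
def aggregate_status(shard_statuses):
    """One plausible roll-up of shard statuses; an assumption, not uniqc's rule."""
    statuses = [s.lower() for s in shard_statuses]
    if not statuses:
        raise ValueError("no shards to aggregate")
    if any(s == "failed" for s in statuses):
        return "failed"       # a single failed shard fails the whole task
    if all(s == "success" for s in statuses):
        return "success"      # every shard finished successfully
    return "running"          # at least one shard is still in flight


mixed = aggregate_status(["success", "running"])
done = aggregate_status(["SUCCESS", "success"])
broken = aggregate_status(["success", "failed", "running"])
```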

uniqc.backend_adapter.task_manager.save_task(task_info, cache_dir=None)

Save a task to the local cache.

Parameters:
  • task_info (TaskInfo) – Task information to save.

  • cache_dir (Path | None) – Optional custom cache directory.

Return type:

None

uniqc.backend_adapter.task_manager.submit_batch(circuits, backend, shots=1000, options=None, *, local_compile=1, cloud_compile=1, backend_name=None, chip_id=None, native_batch=True, return_platform_ids=False, **kwargs)

Submit multiple circuits as a batch and return a single uniqc task id.

uniqc maintains an internal mapping from one uqt_* task id to one or more platform-issued task ids. The user manages exactly one handle, regardless of the underlying platform’s batch capabilities:

  • For platforms with native batch (OriginQ, IBM) — circuits are packed into one platform job per shard. uniqc auto-shards if the batch exceeds the adapter’s max_native_batch_size (e.g. OriginQ task_group_size 200, IBM 100).

  • For platforms without native batch (Quafu, Quark, Dummy) — max_native_batch_size = 1: uniqc loops one platform job per circuit, but the user still receives a single uqt_* id and wait_for_result() returns the per-circuit results in submission order.
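
The auto-sharding arithmetic above can be sketched in a few lines. plan_shards is a hypothetical helper, and the assumption that shards are filled greedily (every shard full except possibly the last) is mine; the text only states the per-shard maximum.

```python
from math import ceil


def plan_shards(n_circuits, max_native_batch_size):
    """Split n circuits into (shard_index, sub_index_offset, circuit_count) tuples."""
    if max_native_batch_size < 1:
        raise ValueError("max_native_batch_size must be >= 1")
    plan = []
    for shard_index in range(ceil(n_circuits / max_native_batch_size)):
        offset = shard_index * max_native_batch_size
        count = min(max_native_batch_size, n_circuits - offset)
        plan.append((shard_index, offset, count))
    return plan


originq_like = plan_shards(400, 200)  # group size 200: two full shards
ibm_like = plan_shards(250, 100)      # group size 100: last shard holds 50
no_native = plan_shards(3, 1)         # size 1: one platform job per circuit
```

In every case the caller would still hold a single uqt_* id; the tuples only describe how the work is split behind it.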

Parameters:
  • circuits (list[Circuit | str]) – List of UnifiedQuantum Circuits to submit.

  • backend (str) – The backend name.

  • shots (int) – Number of measurement shots per circuit.

  • options (BackendOptions | UnifiedOptions | dict | None) – Optional typed backend options. Same as in submit_task().

  • local_compile (int) – Local qiskit transpile pass strength. See submit_task() for the full semantics. Default 1.

  • cloud_compile (int) – Cloud-side compile request strength. See submit_task(). Default 1.

  • backend_name (str | None) – OriginQ chip name (optional when backend already encodes the chip).

  • chip_id (str | None) – Quafu / IBM chip ID.

  • native_batch (bool) – When True (default), shards use the platform’s native grouped-submission API (one platform job per shard). When False, every circuit is submitted as a separate platform job (one shard per circuit). Useful when you need to retry/cancel individual circuits but do not need per-circuit task ids returned.

  • return_platform_ids (bool) – When True, returns the list of platform-issued task ids (one per shard, NOT one per circuit). Provided only for legacy code paths and debugging; new code should always use the single returned uqt_* id and call get_platform_task_ids() if it needs the mapping.

  • **kwargs (Any) – Additional backend-specific parameters.

Returns:

The uniqc task id uqt_* for this batch, or a list of platform task ids when return_platform_ids=True.

Return type:

str | list[str]

Example

>>> circuits = [build(i) for i in range(400)]
>>> uid = submit_batch(circuits, 'originq', shots=1000,
...                    backend_name='WK_C180')
>>> # uid is one uqt_* id covering ceil(400/200) = 2 shards
>>> results = wait_for_result(uid)         # list[UnifiedResult] len=400
>>> shards  = get_platform_task_ids(uid)   # 2 shard rows

uniqc.backend_adapter.task_manager.submit_task(circuit, backend, shots=1000, metadata=None, options=None, *, local_compile=1, cloud_compile=1, backend_name=None, chip_id=None, **kwargs)

Submit a single circuit to a quantum backend.

This function performs IR-language validation, optionally rewrites the circuit through a local qiskit transpile pass, then ships the result to the backend’s native API.

Parameters:
  • circuit (Circuit | str) – The UnifiedQuantum Circuit to submit.

  • backend (str) – Backend identifier in the canonical 'provider:chip-name' format (e.g. 'originq:WK_C180', 'quafu:ScQ-P10', 'ibm:ibm_brisbane'). Cloud submissions reject the bare 'provider' form (e.g. 'originq') and surface the list of cached chips for that provider — call uniqc.list_backends() or run uniqc backend list -p originq to discover available chips. Dummy backends use 'dummy' or 'dummy:<provider>:<chip>' and are exempt from the strict format check.

  • shots (int) – Number of measurement shots.

  • metadata (dict | None) – Optional metadata to store with the task.

  • options (BackendOptions | UnifiedOptions | dict | None) – Optional typed backend options. Accepts a BackendOptions instance, a UnifiedOptions instance (auto-translated to the platform’s specific options with cross-platform semantics for optimize_level / error_mitigation / auto_mapping), a plain dict (treated as **kwargs), or None for platform defaults.

  • local_compile (int) –

    Local qiskit transpile pass strength.

    • 0 — no local transpile. The circuit is shipped as-authored (after IR-language validation). Use this when you have hand-tuned the circuit or want to delegate everything to the cloud transpiler.

    • 1 (default) — light qiskit transpile to the chip’s basis gates and topology when validation fails.

    • 2 / 3 — heavier qiskit optimization. Slower but yields shorter / higher-fidelity circuits.

    See docs/source/compile/compile_levels.md for details on what each level does.

  • cloud_compile (int) – Cloud-side compile request strength forwarded to the adapter. 0 disables cloud compile (e.g. OriginQAdapter receives circuit_optimize=False); any value > 0 enables it (boolean cloud APIs see True). Adapters with finer control may interpret 1/2/3 directly.

  • backend_name (str | None) – OriginQ chip name (e.g. 'WK_C180'). Optional when backend already encodes the chip as 'originq:<chip>'.

  • chip_id (str | None) – Quafu / IBM chip ID. Required for full validation on those platforms.

  • **kwargs (Any) –

    Additional backend-specific parameters passed through to the underlying adapter. Common implicit / hidden defaults:

    • skip_validation (default False): bypass the offline IR-language compatibility check. Use sparingly — most validation failures are real bugs.

    • For Quafu: chip_id, auto_mapping

    • For OriginQ: backend_name (e.g. 'WK_C180'), measurement_amend

    • For dummy: chip_characterization, noise_model, available_qubits, available_topology

Returns:

The task ID assigned by the backend.

Raises:
Return type:

str
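
The backend identifier forms accepted by the backend parameter can be made concrete with a small parser. This is a sketch only: split_backend is a hypothetical helper, not part of uniqc, and it does not model the legacy backend_name form or the lookup of cached chips.

```python
def split_backend(backend):
    """Return (provider, chip, is_dummy) for the identifier forms described above."""
    parts = backend.split(":")
    if parts[0] == "dummy":
        # Dummy identifiers are exempt from the strict check; missing parts stay None.
        provider = parts[1] if len(parts) > 1 else None
        chip = parts[2] if len(parts) > 2 else None
        return (provider, chip, True)
    if len(parts) != 2 or not all(parts):
        # Cloud backends need the canonical 'provider:chip-name' form.
        raise ValueError(f"expected 'provider:chip-name', got {backend!r}")
    return (parts[0], parts[1], False)


cloud = split_backend("originq:WK_C180")
plain_dummy = split_backend("dummy")
chip_dummy = split_backend("dummy:originq:WK_C180")

try:
    split_backend("originq")          # bare provider form
    bare_rejected = False
except ValueError:
    bare_rejected = True
```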

Example

>>> circuit = Circuit()
>>> circuit.h(0)
>>> circuit.cnot(0, 1)
>>> circuit.measure(0, 1)
>>> # Canonical form (preferred):
>>> task_id = submit_task(circuit, backend='originq:WK_C180', shots=1000)
>>> # Legacy form (still accepted, normalised internally):
>>> task_id = submit_task(circuit, backend='originq', backend_name='WK_C180', shots=1000)
>>> # Heavier local compile, no cloud-side recompile:
>>> task_id = submit_task(
...     circuit, backend='originq:WK_C180', shots=1000,
...     local_compile=3, cloud_compile=0,
... )
>>> # Local noisy simulation using chip characterization:
>>> from uniqc.backend_adapter.task.adapters.originq_adapter import OriginQAdapter
>>> chip = OriginQAdapter().get_chip_characterization("WK_C180")
>>> task_id = submit_task(circuit, backend='dummy:local:simulator', chip_characterization=chip)

uniqc.backend_adapter.task_manager.wait_for_result(task_id, timeout=300.0, poll_interval=5.0, raise_on_failure=True)

Wait for a task to complete and return its result.

This function polls the task status until it completes, fails, or the timeout is reached. The backend is auto-resolved from the cached TaskInfo — task IDs are unique, so explicit backend= is no longer needed.
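
The polling behaviour can be sketched as a generic loop over a status source. The names here are assumptions for illustration: wait_until_done and TaskFailed are hypothetical stand-ins, status values are plain strings, and raising TimeoutError on expiry is a guess, since this page does not say what happens when the timeout is reached.

```python
import time


class TaskFailed(Exception):
    """Stand-in for the library's TaskFailedError."""


def wait_until_done(fetch_status, timeout=300.0, poll_interval=5.0, raise_on_failure=True):
    """Poll fetch_status() until it reports success, failure, or the deadline passes."""
    deadline = time.monotonic() + timeout
    while True:
        status = fetch_status()
        if status == "success":
            return status
        if status == "failed":
            if raise_on_failure:
                raise TaskFailed("task failed")
            return None                   # mirrors raise_on_failure=False
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task still {status!r} after {timeout}s")  # assumption
        time.sleep(poll_interval)


# Scripted status source standing in for a backend query.
script = iter(["running", "running", "success"])
outcome = wait_until_done(lambda: next(script), timeout=1.0, poll_interval=0.0)
failed = wait_until_done(lambda: "failed", raise_on_failure=False)
```

Using time.monotonic() for the deadline keeps the loop unaffected by wall-clock adjustments during a long wait.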

Parameters:
  • task_id (str) – The task identifier.

  • timeout (float) – Maximum time to wait in seconds.

  • poll_interval (float) – Time between status checks in seconds.

  • raise_on_failure (bool) – If True, raises TaskFailedError on task failure.

Returns:

Single-circuit submissions return a UnifiedResult.

Native batch submissions (one cloud task ID covering many circuits, as produced by submit_batch() with native_batch=True) return a list[UnifiedResult] — one entry per circuit, in the order they were submitted.

Returns None if the task failed and raise_on_failure=False.

The UnifiedResult object is dict-like (result["00"], len(result), iteration, equality with a plain counts dict all work). Use UnifiedResult.raw() to access the original platform-specific payload.

Raises:

TaskFailedError – If the task fails and raise_on_failure is True.

Return type:

UnifiedResult | list[UnifiedResult] | None
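
The dict-like behaviour described for the result object can be approximated with collections.abc.Mapping. CountsResult below is a stand-in written for illustration, not uniqc's UnifiedResult; it only shows how indexing, len(), iteration, and equality with a plain counts dict can all come from three small methods.

```python
from collections.abc import Mapping


class CountsResult(Mapping):
    """Read-only counts view; a stand-in, not uniqc's UnifiedResult."""

    def __init__(self, counts, raw_payload=None):
        self.counts = dict(counts)    # unified accessor
        self._raw = raw_payload       # untouched platform payload

    def __getitem__(self, bitstring):
        return self.counts[bitstring]

    def __iter__(self):
        return iter(self.counts)

    def __len__(self):
        return len(self.counts)

    def raw(self):
        """Return the original payload exactly as it was passed in."""
        return self._raw


payload = {"result": {"00": 512, "11": 488}}
result = CountsResult(payload["result"], raw_payload=payload)
total_shots = sum(result.values())    # values() is provided by Mapping
```

Mapping supplies keys(), values(), items(), get(), and an __eq__ that compares against any other mapping, so the comparison with a plain dict works without extra code.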

Example

>>> result = wait_for_result('task-123', timeout=300)
>>> print(result.counts)        # unified accessor
{'00': 512, '11': 488}
>>> result['00']                # dict-like access still works
512
>>> result.raw()                # original adapter payload
{'result': {'00': 512, '11': 488}, ...}

Native batch result:

>>> task_id = submit_batch([c1, c2, c3], backend='originq:WK_C180')
>>> results = wait_for_result(task_id)
>>> for r in results:
...     print(r.counts)