uniqc.backend_adapter.task_manager module
Task management with local caching for quantum computing backends.
This module provides a unified interface for submitting quantum tasks,
managing task lifecycle, and caching results locally. All persistent
storage is delegated to uniqc.backend_adapter.task.store.TaskStore (SQLite at
~/.uniqc/cache/tasks.sqlite).
Usage:
from uniqc.backend_adapter.task_manager import submit_task, query_task, wait_for_result, dry_run_task
from uniqc.circuit_builder import Circuit
# Create a circuit
circuit = Circuit()
circuit.h(0)
circuit.cnot(0, 1)
circuit.measure(0, 1)
# Dry-run: validate circuit offline before submitting
result = dry_run_task(circuit, backend='quafu:ScQ-P18', shots=1000)
if not result.success:
    print(f"Validation failed: {result.error}")
# Submit task
task_id = submit_task(circuit, backend='quafu:ScQ-P18', shots=1000)
# Wait for result (backend is auto-resolved from the cached TaskInfo)
result = wait_for_result(task_id, timeout=300)
# Query task status
info = query_task(task_id)
print(info['status']) # 'running', 'success', or 'failed'
# Use dummy backend for local simulation
task_id = submit_task(circuit, backend='dummy:local:simulator', shots=1000)
Note
Any dry-run success followed by actual submission failure is a critical bug. Please report it at the UnifiedQuantum issue tracker.
- class uniqc.backend_adapter.task_manager.TaskManager(cache_dir=None)
Bases: object
High-level task manager for quantum computing workflows.
This class provides a convenient interface for managing quantum tasks with persistent caching and batch operations.
Example
>>> manager = TaskManager()
>>> task_id = manager.submit(circuit, backend='quafu:ScQ-P18', shots=1000)
>>> result = manager.wait_for_result(task_id)
>>> print(result)
- Parameters:
cache_dir (Path | str | None)
- wait_for_result(task_id, timeout=300.0, poll_interval=5.0, raise_on_failure=True)
Wait for a task to complete. See wait_for_result().
- Parameters:
- Return type:
UnifiedResult | list[UnifiedResult] | None
- uniqc.backend_adapter.task_manager.clear_cache(cache_dir=None)
Clear all tasks from the cache by deleting the SQLite file.
- Parameters:
cache_dir (Path | None) – Optional custom cache directory.
- Return type:
None
- uniqc.backend_adapter.task_manager.clear_completed_tasks(cache_dir=None, status=None)
Remove completed tasks from the cache.
- Parameters:
- Returns:
Number of tasks removed.
- Return type:
- uniqc.backend_adapter.task_manager.dry_run_batch(circuits, backend, shots=1000, **kwargs)
Validate multiple circuits against a backend without making network calls.
Runs dry_run_task() on each circuit in sequence and returns a list of results, one per circuit in input order.
- Parameters:
- Returns:
List of DryRunResult, one per circuit in input order.
- Return type:
Note
Any dry-run success followed by actual submission failure is a critical bug. Please report it at the UnifiedQuantum issue tracker.
- uniqc.backend_adapter.task_manager.dry_run_task(circuit, backend, shots=1000, **kwargs)
Validate a circuit against a backend without making network calls.
This performs a dry-run validation that checks:
1. The circuit can be successfully translated to the platform’s native format.
2. The resulting native circuit object is structurally valid.
3. The qubit count fits within the backend’s limits (where determinable offline).
4. Gate basis compatibility is confirmed (where determinable offline).
This function makes NO cloud API calls.
- Parameters:
circuit (Circuit) – The UnifiedQuantum Circuit to validate.
backend (str) – The backend identifier. Accepts the same forms as submit_task(), including '<platform>:<chip>' (e.g. 'originq:WK_C180', 'dummy:originq:WK_C180').
shots (int) – Number of measurement shots for validation.
**kwargs (Any) – Additional backend-specific parameters.
- For IBM: chip_id (required for full validation)
- For Quafu: chip_id (required for full validation)
- For OriginQ: backend_name (e.g. 'WK_C180'). When backend already contains the chip suffix ('originq:WK_C180'), the chip is extracted automatically and forwarded as backend_name.
- Returns:
DryRunResult indicating success or failure with details and warnings.
- Return type:
Example
>>> from uniqc.circuit_builder import Circuit
>>> circuit = Circuit()
>>> circuit.h(0)
>>> circuit.measure(0)
>>> result = dry_run_task(circuit, backend='quafu:ScQ-P18', shots=1000)
>>> if result.success:
...     print("Circuit is valid for submission")
... else:
...     print(f"Validation failed: {result.error}")
Note
Any dry-run success followed by actual submission failure is a critical bug. Please report it at the UnifiedQuantum issue tracker.
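The validate-then-submit workflow can be sketched as a small gate. This is a minimal sketch and not part of uniqc: `submit_if_valid` is a hypothetical helper, and its `dry_run` and `submit` arguments are stand-ins for `dry_run_task()` and `submit_task()` so the block runs without the library or network access. It relies only on the `success` and `error` attributes that the examples above read from the dry-run result.

```python
from types import SimpleNamespace
from typing import Any, Callable, Optional


def submit_if_valid(
    circuit: Any,
    backend: str,
    dry_run: Callable[..., Any],
    submit: Callable[..., str],
    shots: int = 1000,
) -> Optional[str]:
    """Hypothetical helper: submit only when the offline dry-run passes."""
    check = dry_run(circuit, backend=backend, shots=shots)
    if not check.success:
        # Validation failed offline, so nothing is sent to the backend.
        print(f"Validation failed: {check.error}")
        return None
    return submit(circuit, backend=backend, shots=shots)


# Stand-ins for dry_run_task() and submit_task(); assumptions for this demo.
def fake_dry_run(circuit, backend, shots):
    ok = circuit != "bad-circuit"
    return SimpleNamespace(success=ok, error=None if ok else "unsupported gate")


def fake_submit(circuit, backend, shots):
    return "uqt_demo"


accepted = submit_if_valid("good-circuit", "dummy", fake_dry_run, fake_submit)
rejected = submit_if_valid("bad-circuit", "dummy", fake_dry_run, fake_submit)
```

In real code the two stand-ins would presumably be replaced by `dry_run_task` and `submit_task` themselves, since the sketch only assumes their documented keyword arguments.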
- uniqc.backend_adapter.task_manager.get_platform_task_ids(task_id)
Return the shard mapping for a uniqc task id.
Each TaskShard records:
- platform_task_id: the id assigned by the cloud platform
- shard_index: 0-based ordering within the parent
- circuit_count: number of circuits packed into this shard
- sub_index_offset: index of this shard’s first circuit in the original submit_batch() list
- status / error_message: per-shard liveness
- Parameters:
task_id (str) – A uniqc task id (uqt_*). For backwards compatibility this will also accept a platform task id and emit a DeprecationWarning while resolving the parent.
- Returns:
Shards in submission order. Empty list when the task has no shards yet (e.g. submission failed before any shard was persisted) or when task_id is unknown.
- Return type:
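To illustrate how `sub_index_offset` and `circuit_count` relate a position in the original batch to a shard, here is a minimal sketch. `ShardRow` and `locate_circuit` are hypothetical names introduced for this example; `ShardRow` only mirrors three of the documented `TaskShard` fields and is not the library's class. The sketch also assumes shards cover contiguous, non-overlapping index ranges.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class ShardRow:
    """Stand-in carrying three of the documented TaskShard fields."""
    shard_index: int
    circuit_count: int
    sub_index_offset: int


def locate_circuit(shards: List[ShardRow], circuit_index: int) -> Tuple[int, int]:
    """Return (shard_index, position inside that shard) for a batch index."""
    for shard in shards:
        start = shard.sub_index_offset
        if start <= circuit_index < start + shard.circuit_count:
            return shard.shard_index, circuit_index - start
    raise IndexError(f"circuit {circuit_index} is not covered by any shard")


# Two shards covering a 400-circuit batch, 200 circuits each.
shards = [ShardRow(0, 200, 0), ShardRow(1, 200, 200)]
first = locate_circuit(shards, 0)
boundary = locate_circuit(shards, 200)
last = locate_circuit(shards, 399)
```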
- uniqc.backend_adapter.task_manager.get_result(task_id, timeout=300.0, poll_interval=5.0, raise_on_failure=True)
Blocking retrieval: wait until task completes or timeout.
This is a convenience alias for wait_for_result(). The name get_result emphasises the “I want the answer” pattern, while wait_for_result emphasises the blocking behaviour.
- Parameters:
- Returns:
UnifiedResult for single-circuit tasks, or list[UnifiedResult] for native batch submissions. Returns None if the task failed and raise_on_failure is False.
- Raises:
TaskTimeoutError – If timeout is exceeded.
TaskFailedError – If the task fails and raise_on_failure is True.
- Return type:
UnifiedResult | list[UnifiedResult] | None
- uniqc.backend_adapter.task_manager.get_task(task_id, cache_dir=None)
Get a task from the local cache.
- uniqc.backend_adapter.task_manager.list_tasks(status=None, backend=None, cache_dir=None, *, limit=None, offset=None)
List tasks from the local cache.
- Parameters:
status (str | None) – Filter by status (optional).
backend (str | None) – Filter by backend (optional).
cache_dir (Path | None) – Optional custom cache directory.
limit (int | None) – If set, push a SQL LIMIT to the underlying store so very large caches (thousands of tasks, gigabytes of result blobs) do not have to be fully materialised in memory before being sliced.
offset (int | None) – SQL OFFSET for paginated reads. Implies limit.
- Returns:
List of TaskInfo objects matching the filters, newest first.
- Return type:
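The idea of pushing `limit` and `offset` into SQL, so that the slicing happens inside SQLite, can be sketched with the standard `sqlite3` module. The `tasks` table, its two columns, and the `list_page` helper are assumptions made for this illustration; the real TaskStore schema and query are not shown on this page.

```python
import sqlite3

# Hypothetical two-column table; the real TaskStore schema is not shown here.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (task_id TEXT PRIMARY KEY, submitted_at INTEGER)")
conn.executemany(
    "INSERT INTO tasks VALUES (?, ?)",
    [(f"uqt_{i:03d}", i) for i in range(10)],
)


def list_page(limit=None, offset=None):
    """Return task ids newest first, slicing inside SQLite."""
    sql = "SELECT task_id FROM tasks ORDER BY submitted_at DESC"
    params = []
    if limit is not None or offset is not None:
        # SQLite only accepts OFFSET together with LIMIT; -1 means "no limit".
        sql += " LIMIT ? OFFSET ?"
        params = [-1 if limit is None else limit, offset or 0]
    return [row[0] for row in conn.execute(sql, params)]


page_one = list_page(limit=3)
page_two = list_page(limit=3, offset=3)
tail = list_page(offset=8)
```

Because the database applies the slice, only the requested rows are read into Python, which is the benefit the `limit` parameter describes for large caches.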
- uniqc.backend_adapter.task_manager.poll_result(task_id)
Non-blocking status check: return current task status/result without waiting.
Unlike wait_for_result(), this returns immediately with the latest cached status. Call it in a loop if you want to poll without blocking.
- Parameters:
task_id (str) – The task identifier (uqt_* or platform id).
- Returns:
TaskInfo with current status. Check .status for TaskStatus.SUCCESS, TaskStatus.FAILED, TaskStatus.RUNNING, etc. If the task has completed, .result will be populated.
- Return type:
Example
>>> import time
>>> task_id = submit_task(circuit, backend='originq:WK_C180')
>>> while True:
...     info = poll_result(task_id)
...     if info.status in (TaskStatus.SUCCESS, TaskStatus.FAILED):
...         break
...     time.sleep(2)
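A polling loop like the one in the example runs forever if the task never reaches a terminal state. The following is a minimal sketch of the same loop with a deadline. `poll_until_done` is a hypothetical helper, the `fetch_status` callable stands in for something like `lambda: poll_result(task_id).status`, and plain strings are used in place of `TaskStatus` members so the block runs on its own.

```python
import time
from typing import Callable, Optional

TERMINAL = {"success", "failed"}  # assumed terminal status values


def poll_until_done(
    fetch_status: Callable[[], str],
    timeout: float = 300.0,
    poll_interval: float = 5.0,
) -> Optional[str]:
    """Poll until a terminal status is seen; return None on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        status = fetch_status()
        if status in TERMINAL:
            return status
        if time.monotonic() + poll_interval > deadline:
            return None  # the next sleep would overrun the deadline
        time.sleep(poll_interval)


# Stand-in status source: reports success on the third poll.
replies = iter(["running", "running", "success"])
outcome = poll_until_done(lambda: next(replies), timeout=1.0, poll_interval=0.01)

# A task that never finishes hits the deadline instead of looping forever.
timed_out = poll_until_done(lambda: "running", timeout=0.05, poll_interval=0.01)
```

`time.monotonic()` is used for the deadline so that wall-clock adjustments do not shorten or extend the wait.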
- uniqc.backend_adapter.task_manager.query_task(task_id, backend=None)
Query the status of a task.
For uniqc-managed task ids, refreshes each shard’s status from its backend (best-effort), then denormalises the aggregate status onto the parent tasks row. The returned TaskInfo therefore reflects the freshest known state.
For legacy platform task ids (pre-v4 cache rows or rows without shards), falls back to the historical direct-query path.
- Parameters:
- Returns:
TaskInfo with current aggregated status.
- Return type:
- uniqc.backend_adapter.task_manager.save_task(task_info, cache_dir=None)
Save a task to the local cache.
- uniqc.backend_adapter.task_manager.submit_batch(circuits, backend, shots=1000, options=None, *, local_compile=1, cloud_compile=1, backend_name=None, chip_id=None, native_batch=True, return_platform_ids=False, **kwargs)
Submit multiple circuits as a batch and return a single uniqc task id.
uniqc maintains an internal mapping from one uqt_* task id to one or more platform-issued task ids. The user manages exactly one handle, regardless of the underlying platform’s batch capabilities:
- For platforms with native batch (OriginQ, IBM): circuits are packed into one platform job per shard. uniqc auto-shards if the batch exceeds the adapter’s max_native_batch_size (e.g. OriginQ task_group_size 200, IBM 100).
- For platforms without native batch (Quafu, Quark, Dummy): max_native_batch_size = 1, so uniqc loops one platform job per circuit, but the user still receives a single uqt_* id and wait_for_result() returns the per-circuit results in submission order.
- Parameters:
circuits (list[Circuit | str]) – List of UnifiedQuantum Circuits to submit.
backend (str) – The backend name.
shots (int) – Number of measurement shots per circuit.
options (BackendOptions | UnifiedOptions | dict | None) – Optional typed backend options. Same as in submit_task().
local_compile (int) – Local qiskit transpile pass strength. See submit_task() for the full semantics. Default 1.
cloud_compile (int) – Cloud-side compile request strength. See submit_task(). Default 1.
backend_name (str | None) – OriginQ chip name (optional when backend already encodes the chip).
chip_id (str | None) – Quafu / IBM chip ID.
native_batch (bool) – When True (default), shards use the platform’s native grouped-submission API (one platform job per shard). When False, every circuit is submitted as a separate platform job (one shard per circuit). Useful when you need to retry/cancel individual circuits but do not need per-circuit task ids returned.
return_platform_ids (bool) – When True, returns the list of platform-issued task ids (one per shard, NOT one per circuit). Provided only for legacy code paths and debugging; new code should always use the single returned uqt_* id and call get_platform_task_ids() if it needs the mapping.
**kwargs (Any) – Additional backend-specific parameters.
- Returns:
The uniqc task id uqt_* for this batch, or a list of platform task ids when return_platform_ids=True.
- Return type:
Example
>>> circuits = [build(i) for i in range(400)]
>>> uid = submit_batch(circuits, 'originq', shots=1000,
...                    backend_name='WK_C180')
>>> # `uid` is one ``uqt_*`` covering up to ``ceil(400/200) = 2`` shards
>>> results = wait_for_result(uid)  # list[UnifiedResult] len=400
>>> shards = get_platform_task_ids(uid)  # 2 shard rows
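The shard arithmetic behind that example can be sketched as follows. `plan_shards` is a hypothetical function written for this illustration. It assumes circuits are packed into contiguous chunks in submission order, which matches the documented `sub_index_offset` and `circuit_count` fields but is not confirmed as the library's actual algorithm.

```python
from typing import List, Tuple


def plan_shards(n_circuits: int, max_native_batch_size: int) -> List[Tuple[int, int, int]]:
    """Return (shard_index, sub_index_offset, circuit_count) per shard."""
    if max_native_batch_size < 1:
        raise ValueError("max_native_batch_size must be at least 1")
    plan = []
    for shard_index, offset in enumerate(range(0, n_circuits, max_native_batch_size)):
        # The last shard may be only partially filled.
        count = min(max_native_batch_size, n_circuits - offset)
        plan.append((shard_index, offset, count))
    return plan


native = plan_shards(400, 200)  # batch size 200, as in the OriginQ example
uneven = plan_shards(450, 200)  # last shard holds the remaining 50 circuits
looped = plan_shards(3, 1)      # no native batch: one shard per circuit
```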
- uniqc.backend_adapter.task_manager.submit_task(circuit, backend, shots=1000, metadata=None, options=None, *, local_compile=1, cloud_compile=1, backend_name=None, chip_id=None, **kwargs)
Submit a single circuit to a quantum backend.
This function performs IR-language validation, optionally rewrites the circuit through a local qiskit transpile pass, then ships the result to the backend’s native API.
- Parameters:
circuit (Circuit | str) – The UnifiedQuantum Circuit to submit.
backend (str) – Backend identifier in the canonical 'provider:chip-name' format (e.g. 'originq:WK_C180', 'quafu:ScQ-P10', 'ibm:ibm_brisbane'). Cloud submissions reject the bare 'provider' form (e.g. 'originq') and surface the list of cached chips for that provider; call uniqc.list_backends() or run uniqc backend list -p originq to discover available chips. Dummy backends use 'dummy' or 'dummy:<provider>:<chip>' and are exempt from the strict format check.
shots (int) – Number of measurement shots.
metadata (dict | None) – Optional metadata to store with the task.
options (BackendOptions | UnifiedOptions | dict | None) – Optional typed backend options. Accepts a BackendOptions instance, a UnifiedOptions instance (auto-translated to the platform’s specific options with cross-platform semantics for optimize_level / error_mitigation / auto_mapping), a plain dict (treated as **kwargs), or None for platform defaults.
local_compile (int) – Local qiskit transpile pass strength.
- 0: no local transpile. The circuit is shipped as-authored (after IR-language validation). Use this when you have hand-tuned the circuit or want to delegate everything to the cloud transpiler.
- 1 (default): light qiskit transpile to the chip’s basis gates and topology when validation fails.
- 2 / 3: heavier qiskit optimization. Slower but yields shorter / higher-fidelity circuits.
See docs/source/compile/compile_levels.md for details on what each level does.
cloud_compile (int) – Cloud-side compile request strength forwarded to the adapter. 0 disables cloud compile (e.g. OriginQAdapter receives circuit_optimize=False); any value > 0 enables it (boolean cloud APIs see True). Adapters with finer control may interpret 1 / 2 / 3 directly.
backend_name (str | None) – OriginQ chip name (e.g. 'WK_C180'). Optional when backend already encodes the chip as 'originq:<chip>'.
chip_id (str | None) – Quafu / IBM chip ID. Required for full validation on those platforms.
**kwargs (Any) – Additional backend-specific parameters passed through to the underlying adapter. Common implicit / hidden defaults:
- skip_validation (default False): bypass the offline IR-language compatibility check. Use sparingly, since most validation failures are real bugs.
- For Quafu: chip_id, auto_mapping
- For OriginQ: backend_name (e.g. 'WK_C180'), measurement_amend
- For dummy: chip_characterization, noise_model, available_qubits, available_topology
- Returns:
The task ID assigned by the backend.
- Raises:
BackendNotFoundError – If the backend identifier is missing a chip name (bare 'provider') or is otherwise unrecognised.
BackendNotAvailableError – If the backend is not available.
UnsupportedGateError – If the circuit uses gates that cannot be expressed in the backend’s IR language and cannot be auto-decomposed (hard block: most OriginIR-native gates such as RPhi / PHASE2Q / UU15 are auto-rewritten via uniqc.compile.decompose before this check; only controlled wrappings or unrecognised gates fall through).
AuthenticationError – If authentication fails.
InsufficientCreditsError – If account has insufficient credits.
QuotaExceededError – If usage quota is exceeded.
NetworkError – If a network error occurs.
- Return type:
Example
>>> circuit = Circuit()
>>> circuit.h(0)
>>> circuit.cnot(0, 1)
>>> circuit.measure(0, 1)
>>> # Canonical form (preferred):
>>> task_id = submit_task(circuit, backend='originq:WK_C180', shots=1000)
>>> # Legacy form (still accepted, normalised internally):
>>> task_id = submit_task(circuit, backend='originq', backend_name='WK_C180', shots=1000)
>>> # Heavier local compile, no cloud-side recompile:
>>> task_id = submit_task(
...     circuit, backend='originq:WK_C180', shots=1000,
...     local_compile=3, cloud_compile=0,
... )
>>> # Local noisy simulation using chip characterization:
>>> from uniqc.backend_adapter.task.adapters.originq_adapter import OriginQAdapter
>>> chip = OriginQAdapter().get_chip_characterization("WK_C180")
>>> task_id = submit_task(circuit, backend='dummy:local:simulator', chip_characterization=chip)
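The canonical `'provider:chip-name'` rule for the `backend` argument can be illustrated with a small parser. This is a sketch of the documented format check only, not uniqc's implementation: `split_backend` is a hypothetical name, `ValueError` stands in for `BackendNotFoundError`, and the legacy `backend_name` fallback is deliberately left out.

```python
from typing import Optional, Tuple


def split_backend(backend: str) -> Tuple[str, Optional[str]]:
    """Split 'provider:chip-name' into (provider, chip)."""
    provider, _, chip = backend.partition(":")
    if provider == "dummy":
        # Dummy backends are exempt: 'dummy' and 'dummy:<provider>:<chip>' both pass.
        return provider, chip or None
    if not provider or not chip:
        # The library raises BackendNotFoundError here; ValueError is a stand-in.
        raise ValueError(f"expected 'provider:chip-name', got {backend!r}")
    return provider, chip


canonical = split_backend("originq:WK_C180")
dummy_bare = split_backend("dummy")
dummy_chip = split_backend("dummy:originq:WK_C180")
```

`str.partition` splits on the first colon only, so the remainder of a `'dummy:<provider>:<chip>'` identifier is kept intact as the second element.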
- uniqc.backend_adapter.task_manager.wait_for_result(task_id, timeout=300.0, poll_interval=5.0, raise_on_failure=True)
Wait for a task to complete and return its result.
This function polls the task status until it completes, fails, or the timeout is reached. The backend is auto-resolved from the cached TaskInfo: task IDs are unique, so an explicit backend= is no longer needed.
- Parameters:
- Returns:
Single-circuit submissions return a UnifiedResult.
Native batch submissions (one cloud task ID covering many circuits, as produced by submit_batch() with native_batch=True) return a list[UnifiedResult], one entry per circuit, in the order they were submitted.
Returns None if the task failed and raise_on_failure=False.
The UnifiedResult object is dict-like (result["00"], len(result), iteration, equality with a plain counts dict all work). Use UnifiedResult.raw() to access the original platform-specific payload.
- Raises:
TaskTimeoutError – If the timeout is reached before completion.
TaskFailedError – If the task fails and raise_on_failure is True.
TaskNotFoundError – If the task is not found.
NetworkError – If a network error occurs.
- Return type:
UnifiedResult | list[UnifiedResult] | None
Example
>>> result = wait_for_result('task-123', timeout=300)
>>> print(result.counts)  # unified accessor
{'00': 512, '11': 488}
>>> result['00']  # dict-like access still works
512
>>> result.raw()  # original adapter payload
{'result': {'00': 512, '11': 488}, ...}
Native batch result:
>>> task_id = submit_batch([c1, c2, c3], backend='originq:WK_C180')
>>> results = wait_for_result(task_id)
>>> for r in results:
...     print(r.counts)
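Because the return value can be a single result, a list, or None, calling code often has to branch on its shape. One way to handle all three uniformly is sketched below. `as_result_list` is a hypothetical helper, not part of uniqc, and plain dicts stand in for `UnifiedResult` objects so the block runs without the library.

```python
from typing import Any, List


def as_result_list(result: Any) -> List[Any]:
    """Normalise a wait_for_result() return value to a list."""
    if result is None:
        return []        # failed task with raise_on_failure=False
    if isinstance(result, list):
        return result    # batch submission: one entry per circuit
    return [result]      # single-circuit submission


# Plain dicts stand in for UnifiedResult objects in this demo.
single = as_result_list({"00": 512, "11": 488})
batch = as_result_list([{"0": 1000}, {"1": 1000}])
failed = as_result_list(None)
```

With this shape, a loop such as `for r in as_result_list(wait_for_result(task_id, raise_on_failure=False))` would cover single, batch, and failed tasks without separate branches.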