# Source code for uniqc.task.adapters.originq_adapter

"""OriginQ Cloud backend adapter.

Submits OriginIR circuits to the OriginQ Cloud service using pyqpanda3.

Installation:
    pip install unified-quantum[originq]
"""

from __future__ import annotations

# Names exported by ``from ... import *``; only the adapter class is public.
__all__ = ["OriginQAdapter"]

import time
import warnings
from typing import Any

from uniqc.backend_info import ORIGINQ_SIMULATOR_NAMES
from uniqc.task.adapters.base import (
    TASK_STATUS_FAILED,
    TASK_STATUS_RUNNING,
    TASK_STATUS_SUCCESS,
    QuantumAdapter,
)
from uniqc.task.config import load_originq_config
from uniqc.task.optional_deps import require


def _avg(values: list[float]) -> float | None:
    """Return the arithmetic mean of a list, or None if the list is empty."""
    return sum(values) / len(values) if values else None


class OriginQAdapter(QuantumAdapter):
    """Adapter for OriginQ Cloud (Origin Quantum's cloud platform) using pyqpanda3.

    This adapter uses pyqpanda3's QCloudService API for cloud task submission,
    which simplifies configuration by only requiring an API key.

    Note:
        The pyqpanda3 package is required for this adapter.
        Install with: pip install unified-quantum[originq]
    """

    name = "originq"

    # Backend used whenever the caller does not pass ``backend_name``.
    _DEFAULT_BACKEND = "origin:wuyuan:d5"

    def __init__(self) -> None:
        """Read the OriginQ configuration; pyqpanda3 itself is imported lazily."""
        config = load_originq_config()
        self._api_key = config["api_key"]
        self._task_group_size = config.get("task_group_size", 200)
        self._available_qubits = config.get("available_qubits", [])

        # Lazy-loaded pyqpanda3 components (populated by _ensure_imports()).
        self._service: Any = None
        self._QCloudOptions: Any = None
        self._QCloudJob: Any = None
        self._JobStatus: Any = None
        self._DataBase: Any = None
        self._convert_originir: Any = None

        # State for the current/last submitted job.
        self._last_backend_name: str = self._DEFAULT_BACKEND
        self._last_n_qubits: int | None = None

    def _ensure_imports(self) -> None:
        """Lazily import pyqpanda3 modules and create the cloud service.

        The import happens once; ``self._service`` doubles as the
        "already initialised" flag.
        """
        if self._service is not None:
            return

        require("pyqpanda3", "originq")
        from pyqpanda3.intermediate_compiler import convert_originir_string_to_qprog
        from pyqpanda3.qcloud import DataBase, JobStatus, QCloudJob, QCloudOptions, QCloudService

        self._service = QCloudService(api_key=self._api_key)
        self._QCloudOptions = QCloudOptions
        self._QCloudJob = QCloudJob
        self._JobStatus = JobStatus
        self._DataBase = DataBase
        self._convert_originir = convert_originir_string_to_qprog

    def is_available(self) -> bool:
        """Check if the OriginQ adapter is available (credentials configured).

        Returns:
            bool: True if api_key is configured.
        """
        return bool(self._api_key)

    def list_backends(self) -> list[dict[str, Any]]:
        """Return raw OriginQ Cloud backend metadata.

        For each backend whose name is not in ``ORIGINQ_SIMULATOR_NAMES``,
        ``chip_info()`` is fetched to populate qubit count, topology, fidelity,
        and coherence data.

        Returns:
            List of dicts. Every entry has ``name`` and ``available``.
            Non-simulator entries additionally have ``num_qubits``,
            ``topology``, ``available_qubits``, ``avg_1q_fidelity``,
            ``avg_2q_fidelity``, ``avg_readout_fidelity``, ``coherence_t1``
            and ``coherence_t2``; when the chip information cannot be read
            these fall back to ``0`` / ``[]`` / ``None``.
        """
        self._ensure_imports()
        raw: dict[str, bool] = self._service.backends()

        results: list[dict[str, Any]] = []
        for name, available in raw.items():
            entry: dict[str, Any] = {"name": name, "available": available}
            if name not in ORIGINQ_SIMULATOR_NAMES:
                entry.update(self._chip_metadata(name))
            results.append(entry)
        return results

    def _chip_metadata(self, name: str) -> dict[str, Any]:
        """Collect chip metadata for one backend, or defaults if unavailable."""
        try:
            ci = self._service.backend(name).chip_info()
            metadata: dict[str, Any] = {
                "num_qubits": ci.qubits_num(),
                "topology": ci.get_chip_topology(),
                "available_qubits": ci.available_qubits(),
            }

            # Fidelity and coherence are averaged over the per-qubit records.
            sq_list = ci.single_qubit_info()
            metadata["avg_1q_fidelity"] = _avg([sq.get_single_gate_fidelity() for sq in sq_list])
            metadata["avg_readout_fidelity"] = _avg([sq.get_readout_fidelity() for sq in sq_list])
            metadata["coherence_t1"] = _avg([sq.get_t1() for sq in sq_list])
            metadata["coherence_t2"] = _avg([sq.get_t2() for sq in sq_list])

            dq_list = ci.double_qubits_info()
            metadata["avg_2q_fidelity"] = (
                _avg([dq.get_fidelity() for dq in dq_list]) if dq_list else None
            )
            return metadata
        except Exception:  # noqa: BLE001
            # chip_info() may not be available for all backends; listing is
            # best-effort, so report neutral defaults instead of failing.
            return {
                "num_qubits": 0,
                "topology": [],
                "available_qubits": [],
                "avg_1q_fidelity": None,
                "avg_2q_fidelity": None,
                "avg_readout_fidelity": None,
                "coherence_t1": None,
                "coherence_t2": None,
            }

    # -------------------------------------------------------------------------
    # Circuit translation (OriginIR to QProg)
    # -------------------------------------------------------------------------

    def translate_circuit(self, originir: str) -> Any:
        """Convert OriginIR string to QProg using pyqpanda3.

        Args:
            originir: OriginIR format circuit string.

        Returns:
            QProg object for pyqpanda3.
        """
        self._ensure_imports()
        return self._convert_originir(originir)

    # -------------------------------------------------------------------------
    # Task submission
    # -------------------------------------------------------------------------

    def _select_backend(self, backend_name: str) -> Any:
        """Fetch a backend handle and cache its name and qubit count.

        The qubit count is only cached metadata, so a failing ``chip_info()``
        lookup stores ``None`` rather than aborting the submission.
        """
        backend = self._service.backend(backend_name)
        self._last_backend_name = backend_name
        try:
            self._last_n_qubits = backend.chip_info().qubits_num()
        except Exception:  # noqa: BLE001
            # chip_info() may not be available for all backends.
            self._last_n_qubits = None
        return backend

    def _prepare_submission(self, kwargs: dict[str, Any]) -> tuple[Any, Any]:
        """Resolve the backend and QCloudOptions from submit-style kwargs."""
        backend = self._select_backend(kwargs.get("backend_name", self._DEFAULT_BACKEND))
        options = self._create_options(
            amend=kwargs.get("measurement_amend", False),
            mapping=kwargs.get("auto_mapping", False),
            optimization=kwargs.get("circuit_optimize", True),
        )
        return backend, options

    def submit(self, circuit: str, *, shots: int = 1000, **kwargs: Any) -> str:
        """Submit a single circuit to OriginQ Cloud.

        Args:
            circuit: OriginIR format circuit string.
            shots: Number of measurement shots.
            **kwargs: Additional options:
                - backend_name: Backend name (e.g., 'origin:wuyuan:d5')
                - circuit_optimize: Enable circuit optimization (default: True)
                - measurement_amend: Enable measurement amendment (default: False)
                - auto_mapping: Enable automatic qubit mapping (default: False)

        Returns:
            Task ID string.
        """
        self._ensure_imports()
        backend, options = self._prepare_submission(kwargs)

        # Convert OriginIR to QProg, then hand it to the cloud backend.
        qprog = self.translate_circuit(circuit)
        job = backend.run(qprog, shots=shots, options=options)
        return job.job_id()

    def submit_batch(self, circuits: list[str], *, shots: int = 1000, **kwargs: Any) -> str | list[str]:
        """Submit circuits as a group.

        Circuits are sent in one job when they fit within the configured
        ``task_group_size``; otherwise they are split into consecutive groups
        of at most that size and each group is submitted as its own job.

        Args:
            circuits: List of OriginIR format circuit strings.
            shots: Number of measurement shots.
            **kwargs: Additional options (see submit()).

        Returns:
            Single task ID or list of task IDs if split into groups.
        """
        self._ensure_imports()
        backend, options = self._prepare_submission(kwargs)

        group_size = self._task_group_size
        fits_in_one_group = len(circuits) <= group_size
        if fits_in_one_group:
            groups = [circuits]
        else:
            groups = [circuits[i : i + group_size] for i in range(0, len(circuits), group_size)]

        task_ids: list[str] = []
        for group in groups:
            qprogs = [self.translate_circuit(c) for c in group]
            job = backend.run(qprogs, shots=shots, options=options)
            task_ids.append(job.job_id())

        # A single group keeps the historical "bare string" return value.
        return task_ids[0] if fits_in_one_group else task_ids

    def _create_options(self, amend: bool, mapping: bool, optimization: bool) -> Any:
        """Create QCloudOptions from adapter parameters.

        Args:
            amend: Enable measurement amendment.
            mapping: Enable automatic qubit mapping.
            optimization: Enable circuit optimization.

        Returns:
            QCloudOptions instance.
        """
        options = self._QCloudOptions()
        options.set_amend(amend)
        options.set_mapping(mapping)
        options.set_optimization(optimization)
        return options

    # -------------------------------------------------------------------------
    # Task query
    # -------------------------------------------------------------------------

    def query(self, taskid: str) -> dict[str, Any]:
        """Query a single task's status.

        Args:
            taskid: Task ID to query.

        Returns:
            dict with keys: taskid, status, result (if completed or failed).
        """
        self._ensure_imports()
        job = self._QCloudJob(taskid)
        status = job.status()

        if status == self._JobStatus.FINISHED:
            counts = job.result().get_counts()
            return {
                "taskid": taskid,
                "status": TASK_STATUS_SUCCESS,
                "result": self._format_counts(counts),
            }
        if status == self._JobStatus.FAILED:
            return {
                "taskid": taskid,
                "status": TASK_STATUS_FAILED,
                "result": {"error": "Job failed on cloud"},
            }
        # Any other job status is reported as still running.
        return {
            "taskid": taskid,
            "status": TASK_STATUS_RUNNING,
        }

    def query_batch(self, taskids: list[str]) -> dict[str, Any]:
        """Query multiple tasks and merge results.

        Tasks are queried in order. Results are accumulated only while every
        task seen so far has succeeded; the first failed task stops the scan.

        Args:
            taskids: List of task IDs to query.

        Returns:
            Dict with ``status`` and ``result`` (merged list of result entries).
            When a task failed, an additional ``error`` key holds the failing
            ``taskid`` and the ``detail`` reported by query().
        """
        taskinfo: dict[str, Any] = {"status": TASK_STATUS_SUCCESS, "result": []}
        for taskid in taskids:
            result_i = self.query(taskid)
            status = result_i["status"]

            if status == TASK_STATUS_FAILED:
                taskinfo["status"] = TASK_STATUS_FAILED
                # Keep the failure detail; "result" only holds earlier successes.
                taskinfo["error"] = {"taskid": taskid, "detail": result_i.get("result")}
                break

            if status == TASK_STATUS_RUNNING:
                taskinfo["status"] = TASK_STATUS_RUNNING
            elif taskinfo["status"] == TASK_STATUS_SUCCESS:
                taskinfo["result"].extend(result_i.get("result", []))
        return taskinfo

    def _format_counts(self, counts: Any) -> list[dict]:
        """Format pyqpanda3 counts to adapter result format.

        Args:
            counts: Counts from QCloudResult.get_counts().

        Returns:
            List of result dicts with 'key' and 'value' keys. A list of count
            dicts (batch results) is flattened into one list; non-dict items
            in such a list are skipped.
        """
        if isinstance(counts, dict):
            return [{"key": k, "value": v} for k, v in counts.items()]
        if isinstance(counts, list):
            return [
                {"key": k, "value": v}
                for c in counts
                if isinstance(c, dict)
                for k, v in c.items()
            ]
        # Unknown shape: report the value itself as a single observation.
        return [{"key": str(counts), "value": 1}]

    # -------------------------------------------------------------------------
    # Synchronous wait
    # -------------------------------------------------------------------------

    def query_sync(
        self,
        taskid: str | list[str],
        interval: float = 2.0,
        timeout: float = 60.0,
        retry: int = 5,
    ) -> list[dict[str, Any]]:
        """Poll task status until completion or timeout.

        The first poll happens immediately; the method sleeps ``interval``
        seconds only between polls.

        Args:
            taskid: Task ID or list of task IDs.
            interval: Polling interval in seconds.
            timeout: Maximum wait time in seconds.
            retry: Number of retries on query failure (an exception raised by
                the query, or an unrecognised status).

        Returns:
            List of result dicts.

        Raises:
            TimeoutError: If timeout is reached.
            RuntimeError: If task fails or retry exhausted.
        """
        taskids = [taskid] if isinstance(taskid, str) else taskid

        # Surface a missing pyqpanda3 installation right away instead of
        # burning the retry budget on it.
        self._ensure_imports()

        # monotonic() is immune to wall-clock adjustments during the wait.
        deadline = time.monotonic() + timeout
        while True:
            query_error: Exception | None = None
            status: Any = None
            try:
                taskinfo = self.query_batch(taskids)
            except Exception as exc:  # noqa: BLE001
                query_error = exc
            else:
                status = taskinfo["status"]
                if status == TASK_STATUS_SUCCESS:
                    return taskinfo.get("result", [])
                if status == TASK_STATUS_FAILED:
                    errorinfo = taskinfo.get("error", taskinfo.get("result"))
                    raise RuntimeError(f"Failed to execute, errorinfo = {errorinfo}")

            if status != TASK_STATUS_RUNNING:
                # The query raised or returned an unrecognised status.
                if retry <= 0:
                    raise RuntimeError("Retry count exhausted.") from query_error
                retry -= 1
                warnings.warn(f"Query failed. Retry remains {retry} times.", stacklevel=2)

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError("Reached the maximum timeout.")
            # Never sleep past the deadline.
            time.sleep(min(interval, remaining))