"""OriginQ Cloud backend adapter.
Submits OriginIR circuits to the OriginQ Cloud service using pyqpanda3.
Installation:
pip install unified-quantum[originq]
"""
from __future__ import annotations
__all__ = ["OriginQAdapter"]
import time
import warnings
from typing import Any
from uniqc.backend_info import ORIGINQ_SIMULATOR_NAMES
from uniqc.task.adapters.base import (
TASK_STATUS_FAILED,
TASK_STATUS_RUNNING,
TASK_STATUS_SUCCESS,
QuantumAdapter,
)
from uniqc.task.config import load_originq_config
from uniqc.task.optional_deps import require
def _avg(values: list[float]) -> float | None:
"""Return the arithmetic mean of a list, or None if the list is empty."""
return sum(values) / len(values) if values else None
[docs]
class OriginQAdapter(QuantumAdapter):
    """Adapter for the OriginQ Cloud service, built on pyqpanda3.

    Cloud submission goes through pyqpanda3's QCloudService API, which keeps
    configuration simple: only an API key is required.

    Note:
        The pyqpanda3 package is required for this adapter.
        Install with: pip install unified-quantum[originq]
    """

    name = "originq"

    def __init__(self) -> None:
        """Read the OriginQ config and set up lazily-populated pyqpanda3 slots."""
        cfg = load_originq_config()
        self._api_key = cfg["api_key"]
        self._task_group_size = cfg.get("task_group_size", 200)
        self._available_qubits = cfg.get("available_qubits", [])
        # pyqpanda3 handles are resolved on first use by _ensure_imports().
        self._service: Any = None
        self._QCloudOptions: Any = None
        self._QCloudJob: Any = None
        self._JobStatus: Any = None
        self._DataBase: Any = None
        self._convert_originir: Any = None
        # Remembered state for the current/last submitted job.
        self._last_backend_name: str = "origin:wuyuan:d5"
        self._last_n_qubits: int | None = None
def _ensure_imports(self) -> None:
    """Import pyqpanda3 lazily; a no-op once the service handle exists."""
    if self._service is not None:
        return
    require("pyqpanda3", "originq")
    from pyqpanda3.intermediate_compiler import convert_originir_string_to_qprog
    from pyqpanda3.qcloud import DataBase, JobStatus, QCloudJob, QCloudOptions, QCloudService

    # Cache classes/functions on the instance so later calls skip the import.
    self._service = QCloudService(api_key=self._api_key)
    self._QCloudOptions = QCloudOptions
    self._QCloudJob = QCloudJob
    self._JobStatus = JobStatus
    self._DataBase = DataBase
    self._convert_originir = convert_originir_string_to_qprog
[docs]
def is_available(self) -> bool:
    """Report whether the adapter can be used (credentials configured).

    Returns:
        bool: True if a non-empty api_key is configured.
    """
    key = self._api_key
    return bool(key)
[docs]
def list_backends(self) -> list[dict[str, Any]]:
    """Return raw OriginQ Cloud backend metadata.

    For each hardware backend (non-simulator), chip_info() is queried to
    fill in qubit count, topology, fidelity, and coherence data.

    Returns:
        List of dicts with keys: ``name``, ``available``, ``num_qubits``,
        ``topology`` (list of [u, v] edge pairs), ``available_qubits``,
        ``avg_1q_fidelity``, ``avg_2q_fidelity``, ``avg_readout_fidelity``,
        ``coherence_t1``, ``coherence_t2``.
    """
    self._ensure_imports()
    backend_map: dict[str, bool] = self._service.backends()
    infos: list[dict[str, Any]] = []
    for backend_name, is_up in backend_map.items():
        info: dict[str, Any] = {"name": backend_name, "available": is_up}
        if backend_name not in ORIGINQ_SIMULATOR_NAMES:
            try:
                chip = self._service.backend(backend_name).chip_info()
                info["num_qubits"] = chip.qubits_num()
                info["topology"] = chip.get_chip_topology()
                info["available_qubits"] = chip.available_qubits()
                # Per-qubit calibration data drives the fidelity/coherence averages.
                singles = chip.single_qubit_info()
                info["avg_1q_fidelity"] = _avg([q.get_single_gate_fidelity() for q in singles])
                info["avg_readout_fidelity"] = _avg([q.get_readout_fidelity() for q in singles])
                info["coherence_t1"] = _avg([q.get_t1() for q in singles])
                info["coherence_t2"] = _avg([q.get_t2() for q in singles])
                doubles = chip.double_qubits_info()
                info["avg_2q_fidelity"] = _avg([p.get_fidelity() for p in doubles]) if doubles else None
            except Exception:  # noqa: BLE001
                # chip_info() may not be available for all backends; fall back
                # to empty metadata (overwrites any partially-filled fields).
                info.update(
                    num_qubits=0,
                    topology=[],
                    available_qubits=[],
                    avg_1q_fidelity=None,
                    avg_2q_fidelity=None,
                    avg_readout_fidelity=None,
                    coherence_t1=None,
                    coherence_t2=None,
                )
        infos.append(info)
    return infos
# -------------------------------------------------------------------------
# Circuit translation (OriginIR to QProg)
# -------------------------------------------------------------------------
[docs]
def translate_circuit(self, originir: str) -> Any:
    """Turn an OriginIR string into a pyqpanda3 QProg.

    Args:
        originir: OriginIR format circuit string.

    Returns:
        QProg object for pyqpanda3.
    """
    self._ensure_imports()
    convert = self._convert_originir
    return convert(originir)
# -------------------------------------------------------------------------
# Task submission
# -------------------------------------------------------------------------
[docs]
def submit(self, circuit: str, *, shots: int = 1000, **kwargs: Any) -> str:
    """Submit a single circuit to OriginQ Cloud.

    Args:
        circuit: OriginIR format circuit string.
        shots: Number of measurement shots.
        **kwargs: Additional options:
            - backend_name: Backend name (e.g., 'origin:wuyuan:d5')
            - circuit_optimize: Enable circuit optimization (default: True)
            - measurement_amend: Enable measurement amendment (default: False)
            - auto_mapping: Enable automatic qubit mapping (default: False)

    Returns:
        Task ID string.
    """
    self._ensure_imports()
    target = kwargs.get("backend_name", "origin:wuyuan:d5")
    backend = self._service.backend(target)
    # Cache backend name + qubit count so query() can use them later.
    self._last_backend_name = target
    self._last_n_qubits = backend.chip_info().qubits_num()
    qprog = self.translate_circuit(circuit)
    run_options = self._create_options(
        amend=kwargs.get("measurement_amend", False),
        mapping=kwargs.get("auto_mapping", False),
        optimization=kwargs.get("circuit_optimize", True),
    )
    job = backend.run(qprog, shots=shots, options=run_options)
    return job.job_id()
[docs]
def submit_batch(self, circuits: list[str], *, shots: int = 1000, **kwargs: Any) -> str | list[str]:
    """Submit circuits as a group.

    Note: pyqpanda3 handles batch submission internally; circuits are split
    into groups of at most ``task_group_size`` when needed.

    Args:
        circuits: List of OriginIR format circuit strings.
        shots: Number of measurement shots.
        **kwargs: Additional options (see submit()).

    Returns:
        Single task ID or list of task IDs if split into groups.
    """
    self._ensure_imports()
    target = kwargs.get("backend_name", "origin:wuyuan:d5")
    backend = self._service.backend(target)
    # Cache backend name + qubit count for query().
    self._last_backend_name = target
    self._last_n_qubits = backend.chip_info().qubits_num()
    run_options = self._create_options(
        amend=kwargs.get("measurement_amend", False),
        mapping=kwargs.get("auto_mapping", False),
        optimization=kwargs.get("circuit_optimize", True),
    )
    group_size = self._task_group_size
    # Everything fits in one group: one job, single task ID.
    if len(circuits) <= group_size:
        programs = [self.translate_circuit(ir) for ir in circuits]
        return backend.run(programs, shots=shots, options=run_options).job_id()
    # Otherwise split into fixed-size groups and collect one ID per group.
    ids: list[str] = []
    for start in range(0, len(circuits), group_size):
        chunk = circuits[start : start + group_size]
        programs = [self.translate_circuit(ir) for ir in chunk]
        ids.append(backend.run(programs, shots=shots, options=run_options).job_id())
    return ids
def _create_options(self, amend: bool, mapping: bool, optimization: bool) -> Any:
    """Build a QCloudOptions object from adapter flags.

    Args:
        amend: Enable measurement amendment.
        mapping: Enable automatic qubit mapping.
        optimization: Enable circuit optimization.

    Returns:
        QCloudOptions instance.
    """
    opts = self._QCloudOptions()
    for setter, flag in (
        (opts.set_amend, amend),
        (opts.set_mapping, mapping),
        (opts.set_optimization, optimization),
    ):
        setter(flag)
    return opts
# -------------------------------------------------------------------------
# Task query
# -------------------------------------------------------------------------
[docs]
def query(self, taskid: str) -> dict[str, Any]:
    """Query a single task's status.

    Args:
        taskid: Task ID to query.

    Returns:
        dict with keys: taskid, status, result (if completed)
    """
    self._ensure_imports()
    job = self._QCloudJob(taskid)
    state = job.status()
    if state == self._JobStatus.FINISHED:
        counts = job.result().get_counts()
        return {
            "taskid": taskid,
            "status": TASK_STATUS_SUCCESS,
            "result": self._format_counts(counts),
        }
    if state == self._JobStatus.FAILED:
        return {
            "taskid": taskid,
            "status": TASK_STATUS_FAILED,
            "result": {"error": "Job failed on cloud"},
        }
    # Any other state (RUNNING, QUEUING, WAITING) means still in flight.
    return {
        "taskid": taskid,
        "status": TASK_STATUS_RUNNING,
    }
[docs]
def query_batch(self, taskids: list[str]) -> dict[str, Any]:
    """Query multiple tasks and merge their results.

    Args:
        taskids: List of task IDs to query.

    Returns:
        Combined result dict with status and merged results.
    """
    merged: dict[str, Any] = {"status": TASK_STATUS_SUCCESS, "result": []}
    for tid in taskids:
        single = self.query(tid)
        status = single["status"]
        if status == TASK_STATUS_FAILED:
            # One failure fails the whole batch; stop querying.
            merged["status"] = TASK_STATUS_FAILED
            break
        if status == TASK_STATUS_RUNNING:
            merged["status"] = TASK_STATUS_RUNNING
        # Only accumulate results while everything so far has succeeded.
        if merged["status"] == TASK_STATUS_SUCCESS:
            merged["result"].extend(single.get("result", []))
    return merged
def _format_counts(self, counts: Any) -> list[dict]:
    """Convert pyqpanda3 counts into the adapter's result format.

    Args:
        counts: Counts from QCloudResult.get_counts().

    Returns:
        List of result dicts with 'key' and 'value' keys.
    """
    if isinstance(counts, dict):
        return [{"key": bits, "value": n} for bits, n in counts.items()]
    if isinstance(counts, list):
        # Batch results arrive as a list of per-circuit dicts; flatten them.
        flat: list[dict] = []
        for item in counts:
            if isinstance(item, dict):
                flat.extend({"key": bits, "value": n} for bits, n in item.items())
        return flat
    # Fallback: wrap any scalar payload as a single pseudo-count.
    return [{"key": str(counts), "value": 1}]
# -------------------------------------------------------------------------
# Synchronous wait
# -------------------------------------------------------------------------
[docs]
def query_sync(
    self,
    taskid: str | list[str],
    interval: float = 2.0,
    timeout: float = 60.0,
    retry: int = 5,
) -> list[dict[str, Any]]:
    """Poll task status until completion or timeout.

    Args:
        taskid: Task ID or list of task IDs.
        interval: Polling interval in seconds.
        timeout: Maximum wait time in seconds.
        retry: Number of retries on query failure (i.e. when the query
            itself raises, e.g. a transient network error).

    Returns:
        List of result dicts.

    Raises:
        TimeoutError: If timeout is reached.
        RuntimeError: If the task fails or the retry budget is exhausted.
    """
    taskids = [taskid] if isinstance(taskid, str) else taskid
    starttime = time.time()
    while True:
        if time.time() - starttime > timeout:
            raise TimeoutError("Reached the maximum timeout.")
        time.sleep(interval)
        # BUG FIX: the retry branch used to sit after three exhaustive status
        # checks and was unreachable, while an exception raised by the query
        # itself propagated immediately. Retries now guard the query call.
        try:
            taskinfo = self.query_batch(taskids)
        except Exception as exc:  # noqa: BLE001 - retry transient query failures
            if retry <= 0:
                raise RuntimeError("Retry count exhausted.") from exc
            retry -= 1
            warnings.warn(f"Query failed. Retry remains {retry} times.", stacklevel=2)
            continue
        status = taskinfo["status"]
        if status == TASK_STATUS_RUNNING:
            continue
        if status == TASK_STATUS_SUCCESS:
            return taskinfo.get("result", [])
        # query_batch() only yields SUCCESS/RUNNING/FAILED, so this is FAILED.
        raise RuntimeError(f"Failed to execute, errorinfo = {taskinfo.get('result')}")