"""OriginQ Cloud backend adapter.
Submits OriginIR circuits to the OriginQ Cloud service using pyqpanda3.
Installation:
pip install unified-quantum[originq]
"""
from __future__ import annotations
__all__ = ["OriginQAdapter"]
import time
import warnings
from typing import Any
from uniqc.backend_adapter.backend_info import ORIGINQ_SIMULATOR_NAMES
from uniqc.backend_adapter.task.adapters.base import (
TASK_STATUS_FAILED,
TASK_STATUS_RUNNING,
TASK_STATUS_SUCCESS,
DryRunResult,
QuantumAdapter,
)
from uniqc.backend_adapter.task.optional_deps import require
from uniqc.config import load_originq_config
from uniqc.exceptions import BackendNotAvailableError
def _avg(values: list[float]) -> float | None:
"""Return the arithmetic mean of a list, or None if the list is empty."""
return sum(values) / len(values) if values else None
# OriginQ's pyqpanda3 OriginIR parser does not currently accept ``SX`` /
# ``SX.dagger`` tokens even though the platform's basis gate set advertises
# SX. Rewrite them to the equivalent ``RX(±π/2)`` form before submission.
#
# SX = RX( π/2) (up to a global phase, irrelevant for sampling)
# SX.dagger = RX(-π/2)
import re as _re
_SX_PI_OVER_2 = "1.5707963267948966"
_NEG_SX_PI_OVER_2 = "-1.5707963267948966"
_SX_DAGGER_RE = _re.compile(r"^(\s*)SX\s+(q\[\d+\])\s*\.\s*dagger\s*$", _re.IGNORECASE)
_SX_RE = _re.compile(r"^(\s*)SX\s+(q\[\d+\])\s*$", _re.IGNORECASE)
def _rewrite_sx_to_rx(originir: str) -> str:
"""Rewrite ``SX q[i]`` → ``RX q[i],(π/2)`` and ``SX q[i].dagger`` → ``RX q[i],(-π/2)``.
OriginQ's remote pyqpanda3 parser rejects the bare ``SX`` token even
though the platform basis gate set lists SX. Substituting with the
equivalent ``RX(±π/2)`` form keeps the transpiled circuit compatible
with the cloud parser.
"""
if "SX" not in originir:
return originir
out_lines = []
for line in originir.splitlines():
m = _SX_DAGGER_RE.match(line)
if m:
out_lines.append(f"{m.group(1)}RX {m.group(2)},({_NEG_SX_PI_OVER_2})")
continue
m = _SX_RE.match(line)
if m:
out_lines.append(f"{m.group(1)}RX {m.group(2)},({_SX_PI_OVER_2})")
continue
out_lines.append(line)
return "\n".join(out_lines) + ("\n" if originir.endswith("\n") else "")
[docs]
class OriginQAdapter(QuantumAdapter):
    """Adapter for OriginQ Cloud (Origin Quantum Cloud) using pyqpanda3.

    Jobs are submitted and queried through pyqpanda3's ``QCloudService`` /
    ``QCloudJob`` classes, which are imported lazily on first use. The only
    credential the adapter passes to the service is an API key.

    Note:
        The pyqpanda3 package is required for this adapter.
        Install with: pip install unified-quantum[originq]
    """

    # Adapter identifier.
    name = "originq"
@property
def max_native_batch_size(self) -> int:
"""OriginQ native batch limit, taken from ``originq.task_group_size``.
Configurable via ``~/.uniqc/uniqc.yml``. Default ``200``. uniqc
slices any user batch larger than this into multiple platform
jobs (one shard per slice) and aggregates the results
transparently behind a single ``uqt_*`` task id.
"""
# ``int(...)`` defends against config returning a string.
return int(self._task_group_size or 200)
def __init__(self, backend_name: str | None = None) -> None:
    """Initialize the OriginQ adapter from the stored OriginQ configuration.

    Args:
        backend_name: Default backend name for submit() calls that don't
            specify one. If None (or empty), defaults to "PQPUMESH8".
    """
    config = load_originq_config()
    self._api_key = config["api_key"]
    self._task_group_size = config.get("task_group_size", 200)
    self._available_qubits = config.get("available_qubits", [])

    # Lazy-loaded pyqpanda3 components (populated by _ensure_imports()).
    self._service: Any = None
    self._QCloudOptions: Any = None
    self._QCloudJob: Any = None
    self._JobStatus: Any = None
    self._DataBase: Any = None
    self._convert_originir: Any = None

    # State for the current/last submitted job
    self._last_backend_name: str = backend_name if backend_name else "PQPUMESH8"
    self._last_n_qubits: int | None = None

    # Per-instance caches. The availability cache must be created here:
    # otherwise every adapter would mutate the single class-level dict and
    # availability results would leak between instances.
    self._canonical_backend_cache: dict[str, str] = {}
    self._backend_avail_cache: dict[str, bool] = {}
    # job_id -> (number of circuits, shots) for native batch submissions.
    self._batch_job_sizes: dict[str, tuple[int, int]] = {}
def _canonical_backend_name(self, backend_name: str) -> str:
    """Resolve user-facing OriginQ backend aliases to cloud backend names.

    Resolution order:

    1. A previously cached answer for ``backend_name``.
    2. Simulator names are returned unchanged.
    3. Candidates derived from ``backend_name`` (the name itself, the name
       with ``originq:`` / ``origin:`` prefixes removed, the last
       ``:``-separated segment, and known aliases) are matched against the
       service's backend listing, first exactly and then case-insensitively.
    4. If nothing matches, the prefix-stripped name is returned.

    Args:
        backend_name: Name as supplied by the caller.

    Returns:
        The matching cloud backend name, or the prefix-stripped input when
        no match is found.
    """
    # Instances created without __init__ have no cache attribute yet.
    if not hasattr(self, "_canonical_backend_cache"):
        self._canonical_backend_cache = {}
    if backend_name in self._canonical_backend_cache:
        return self._canonical_backend_cache[backend_name]
    if backend_name in ORIGINQ_SIMULATOR_NAMES:
        return backend_name

    # Without an initialised service there is nothing to match against.
    raw_backends = (
        self._service.backends() if self._service is not None and hasattr(self._service, "backends") else {}
    )

    candidates = [backend_name]
    stripped = backend_name
    for prefix in ("originq:", "origin:"):
        if stripped.startswith(prefix):
            stripped = stripped[len(prefix) :]
            candidates.append(stripped)
    if ":" in stripped:
        candidates.append(stripped.rsplit(":", 1)[-1])

    aliases = {
        "wk180": "WK_C180",
        "wuyuan:wk180": "WK_C180",
    }
    # Iterate over a snapshot because the list grows inside the loop.
    for candidate in list(candidates):
        alias = aliases.get(candidate.lower())
        if alias is not None:
            candidates.append(alias)

    # Pass 1: exact match against the listing.
    for candidate in candidates:
        if candidate in raw_backends:
            self._canonical_backend_cache[backend_name] = candidate
            return candidate

    # Pass 2: case-insensitive match; return the listing's own spelling.
    for candidate in candidates:
        candidate_lower = candidate.lower()
        for raw_name in raw_backends:
            if raw_name.lower() == candidate_lower:
                self._canonical_backend_cache[backend_name] = raw_name
                return raw_name

    # Cache the unresolved fallback only when a listing was actually
    # consulted. Caching it while the listing is unavailable would pin the
    # unresolved name forever, even after the service becomes usable.
    if raw_backends:
        self._canonical_backend_cache[backend_name] = stripped
    return stripped
def _ensure_imports(self) -> None:
    """Import pyqpanda3 on first use and create the cloud service handle.

    Does nothing when ``self._service`` is already set. Otherwise it checks
    for the optional dependency via ``require()``, imports the pyqpanda3
    names the adapter needs, builds ``QCloudService`` with the stored API
    key and keeps references to the imported classes/functions on ``self``.

    Raises:
        RuntimeError: If any step above fails; the original exception is
            chained as the cause.
    """
    if self._service is not None:
        # Already initialised by an earlier call.
        return

    try:
        require("pyqpanda3", "originq")
        from pyqpanda3.intermediate_compiler import convert_originir_string_to_qprog
        from pyqpanda3.qcloud import (
            DataBase,
            JobStatus,
            QCloudJob,
            QCloudOptions,
            QCloudService,
        )

        self._service = QCloudService(api_key=self._api_key)
        self._QCloudOptions = QCloudOptions
        self._QCloudJob = QCloudJob
        self._JobStatus = JobStatus
        self._DataBase = DataBase
        self._convert_originir = convert_originir_string_to_qprog
    except Exception as e:
        raise RuntimeError(f"Failed to initialize pyqpanda3 for OriginQ: {e}") from e
[docs]
def is_available(self) -> bool:
"""Check if the OriginQ adapter is available (credentials configured).
Returns:
bool: True if api_key is configured.
"""
return bool(self._api_key)
[docs]
def list_backends(self) -> list[dict[str, Any]]:
    """Return metadata for every backend the OriginQ service reports.

    Every entry carries ``name`` and ``available``. For non-simulator
    backends the entry is additionally filled from ``chip_info()`` with
    ``num_qubits``, ``topology``, ``available_qubits``, ``avg_1q_fidelity``,
    ``avg_2q_fidelity``, ``avg_readout_fidelity``, ``coherence_t1`` and
    ``coherence_t2``. If reading chip data raises, those keys are set to
    placeholders (``0`` / empty lists / ``None``) instead.

    Returns:
        One dict per backend, in the order the service returned them.
    """
    self._ensure_imports()
    listing: dict[str, bool] = self._service.backends()
    results: list[dict[str, Any]] = []
    for name, available in listing.items():
        entry: dict[str, Any] = {"name": name, "available": available}
        if name in ORIGINQ_SIMULATOR_NAMES:
            # Simulator entries carry only name/availability.
            results.append(entry)
            continue
        try:
            chip = self._service.backend(name).chip_info()
            entry["num_qubits"] = chip.qubits_num()
            entry["topology"] = chip.get_chip_topology()
            entry["available_qubits"] = chip.available_qubits()
            # Averages over the per-qubit records
            per_qubit = chip.single_qubit_info()
            entry["avg_1q_fidelity"] = _avg([q.get_single_gate_fidelity() for q in per_qubit])
            entry["avg_readout_fidelity"] = _avg([q.get_readout_fidelity() for q in per_qubit])
            entry["coherence_t1"] = _avg([q.get_t1() for q in per_qubit])
            entry["coherence_t2"] = _avg([q.get_t2() for q in per_qubit])
            per_pair = chip.double_qubits_info()
            entry["avg_2q_fidelity"] = _avg([p.get_fidelity() for p in per_pair]) if per_pair else None
        except Exception:  # noqa: BLE001
            # chip_info() may not be available for all backends; overwrite
            # anything partially filled in with placeholders.
            entry.update(
                num_qubits=0,
                topology=[],
                available_qubits=[],
                avg_1q_fidelity=None,
                avg_2q_fidelity=None,
                avg_readout_fidelity=None,
                coherence_t1=None,
                coherence_t2=None,
            )
        results.append(entry)
    return results
[docs]
def get_available_backends(self) -> list[dict[str, Any]]:
"""Return only backends that are currently available.
Hardware backends may become unavailable due to maintenance or queue
congestion. Use this method to get a curated list of backends that
can accept jobs right now.
Simulator backends (``full_amplitude``, ``partial_amplitude``,
``single_amplitude``) are always considered available.
Returns:
List of backend dicts (same format as ``list_backends()``) filtered
to ``available == True``.
"""
all_backends = self.list_backends()
return [b for b in all_backends if b["available"]]
def _validate_backend(self, backend_name: str) -> None:
    """Reject submission to a hardware backend that is reported unavailable.

    Simulator names pass immediately. For other names the availability is
    taken from ``self._backend_avail_cache`` when present; otherwise it is
    looked up via ``list_backends()`` and stored in that cache.

    Args:
        backend_name: Backend name to validate.

    Raises:
        BackendNotAvailableError: If the backend is listed (or cached) as
            unavailable.
    """
    if backend_name in ORIGINQ_SIMULATOR_NAMES:
        return  # Simulators are always available

    available = self._backend_avail_cache.get(backend_name)
    if available is None:
        listed = next((b for b in self.list_backends() if b["name"] == backend_name), None)
        if listed is None:
            # Backend name not found at all — let submit() fail naturally
            # (could be a typo or unsupported backend)
            return
        available = bool(listed["available"])
        self._backend_avail_cache[backend_name] = available

    if not available:
        raise BackendNotAvailableError(
            f"Hardware backend '{backend_name}' is currently unavailable. "
            f"To see available backends, run: uniqc backend list --info"
        )
# Cache of backend availability, keyed by backend name.
# NOTE(review): this dict is declared on the class, so it is one object shared
# by every instance that does not assign its own ``_backend_avail_cache``;
# entries written through ``self._backend_avail_cache[...]`` land in it.
_backend_avail_cache: dict[str, bool] = {}
# -------------------------------------------------------------------------
# Circuit translation (OriginIR to QProg)
# -------------------------------------------------------------------------
[docs]
def translate_circuit(self, originir: str) -> Any:
    """Turn an OriginIR string into the object pyqpanda3's converter returns.

    ``SX`` / ``SX ... .dagger`` lines are rewritten to ``RX`` first (see
    ``_rewrite_sx_to_rx``), then the text is handed to the converter that
    ``_ensure_imports()`` stored on the instance.

    Args:
        originir: OriginIR format circuit string.

    Returns:
        The converter's result (a pyqpanda3 QProg).
    """
    self._ensure_imports()
    to_qprog = self._convert_originir
    return to_qprog(_rewrite_sx_to_rx(originir))
# -------------------------------------------------------------------------
# Task submission
# -------------------------------------------------------------------------
[docs]
def submit(self, circuit: str, *, shots: int = 1000, **kwargs: Any) -> str:
    """Submit a single circuit to OriginQ Cloud.

    Args:
        circuit: OriginIR format circuit string.
        shots: Number of measurement shots.
        **kwargs: Additional options:
            - backend_name: Backend name (e.g., 'origin:wuyuan:d5'). When
              missing, ``None`` or empty, the adapter's last/default
              backend name is used.
            - circuit_optimize: Enable circuit optimization (default: True)
            - measurement_amend: Enable measurement amendment (default: False)
            - auto_mapping: Enable automatic qubit mapping (default: False)

    Returns:
        Task ID string.

    Raises:
        BackendNotAvailableError: If the hardware backend is reported as
            unavailable.
    """
    self._ensure_imports()
    # ``kwargs.get(key, default)`` would hand back an explicit ``None`` that
    # a caller forwarded, and name resolution cannot handle ``None``.
    requested = kwargs.get("backend_name") or self._last_backend_name
    backend_name = self._canonical_backend_name(requested)

    # Validate hardware backend availability before attempting submission
    self._validate_backend(backend_name)

    # Simulator backends take the dedicated path (no QCloudOptions).
    if backend_name in ORIGINQ_SIMULATOR_NAMES:
        return self._submit_simulator(backend_name, circuit, shots=shots)

    circuit_optimize = kwargs.get("circuit_optimize", True)
    measurement_amend = kwargs.get("measurement_amend", False)
    auto_mapping = kwargs.get("auto_mapping", False)

    # Remember the backend so later calls without a name reuse it.
    backend = self._service.backend(backend_name)
    self._last_backend_name = backend_name
    # chip_info() may fail if the backend has no chip data loaded. The qubit
    # count is only bookkeeping, so a failure is recorded as None.
    try:
        self._last_n_qubits = backend.chip_info().qubits_num()
    except Exception:
        self._last_n_qubits = None

    # Convert OriginIR to QProg
    qprog = self.translate_circuit(circuit)

    options = self._create_options(
        amend=measurement_amend,
        mapping=auto_mapping,
        optimization=circuit_optimize,
    )

    job = backend.run(qprog, shots=shots, options=options)
    return job.job_id()
def _submit_simulator(self, backend_name: str, circuit: str, *, shots: int = 1000) -> str:
    """Run one circuit on a simulator backend and return its job id.

    The circuit is translated, the named backend is fetched from the
    service, and ``run()`` is called with the shot count only (no options
    object is passed on this path).

    Args:
        backend_name: Simulator backend name (e.g., ``"full_amplitude"``).
        circuit: OriginIR format circuit string.
        shots: Number of measurement shots.

    Returns:
        Task ID string.
    """
    self._ensure_imports()
    program = self.translate_circuit(circuit)
    simulator = self._service.backend(backend_name)
    return simulator.run(program, shots=shots).job_id()
[docs]
def submit_batch(
    self,
    circuits: list[str],
    *,
    shots: int = 1000,
    native_batch: bool = True,
    **kwargs: Any,
) -> list[str]:
    """Submit a batch of circuits.

    On a hardware backend with ``native_batch=True`` and more than one
    circuit, all translated circuits are passed to a single
    ``backend.run(qprogs, shots, options)`` call and share one job id. The
    batch size and shot count are remembered per job id so ``query()`` can
    return per-circuit results. In every other case each circuit is
    submitted on its own.

    Args:
        circuits: List of OriginIR format circuit strings.
        shots: Number of measurement shots applied to every circuit.
        native_batch: When ``True`` (default), use the single-job path
            described above. When ``False``, submit each circuit
            individually.
        **kwargs: Additional options (see :meth:`submit`). A missing,
            ``None`` or empty ``backend_name`` falls back to the adapter's
            last/default backend name.

    Returns:
        * ``[batch_job_id]`` — single-element list when the native batch
          path is taken.
        * ``[id_0, id_1, ...]`` — one id per circuit otherwise (including
          all simulator backends).

    Raises:
        BackendNotAvailableError: If the hardware backend is reported as
            unavailable.
    """
    self._ensure_imports()
    # ``kwargs.get(key, default)`` would hand back an explicit ``None`` that
    # a caller forwarded, and name resolution cannot handle ``None``.
    requested = kwargs.get("backend_name") or self._last_backend_name
    backend_name = self._canonical_backend_name(requested)

    # Validate hardware backend availability before attempting submission
    self._validate_backend(backend_name)

    # Simulators always use the per-circuit path.
    if backend_name in ORIGINQ_SIMULATOR_NAMES:
        return self._submit_batch_simulator(backend_name, circuits, shots=shots)

    circuit_optimize = kwargs.get("circuit_optimize", True)
    measurement_amend = kwargs.get("measurement_amend", False)
    auto_mapping = kwargs.get("auto_mapping", False)

    # Remember the backend so later calls without a name reuse it.
    backend = self._service.backend(backend_name)
    self._last_backend_name = backend_name
    # The qubit count is only bookkeeping; failure is recorded as None.
    try:
        self._last_n_qubits = backend.chip_info().qubits_num()
    except Exception:
        self._last_n_qubits = None

    options = self._create_options(
        amend=measurement_amend,
        mapping=auto_mapping,
        optimization=circuit_optimize,
    )

    if native_batch and len(circuits) > 1:
        # One run() call with a list of programs -> one job id for all.
        qprogs = [self.translate_circuit(circuit) for circuit in circuits]
        job = backend.run(qprogs, shots, options)
        job_id = job.job_id()
        # query() uses this to ask for per-circuit counts.
        self._batch_job_sizes[job_id] = (len(circuits), shots)
        return [job_id]

    # Legacy / opt-out path: submit each circuit independently.
    task_ids: list[str] = []
    for circuit in circuits:
        qprog = self.translate_circuit(circuit)
        job = backend.run(qprog, shots=shots, options=options)
        task_ids.append(job.job_id())
    return task_ids
def _submit_batch_simulator(self, backend_name: str, circuits: list[str], *, shots: int = 1000) -> list[str]:
"""Submit circuits to an OriginQ simulator backend.
Args:
backend_name: Simulator backend name.
circuits: List of OriginIR format circuit strings.
shots: Number of measurement shots.
Returns:
List of task IDs.
"""
task_ids: list[str] = []
for circuit in circuits:
task_ids.append(self._submit_simulator(backend_name, circuit, shots=shots))
return task_ids
def _create_options(self, amend: bool, mapping: bool, optimization: bool) -> Any:
"""Create QCloudOptions from adapter parameters.
Args:
amend: Enable measurement amendment.
mapping: Enable automatic qubit mapping.
optimization: Enable circuit optimization.
Returns:
QCloudOptions instance.
"""
options = self._QCloudOptions()
options.set_amend(amend)
options.set_mapping(mapping)
options.set_optimization(optimization)
return options
# -------------------------------------------------------------------------
# Task query
# -------------------------------------------------------------------------
[docs]
def query(self, taskid: str) -> dict[str, Any]:
    """Query a single task's status.

    Status mapping (by the name of the job status the cloud reports):

    * ``FINISHED`` → success, with formatted counts.
    * ``FAILED`` → failed, always. Counts attached to a failed job are not
      reported as a successful result.
    * ``???`` (status the client could not parse) → failed when there is an
      error message or no counts, otherwise success.
    * anything else → running.

    Args:
        taskid: Task ID to query.

    Returns:
        dict with keys ``taskid`` and ``status``, plus ``result`` for
        success (counts) and failure (``{"error": message}``).
    """
    self._ensure_imports()
    job = self._QCloudJob(taskid)

    # Always use job.query() (not job.status()) — it returns a QCloudResult
    # with the authoritative status from the cloud, even for failed/unknown
    # status codes that job.status() cannot parse.
    qr = job.query()
    status_name = qr.job_status().name
    error_msg = qr.error_message()

    # Known only for jobs submitted as a native batch by this instance.
    batch_info = self._batch_job_sizes.get(taskid)
    expected_batch_size = batch_info[0] if batch_info is not None else None
    batch_shots = batch_info[1] if batch_info is not None else None

    # For native batch jobs prefer ``get_counts_list()`` so each circuit's
    # distribution is preserved. Fall back to ``get_counts()`` for single
    # jobs or if pyqpanda3 returns nothing list-shaped.
    counts: Any = None
    if expected_batch_size is not None and expected_batch_size > 1:
        try:
            counts_list = qr.get_counts_list()
        except Exception:
            counts_list = None
        if counts_list:
            counts = counts_list
    if counts is None:
        counts = qr.get_counts()

    succeeded = {
        "taskid": taskid,
        "status": TASK_STATUS_SUCCESS,
        "result": self._format_counts(counts, expected_batch_size, batch_shots),
    } if status_name in ("FINISHED", "???") else None

    if status_name == "FINISHED":
        return succeeded

    unknown_failed = status_name == "???" and (error_msg or not counts)
    if status_name == "FAILED" or unknown_failed:
        return {
            "taskid": taskid,
            "status": TASK_STATUS_FAILED,
            "result": {"error": error_msg or "Job failed on cloud (unknown status)"},
        }

    if status_name == "???":
        # No error and has counts: treat as success despite unknown status.
        return succeeded

    # RUNNING, QUEUING, WAITING → treat as running
    return {
        "taskid": taskid,
        "status": TASK_STATUS_RUNNING,
    }
[docs]
def query_batch(self, taskids: str | list[str]) -> dict[str, Any]:
    """Query multiple tasks and merge results.

    Tasks are queried in order. The combined status is:

    * failed as soon as one task has failed (remaining tasks are not
      queried); ``result`` then holds that task's error payload;
    * running if at least one task is still running; ``result`` holds only
      what was collected before the first running task;
    * success otherwise; ``result`` is a flat list of per-task results
      (list payloads are extended, dict payloads appended).

    Args:
        taskids: A task ID or a list of task IDs to query.

    Returns:
        Combined dict with ``status`` and ``result`` keys.
    """
    if isinstance(taskids, str):
        taskids = [taskids]

    taskinfo: dict[str, Any] = {"status": TASK_STATUS_SUCCESS, "result": []}
    for taskid in taskids:
        result_i = self.query(taskid)
        status_i = result_i["status"]
        if status_i == TASK_STATUS_FAILED:
            taskinfo["status"] = TASK_STATUS_FAILED
            # Surface the failure details; otherwise callers would only see
            # the counts of the tasks that happened to succeed earlier.
            taskinfo["result"] = result_i.get("result", {})
            break
        if status_i == TASK_STATUS_RUNNING:
            taskinfo["status"] = TASK_STATUS_RUNNING
        if taskinfo["status"] == TASK_STATUS_SUCCESS:
            payload = result_i.get("result", [])
            if isinstance(payload, list):
                taskinfo["result"].extend(payload)
            elif isinstance(payload, dict):
                taskinfo["result"].append(payload)
    return taskinfo
def _format_counts(
self,
counts: Any,
expected_batch_size: int | None = None,
shots: int | None = None,
) -> dict[str, int] | list[dict[str, int]]:
"""Format pyqpanda3 counts to flat counts or per-circuit batch counts.
Args:
counts: Counts from QCloudResult.get_counts().
Returns:
For single-circuit results, a dict mapping bitstrings to shot counts.
For batch results, a list of such dicts, one per submitted circuit.
"""
if isinstance(counts, dict):
return dict(counts)
elif isinstance(counts, list):
if not counts:
return {}
if all(isinstance(c, dict) for c in counts):
if expected_batch_size and expected_batch_size > 1:
return [dict(c) for c in counts]
merged: dict[str, int] = {}
for c in counts:
for bitstring, value in c.items():
merged[str(bitstring)] = merged.get(str(bitstring), 0) + int(value)
return merged
if all(isinstance(c, list) for c in counts):
return [self._count_bitstrings(c) for c in counts]
if all(isinstance(c, str) for c in counts):
if expected_batch_size and expected_batch_size > 1:
return self._split_flat_batch_bitstrings(counts, expected_batch_size, shots)
# Single-circuit cloud results are often a flat list of bitstrings.
return self._count_bitstrings(counts)
formatted: list[dict[str, int]] = []
for c in counts:
if isinstance(c, dict):
formatted.append(dict(c))
elif isinstance(c, list):
formatted.append(self._count_bitstrings(c))
elif isinstance(c, str):
formatted.append({c: 1})
return formatted
else:
return {str(counts): 1}
@staticmethod
def _count_bitstrings(bitstrings: list[str]) -> dict[str, int]:
counts: dict[str, int] = {}
for bitstring in bitstrings:
counts[str(bitstring)] = counts.get(str(bitstring), 0) + 1
return counts
def _split_flat_batch_bitstrings(
self,
bitstrings: list[str],
expected_batch_size: int,
shots: int | None,
) -> list[dict[str, int]]:
if expected_batch_size <= 0:
return [self._count_bitstrings(bitstrings)]
if shots is not None and shots > 0 and len(bitstrings) >= shots * expected_batch_size:
chunk_size = shots
else:
chunk_size = max(1, len(bitstrings) // expected_batch_size)
out = []
for i in range(expected_batch_size):
start = i * chunk_size
end = start + chunk_size
out.append(self._count_bitstrings(bitstrings[start:end]))
return out
# -------------------------------------------------------------------------
# Synchronous wait
# -------------------------------------------------------------------------
[docs]
def query_sync(
    self,
    taskid: str | list[str],
    interval: float = 2.0,
    timeout: float = 60.0,
    retry: int = 5,
) -> list[dict[str, Any]]:
    """Poll task status until completion or timeout.

    The first poll happens immediately; ``interval`` seconds are slept only
    between polls. A poll that raises is counted against ``retry`` and the
    loop continues; once the budget is used up the failure is re-raised as
    ``RuntimeError``.

    Args:
        taskid: Task ID or list of task IDs.
        interval: Polling interval in seconds.
        timeout: Maximum wait time in seconds.
        retry: Number of retries on query failure.

    Returns:
        List of result dicts.

    Raises:
        TimeoutError: If timeout is reached.
        RuntimeError: If task fails or retry exhausted.
    """
    taskids = [taskid] if isinstance(taskid, str) else taskid
    starttime = time.time()

    while True:
        try:
            taskinfo = self.query_batch(taskids)
        except Exception as exc:
            # Transient query errors consume the retry budget instead of
            # aborting the wait on the first hiccup.
            if retry <= 0:
                raise RuntimeError("Retry count exhausted.") from exc
            retry -= 1
            warnings.warn(f"Query failed. Retry remains {retry} times.", stacklevel=2)
        else:
            status = taskinfo["status"]
            if status == TASK_STATUS_SUCCESS:
                return taskinfo.get("result", [])
            if status == TASK_STATUS_FAILED:
                # Raised outside the try so it is never mistaken for a
                # retryable query error.
                raise RuntimeError(f"Failed to execute, errorinfo = {taskinfo.get('result')}")
            # Any other status: keep polling.

        if time.time() - starttime > timeout:
            raise TimeoutError("Reached the maximum timeout.")
        time.sleep(interval)
# -------------------------------------------------------------------------
# Chip characterization
# -------------------------------------------------------------------------
[docs]
def get_chip_characterization(self, backend_name: str):
    """Return per-qubit and per-pair calibration data for a hardware backend.

    Parameters
    ----------
    backend_name:
        Backend name, e.g. ``"origin:wuyuan:wk180"``, ``"originq:WK_C180"``,
        or the bare chip name ``"WK_C180"``. It is resolved to the cloud
        backend name with ``_canonical_backend_name`` before the lookup.

    Returns
    -------
    ChipCharacterization or None
        None if looking up the backend or reading its chip info raises.
    """
    from datetime import datetime, timezone

    from uniqc.backend_adapter.backend_info import Platform, QubitTopology
    from uniqc.cli.chip_info import (
        ChipCharacterization,
        ChipGlobalInfo,
        SingleQubitData,
        TwoQubitData,
        TwoQubitGateData,
    )

    self._ensure_imports()

    # pyqpanda3 chip_info() requires the canonical bare cloud backend name.
    chip_name = self._canonical_backend_name(backend_name)
    # The backend lookup is inside the try as well, so an unknown backend
    # yields None (as documented) instead of an exception.
    try:
        backend = self._service.backend(chip_name)
        ci = backend.chip_info()
    except Exception:
        return None

    # Available qubits
    available_qubits = tuple(int(q) for q in ci.available_qubits())

    # Connectivity
    raw_topo = ci.get_chip_topology() or []
    connectivity = tuple(QubitTopology(u=u, v=v) for u, v in raw_topo)

    # Per-qubit data. Optional accessors are probed with hasattr because not
    # every record object necessarily provides them.
    single_qubit_data: list[SingleQubitData] = []
    for sq in ci.single_qubit_info() or []:
        fid_0 = sq.get_readout_fidelity_0() if hasattr(sq, "get_readout_fidelity_0") else None
        fid_1 = sq.get_readout_fidelity_1() if hasattr(sq, "get_readout_fidelity_1") else None
        avg_ro = sq.get_readout_fidelity() if hasattr(sq, "get_readout_fidelity") else None
        single_qubit_data.append(
            SingleQubitData(
                qubit_id=int(sq.get_qubit_id()) if hasattr(sq, "get_qubit_id") else 0,
                t1=sq.get_t1(),
                t2=sq.get_t2(),
                single_gate_fidelity=sq.get_single_gate_fidelity(),
                readout_fidelity_0=fid_0,
                readout_fidelity_1=fid_1,
                avg_readout_fidelity=avg_ro,
            )
        )

    # Per-pair data. When a pair record lacks get_qubit_u()/get_qubit_v(),
    # the pair is taken from the topology entry at the same index
    # (NOTE(review): assumes both lists share an ordering — confirm).
    topo_by_index: dict[int, tuple[int, int]] = {i: (u, v) for i, (u, v) in enumerate(raw_topo)}
    two_qubit_data: list[TwoQubitData] = []
    for idx, dq in enumerate(ci.double_qubits_info() or []):
        fid = dq.get_fidelity() if hasattr(dq, "get_fidelity") else None
        if hasattr(dq, "get_qubit_u") and hasattr(dq, "get_qubit_v"):
            u = dq.get_qubit_u()
            v = dq.get_qubit_v()
        else:
            u, v = topo_by_index.get(idx, (0, 0))
        two_qubit_data.append(
            TwoQubitData(
                qubit_u=u,
                qubit_v=v,
                gates=(TwoQubitGateData(gate="cx", fidelity=fid),),
            )
        )

    # Global info: best effort. Anything that fails here leaves the gate
    # lists / gate times at whatever was collected so far.
    single_gates: list[str] = []
    two_gates: list[str] = []
    sq_gate_time: float | None = None
    tq_gate_time: float | None = None
    try:
        cfg = backend.configuration()
        gates = cfg.supported_gates() if hasattr(cfg, "supported_gates") else []
        for g in gates:
            g_lower = g.lower()
            if (
                g_lower in {"h", "x", "y", "z", "s", "sx", "t", "i", "rx", "ry", "rz", "u1", "u2", "u3"}
                and g not in single_gates
            ):
                single_gates.append(g)
            elif g_lower in {"cnot", "cz", "iswap", "ecr", "swap"} and g not in two_gates:
                two_gates.append(g)
        if hasattr(cfg, "single_qubit_gate_time"):
            sq_gate_time = float(cfg.single_qubit_gate_time())
        if hasattr(cfg, "two_qubit_gate_time"):
            tq_gate_time = float(cfg.two_qubit_gate_time())
    except Exception:
        pass

    return ChipCharacterization(
        platform=Platform.ORIGINQ,
        chip_name=chip_name,
        full_id=f"originq:{chip_name}",
        available_qubits=available_qubits,
        connectivity=connectivity,
        single_qubit_data=tuple(single_qubit_data),
        two_qubit_data=tuple(two_qubit_data),
        global_info=ChipGlobalInfo(
            single_qubit_gates=tuple(single_gates),
            two_qubit_gates=tuple(two_gates),
            single_qubit_gate_time=sq_gate_time,
            two_qubit_gate_time=tq_gate_time,
        ),
        # Timestamp of this fetch (UTC, ISO 8601).
        calibrated_at=datetime.now(timezone.utc).isoformat(),
    )
# -------------------------------------------------------------------------
# Dry-run validation
# -------------------------------------------------------------------------
[docs]
def dry_run(self, originir: str, *, shots: int = 1000, **kwargs: Any) -> DryRunResult:
    """Check that a circuit can be translated, without submitting it.

    The qubit count is read from the first ``QINIT`` line of the text (left
    as ``None`` when it cannot be parsed). The circuit is then passed to
    ``translate_circuit()``; an exception there produces a failed result,
    otherwise a success result listing the supported gates is returned.

    NOTE(review): ``translate_circuit()`` goes through ``_ensure_imports()``,
    which constructs the cloud service object on first use — confirm that
    this does not contact the network if a strictly offline check is needed.
    """
    from uniqc.backend_adapter.circuit_adapter import OriginQCircuitAdapter
    from uniqc.backend_adapter.task.adapters.base import _dry_run_failed, _dry_run_success

    backend_name = kwargs.get("backend_name", self._last_backend_name)

    # Qubit count comes straight from the text (no API call).
    circuit_qubits: int | None = None
    try:
        for raw_line in originir.splitlines():
            statement = raw_line.strip()
            if not statement.startswith("QINIT"):
                continue
            tokens = statement.split()
            if len(tokens) >= 2:
                circuit_qubits = int(tokens[1])
            break
    except Exception:
        pass

    # Translation is the actual validation step.
    try:
        self.translate_circuit(originir)
    except Exception as e:
        failure_details = (
            f"OriginIR translation to QProg failed for backend '{backend_name}': {e}. "
            "The circuit may use gates not supported by pyqpanda3."
        )
        return _dry_run_failed(
            str(e),
            details=failure_details,
            backend_name=backend_name,
        )

    success_message = (
        f"Dry-run passed for '{backend_name}': OriginIR translates cleanly "
        f"to QProg. Qubits={circuit_qubits}, shots={shots}"
    )
    return _dry_run_success(
        success_message,
        backend_name=backend_name,
        circuit_qubits=circuit_qubits,
        supported_gates=tuple(sorted(OriginQCircuitAdapter.SUPPORTED_GATES)),
    )