# Source code for uniqc.backend_adapter.task.adapters.quafu_adapter

"""[Deprecated] Quafu backend adapter.

.. deprecated::
   The Quafu platform path is deprecated and is **no longer installable via a
   ``[quafu]`` extra**.  The adapter code is retained for backwards
   compatibility but future releases do not guarantee consistency or
   completeness.  Users who still need it must install ``pyquafu``
   manually::

       pip install pyquafu

   Note: ``pyquafu`` requires ``numpy<2`` and may downgrade your environment.

Translates OriginIR circuits to Quafu QuantumCircuit objects and submits
via the ``quafu`` package (User / Task API).  No raw REST calls.
"""

from __future__ import annotations

import warnings

# Emitted once, at import time of this module, to tell users that the Quafu
# adapter is deprecated and that pyquafu must now be installed by hand.
# NOTE(review): stacklevel=2 presumably attributes the warning to the module
# that imports this one rather than to this file — confirm in practice.
warnings.warn(
    "uniqc.backend_adapter.task.adapters.quafu_adapter is deprecated; "
    "the Quafu platform SDK (pyquafu) is no longer maintained and the "
    "[quafu] extra has been removed. Install pyquafu manually if needed "
    "(pip install pyquafu, requires numpy<2).",
    DeprecationWarning,
    stacklevel=2,
)

__all__ = ["QuafuAdapter"]

import re
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any

from uniqc.backend_adapter.backend_info import Platform, QubitTopology
from uniqc.backend_adapter.task.adapters.base import (
    TASK_STATUS_FAILED,
    TASK_STATUS_RUNNING,
    TASK_STATUS_SUCCESS,
    DryRunResult,
    QuantumAdapter,
)
from uniqc.backend_adapter.task.optional_deps import MissingDependencyError, check_quafu
from uniqc.cli.chip_info import (
    ChipCharacterization,
    ChipGlobalInfo,
    SingleQubitData,
    TwoQubitData,
    TwoQubitGateData,
)
from uniqc.config import load_quafu_config

if TYPE_CHECKING:
    pass  # type hints use string annotations via `from __future__ import annotations`


# Static qubit count map for offline dry-run qubit validation.
# Sources: BAQIS ScQ public chip specifications.
#
# Keys are Quafu chip IDs and values are the number of qubits on that chip.
# The key set mirrors ``QuafuAdapter.VALID_CHIP_IDS``; keep the two in sync
# when a chip is added or removed.  ``QuafuAdapter.dry_run`` looks a chip up
# here and rejects circuits whose QINIT size exceeds the stored count.
CHIP_QUBIT_COUNTS: dict[str, int] = {
    "ScQ-P10": 10,
    "ScQ-P18": 18,
    "ScQ-P136": 136,
    "ScQ-P10C": 10,
    "Dongling": 24,
    "ScQ-Sim10": 10,
    "ScQ-Sim": 25,
    "ScQ-P5": 5,
    "ScQ-P102": 102,
    "ScQ-P21": 21,
    "ScQ-P3": 3,
    "ScQ-TEST": 5,
    "Baiwang": 100,
    "Miaofeng": 44,
    "Haituo": 136,
    "Baihua": 10,
    "Yunmeng": 18,
    "Xiang": 21,
}


def _avg(values: list[float]) -> float | None:
    """Return the arithmetic mean of a list, or None if the list is empty."""
    return sum(values) / len(values) if values else None


@dataclass(frozen=True, slots=True)
class _QuafuCalibration:
    """Normalized calibration payload extracted from Quafu chip_info.

    Immutable container produced by ``_extract_quafu_calibration``; every
    field is a tuple so the instance can be shared safely.
    """

    # Sorted qubit indices seen in qubits_info and/or the topology edges.
    available_qubits: tuple[int, ...]
    # One entry per undirected qubit pair found in the topology.
    connectivity: tuple[QubitTopology, ...]
    # Per-qubit records (T1/T2), ordered by qubit id.
    single_qubit_data: tuple[SingleQubitData, ...]
    # Per-pair records carrying the per-gate fidelities.
    two_qubit_data: tuple[TwoQubitData, ...]
    # Chip-wide gate name lists (single- and two-qubit).
    global_info: ChipGlobalInfo


def _clean_quafu_gate_name(gate: Any) -> str:
    return str(gate).strip().lstrip('[ "').rstrip('" ]')


def _parse_quafu_qubit_id(value: Any) -> int | None:
    """Parse Quafu qubit labels such as ``Q0``, ``q12`` or ``12``."""
    if isinstance(value, int):
        return value
    text = str(value).strip()
    if text.isdigit():
        return int(text)
    match = re.search(r"\d+", text)
    return int(match.group(0)) if match else None


def _parse_quafu_edge_key(edge_key: Any) -> tuple[int, int] | None:
    """Parse a Quafu topology edge key into a ``(u, v)`` pair of qubit ids.

    A two-element list/tuple is taken as the two endpoints directly.  Any
    other value is stringified and split on underscores, hyphens, commas and
    whitespace (the usual form is ``Q0_Q1``).

    Returns:
        The endpoint ids, or ``None`` when the key does not yield exactly
        two parseable endpoints.
    """
    if isinstance(edge_key, (list, tuple)) and len(edge_key) == 2:
        endpoints = [edge_key[0], edge_key[1]]
    else:
        tokens = re.split(r"[_\-,\s]+", str(edge_key).strip())
        endpoints = [token for token in tokens if token]
        if len(endpoints) != 2:
            return None

    first = _parse_quafu_qubit_id(endpoints[0])
    second = _parse_quafu_qubit_id(endpoints[1])
    if first is None or second is None:
        return None
    return (first, second)


def _number(value: Any) -> float | None:
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def _quafu_full_info(chip_info: dict[str, Any]) -> dict[str, Any]:
    full_info = chip_info.get("full_info")
    return full_info if isinstance(full_info, dict) else chip_info


def _quafu_topological_structure(chip_info: dict[str, Any]) -> dict[Any, Any]:
    """Locate the ``topological_structure`` mapping of a chip_info payload.

    The ``full_info`` view is consulted first; when it holds nothing truthy
    under that key, the top level of ``chip_info`` is tried.

    Returns:
        The mapping found, or an empty dict when the value is not a dict.
    """
    key = "topological_structure"
    candidate = _quafu_full_info(chip_info).get(key) or chip_info.get(key)
    if not isinstance(candidate, dict):
        return {}
    return candidate


def _extract_quafu_gate_fidelities(gate_data: Any) -> list[TwoQubitGateData]:
    """Collect the per-gate fidelities stored under one topology edge.

    Two payload layouts are handled:

    * a flat dict that itself has a ``"fidelity"`` key — reported as a single
      record whose gate name is ``"unknown"``;
    * a dict keyed by gate name whose values are either a dict with a
      ``"fidelity"`` entry or the fidelity value itself.

    Anything that is not a dict produces an empty list, and entries whose
    cleaned gate name is empty are skipped.
    """
    if not isinstance(gate_data, dict):
        return []

    if "fidelity" in gate_data:
        flat_record = TwoQubitGateData(gate="unknown", fidelity=_number(gate_data.get("fidelity")))
        return [flat_record]

    records: list[TwoQubitGateData] = []
    for raw_name, payload in gate_data.items():
        name = _clean_quafu_gate_name(raw_name)
        if name:
            raw_fidelity = payload.get("fidelity") if isinstance(payload, dict) else payload
            records.append(TwoQubitGateData(gate=name, fidelity=_number(raw_fidelity)))
    return records


def _merge_gate_fidelity(existing: float | None, incoming: float | None) -> float | None:
    """Merge directed Quafu edge records into one undirected edge value.

    Quafu's own topology drawing logic collapses reverse directions by keeping
    the lower fidelity. Use the same conservative rule when both directions
    expose the same gate.
    """
    if existing is None:
        return incoming
    if incoming is None:
        return existing
    return min(existing, incoming)


def _extract_quafu_calibration(
    chip_info: dict[str, Any],
    *,
    num_qubits: int | None = None,
    valid_gates: list[str] | tuple[str, ...] | None = None,
) -> _QuafuCalibration:
    """Normalize Quafu SDK ``get_chip_info()`` output.

    Args:
        chip_info: Raw chip-info dict; calibration data is read from its
            ``full_info`` sub-dict when present, otherwise from the top level.
        num_qubits: Fallback qubit count, used only when neither
            ``qubits_info`` nor the topology names any qubit.
        valid_gates: Gate names reported by the backend; recognised names are
            sorted into the single- and two-qubit gate lists.

    Returns:
        A ``_QuafuCalibration`` with sorted qubit ids, undirected
        connectivity, per-qubit T1/T2 records, per-pair gate fidelities and
        the chip-wide gate name lists.
    """
    full_info = _quafu_full_info(chip_info)
    qubits_info = full_info.get("qubits_info") or {}

    # --- Per-qubit records: only T1/T2 are read; fidelity fields stay None.
    single_qubit_data: list[SingleQubitData] = []
    available_qubits: set[int] = set()
    if isinstance(qubits_info, dict):
        for key, qdata in qubits_info.items():
            qid = _parse_quafu_qubit_id(key)
            if qid is None:
                # Key carries no qubit index; ignore the entry.
                continue
            available_qubits.add(qid)
            # Tolerate non-dict payloads by treating them as "no data".
            qdata = qdata if isinstance(qdata, dict) else {}
            single_qubit_data.append(
                SingleQubitData(
                    qubit_id=qid,
                    t1=_number(qdata.get("T1")),
                    t2=_number(qdata.get("T2")),
                    single_gate_fidelity=None,
                    readout_fidelity_0=None,
                    readout_fidelity_1=None,
                    avg_readout_fidelity=None,
                )
            )

    # --- Per-pair records: directed edges (u, v) and (v, u) are folded into
    # one undirected key, merging fidelities of the same gate name.
    pair_gates: dict[tuple[int, int], dict[str, float | None]] = {}
    for edge_key, gate_data in _quafu_topological_structure(chip_info).items():
        edge = _parse_quafu_edge_key(edge_key)
        if edge is None:
            continue
        u, v = edge
        if u == v:
            # Self-loops are not qubit pairs.
            continue
        available_qubits.update((u, v))
        key = tuple(sorted((u, v)))
        gates = pair_gates.setdefault(key, {})
        for gate in _extract_quafu_gate_fidelities(gate_data):
            # Gate names are lower-cased before being used as merge keys.
            gate_name = gate.gate.lower()
            gates[gate_name] = _merge_gate_fidelity(gates.get(gate_name), gate.fidelity)

    # Nothing named a qubit: fall back to 0..num_qubits-1 when a count is given.
    if not available_qubits and num_qubits:
        available_qubits.update(range(num_qubits))

    connectivity = tuple(QubitTopology(u=u, v=v) for u, v in sorted(pair_gates))
    two_qubit_data = tuple(
        TwoQubitData(
            qubit_u=u,
            qubit_v=v,
            gates=tuple(TwoQubitGateData(gate=gate, fidelity=fidelity) for gate, fidelity in sorted(gates.items())),
        )
        for (u, v), gates in sorted(pair_gates.items())
    )

    # --- Chip-wide gate lists: classify the backend's valid gates by name,
    # keeping first-seen order and the original spelling.
    sq_gates: list[str] = []
    tq_gates: list[str] = []
    for gate in valid_gates or []:
        cleaned = _clean_quafu_gate_name(gate)
        if not cleaned:
            continue
        lowered = cleaned.lower()
        if lowered in {"h", "x", "y", "z", "s", "sx", "t", "rx", "ry", "rz", "u1", "u2", "u3"}:
            if cleaned not in sq_gates:
                sq_gates.append(cleaned)
        elif lowered in {"cx", "cnot", "cz", "iswap", "swap", "rxx", "ryy", "rzz", "xy"} and cleaned not in tq_gates:
            tq_gates.append(cleaned)
    # Add every gate that appears on an edge but was not listed as valid.
    # NOTE(review): edge gate names are lower-cased while valid-gate names
    # keep their original case, so this membership test is case-sensitive
    # and e.g. "CZ" and "cz" could both end up in the list — confirm whether
    # that is intended.
    for pair in two_qubit_data:
        for gate in pair.gates:
            if gate.gate not in tq_gates:
                tq_gates.append(gate.gate)

    return _QuafuCalibration(
        available_qubits=tuple(sorted(available_qubits)),
        connectivity=connectivity,
        single_qubit_data=tuple(sorted(single_qubit_data, key=lambda item: item.qubit_id)),
        two_qubit_data=two_qubit_data,
        global_info=ChipGlobalInfo(
            single_qubit_gates=tuple(sq_gates),
            two_qubit_gates=tuple(tq_gates),
        ),
    )


def _compute_quafu_fidelity(chip_info: dict[str, Any]) -> dict[str, Any]:
    """Summarize fidelity and coherence metrics of a Quafu chip_info payload.

    The payload is normalized with ``_extract_quafu_calibration`` and then
    averaged:

      - ``avg_2q_fidelity``: mean over every known gate fidelity of every
        qubit pair in the topology.
      - ``coherence_t1`` / ``coherence_t2``: mean of the known per-qubit
        ``T1`` / ``T2`` values.

    Quafu does not expose single-qubit gate or readout fidelities, so
    ``avg_1q_fidelity`` and ``avg_readout_fidelity`` are always ``None``.

    Returns:
        dict with keys: avg_1q_fidelity, avg_2q_fidelity,
        avg_readout_fidelity, coherence_t1, coherence_t2.  An average with
        no samples is ``None``.
    """
    calibration = _extract_quafu_calibration(chip_info)
    qubits = calibration.single_qubit_data

    t1_values = [float(qubit.t1) for qubit in qubits if qubit.t1 is not None]
    t2_values = [float(qubit.t2) for qubit in qubits if qubit.t2 is not None]

    pair_fidelities: list[float] = []
    for pair in calibration.two_qubit_data:
        for gate in pair.gates:
            if gate.fidelity is not None:
                pair_fidelities.append(float(gate.fidelity))

    return {
        "avg_1q_fidelity": None,  # not available from Quafu API
        "avg_2q_fidelity": _avg(pair_fidelities),  # _avg yields None when empty
        "avg_readout_fidelity": None,  # not available from Quafu API
        "coherence_t1": _avg(t1_values),
        "coherence_t2": _avg(t2_values),
    }


class QuafuAdapter(QuantumAdapter):
    """Adapter for the BAQIS Quafu (ScQ) quantum cloud platform.

    Translates OriginIR text into Quafu ``QuantumCircuit`` objects, submits
    them through the ``quafu`` SDK and normalizes task results and chip
    calibration data into uniqc structures.

    Raises:
        MissingDependencyError: If quafu package is not installed.
    """

    name = "quafu"

    # Valid chip IDs (known ScQ series chips and simulators)
    VALID_CHIP_IDS = frozenset(
        {
            "ScQ-P10",
            "ScQ-P18",
            "ScQ-P136",
            "ScQ-P10C",
            "Dongling",
            "ScQ-Sim10",
            "ScQ-Sim",
            "ScQ-P5",
            "ScQ-P102",
            "ScQ-P21",
            "ScQ-P3",
            "ScQ-TEST",
            "Baiwang",
            "Miaofeng",
            "Haituo",
            "Baihua",
            "Yunmeng",
            "Xiang",
        }
    )

    # Upper limit on the number of groups retained in _task_history.
    # Beyond this threshold the oldest entry is evicted to avoid unbounded
    # memory growth in long-running processes.
    _MAX_HISTORY_GROUPS: int = 100

    # Message raised by submit()/submit_batch() for an unknown chip_id.
    _INVALID_CHIP_ID_MESSAGE = (
        "Invalid chip_id. "
        "Current quafu chip_id list: "
        "ScQ-P10, ScQ-P18, ScQ-P136, ScQ-P10C, Dongling, "
        "ScQ-Sim10, ScQ-Sim, ScQ-P5, ScQ-P102, ScQ-P21, ScQ-P3, ScQ-TEST, "
        "Baiwang, Miaofeng, Haituo, Baihua, Yunmeng, Xiang"
    )

    # OriginIR gate name -> QuantumCircuit method name, grouped by call shape.
    _ROTATION_GATES = {"RX": "rx", "RY": "ry", "RZ": "rz"}
    _FIXED_1Q_GATES = {"H": "h", "X": "x", "Y": "y", "Z": "z"}
    # (plain method, daggered method)
    _DAGGERABLE_GATES = {"S": ("s", "sdg"), "SX": ("sx", "sxdg"), "T": ("t", "tdg")}
    _TWO_QUBIT_GATES = {"CZ": "cz", "CNOT": "cnot", "SWAP": "swap", "ISWAP": "iswap"}

    # Operations _reconstruct_qasm() accepts.  Used by dry_run() only when no
    # ``SUPPORTED_GATES`` attribute is otherwise available on the instance.
    _FALLBACK_SUPPORTED_GATES = frozenset(
        {
            *_ROTATION_GATES,
            *_FIXED_1Q_GATES,
            *_DAGGERABLE_GATES,
            *_TWO_QUBIT_GATES,
            "MEASURE",
            "BARRIER",
        }
    )

    @property
    def api_token(self) -> str:
        """Return the API token used for Quafu authentication.

        Returns:
            str: The Quafu API token.
        """
        return self._api_token

    def __init__(self) -> None:
        # Check if quafu is available
        if not check_quafu():
            raise MissingDependencyError("quafu", "quafu")

        config = load_quafu_config()
        self._api_token: str = config["api_token"]
        # Internal task history: group_name -> {taskid: task_index}
        # Updated on each submit_batch call so retrieve() can work without
        # requiring the caller to pass history.
        self._task_history: dict[str, dict[str, int]] = {}
        # Track insertion order so we can evict the oldest group when the
        # cap is reached (simple FIFO).
        self._history_order: list[str] = []

        # Imported lazily so a missing SDK surfaces as MissingDependencyError
        # above rather than as an ImportError at module import.
        from quafu import QuantumCircuit, Task, User

        self._QuantumCircuit = QuantumCircuit
        self._Task = Task
        self._User = User

    # -------------------------------------------------------------------------
    # Internal helpers
    # -------------------------------------------------------------------------

    def _authenticated_user(self) -> Any:
        """Create a Quafu ``User`` for the configured token and save the token.

        NOTE(review): ``save_apitoken()`` presumably persists the token so
        that ``Task`` objects created afterwards can authenticate — confirm
        against the pyquafu version in use.
        """
        user = self._User(api_token=self._api_token)
        user.save_apitoken()
        return user

    def _require_valid_chip_id(self, chip_id: str | None) -> None:
        """Raise ``RuntimeError`` unless ``chip_id`` is in ``VALID_CHIP_IDS``."""
        if chip_id not in self.VALID_CHIP_IDS:
            raise RuntimeError(self._INVALID_CHIP_ID_MESSAGE)

    def _supported_gate_names(self) -> tuple[str, ...]:
        """Return the sorted gate names reported by :meth:`dry_run`.

        ``SUPPORTED_GATES`` is used when the instance has it (this class does
        not define it itself); otherwise the names are derived from the
        translation tables so the report cannot fail with AttributeError.
        """
        return tuple(sorted(getattr(self, "SUPPORTED_GATES", self._FALLBACK_SUPPORTED_GATES)))

    def is_available(self) -> bool:
        """Check if the Quafu adapter is available (quafu package installed).

        Returns:
            bool: True if the quafu package was successfully imported.
        """
        return check_quafu()

    def list_backends(self) -> list[dict[str, Any]]:
        """Return raw Quafu backend metadata.

        For backends whose chip info exposes a topology, the entry is
        additionally populated with fidelity / coherence summaries and the
        normalized calibration data.

        Returns:
            List of dicts with keys: ``name``, ``num_qubits``, ``status``,
            ``task_in_queue``, ``qv``, ``system_id``, ``valid_gates``, plus —
            when calibration is available — fidelity/coherence fields
            (avg_1q_fidelity, avg_2q_fidelity, avg_readout_fidelity,
            coherence_t1, coherence_t2), ``available_qubits``, ``topology``,
            ``per_qubit_calibration``, ``per_pair_calibration``,
            ``global_info`` and ``calibrated_at``.
        """
        import logging

        logger = logging.getLogger(__name__)
        user = self._authenticated_user()
        raw_backends = user.get_available_backends()

        result: list[dict[str, Any]] = []
        for name, backend in raw_backends.items():
            entry: dict[str, Any] = {
                "name": name,
                "num_qubits": backend.qubit_num,
                "status": backend.status,
                "task_in_queue": backend.task_in_queue,
                "qv": backend.qv,
                "system_id": backend.system_id,
                "valid_gates": backend.get_valid_gates(),
            }
            # Calibration is best-effort: a backend without usable chip info
            # is still listed with its basic metadata.
            try:
                chip_info = backend.get_chip_info()
                if isinstance(chip_info, dict) and _quafu_topological_structure(chip_info):
                    entry.update(_compute_quafu_fidelity(chip_info))
                    calibration = _extract_quafu_calibration(
                        chip_info,
                        num_qubits=backend.qubit_num,
                        valid_gates=entry["valid_gates"],
                    )
                    entry["available_qubits"] = list(calibration.available_qubits)
                    entry["topology"] = [[edge.u, edge.v] for edge in calibration.connectivity]
                    entry["per_qubit_calibration"] = [item.to_dict() for item in calibration.single_qubit_data]
                    entry["per_pair_calibration"] = [item.to_dict() for item in calibration.two_qubit_data]
                    entry["global_info"] = calibration.global_info.to_dict()
                    entry["calibrated_at"] = datetime.now(timezone.utc).isoformat()
            except Exception as exc:  # noqa: BLE001
                # Keep the failure visible for debugging instead of dropping
                # it without a trace.
                logger.debug("Skipping Quafu calibration for backend %r: %s", name, exc)
            result.append(entry)
        return result

    # -------------------------------------------------------------------------
    # Circuit translation
    # -------------------------------------------------------------------------

    def translate_circuit(self, originir: str) -> "QuantumCircuit":  # noqa: UP037,F821
        """Translate an OriginIR string to a Quafu QuantumCircuit.

        Args:
            originir: OriginIR program text; ``QINIT`` must precede any gate.

        Returns:
            The populated Quafu ``QuantumCircuit``.

        Raises:
            RuntimeError: If a line cannot be parsed, uses an operation this
                adapter does not support, carries control qubits (which this
                adapter cannot express), appears before ``QINIT``, or if the
                text contains no ``QINIT`` at all.
        """
        from uniqc.compile.originir.originir_line_parser import OriginIR_LineParser

        qc: "QuantumCircuit | None" = None  # noqa: UP037,F821
        for line in originir.splitlines():
            try:
                operation, qubit, cbit, parameter, dagger_flag, control_qubits = OriginIR_LineParser.parse_line(line)
            except NotImplementedError:
                raise RuntimeError(f"Unknown OriginIR operation in quafu adapter: {line.strip()}") from None

            if operation == "QINIT":
                qc = self._QuantumCircuit(int(qubit))  # type: ignore[arg-type]
                continue

            if qc is None:
                raise RuntimeError("QINIT must appear before any gate operation.")

            if control_qubits:
                # Dropping the controls would silently build a different
                # circuit, so refuse instead.
                raise RuntimeError(f"Controlled operations are not supported by the quafu adapter: {line.strip()}")

            qc = self._reconstruct_qasm(qc, operation, qubit, cbit, parameter, bool(dagger_flag))

        if qc is None:
            raise RuntimeError("OriginIR string produced no circuit.")
        return qc

    def _reconstruct_qasm(
        self,
        qc: "QuantumCircuit",  # noqa: UP037,F821
        operation: str | None,
        qubit: int | list[int],
        cbit: int | None,
        parameter: float | list[float] | None,
        dagger_flag: bool = False,
    ) -> "QuantumCircuit":  # noqa: UP037,F821
        """Append a single gate to a Quafu QuantumCircuit based on parsed OriginIR.

        This method is called internally by translate_circuit() for each line
        of OriginIR after QINIT. It maps OriginIR gate names to Quafu
        QuantumCircuit method calls.

        Args:
            qc: The Quafu QuantumCircuit to modify (modified in-place).
            operation: The gate operation name (e.g., 'RX', 'H', 'CNOT').
            qubit: Target qubit index or list of indices for multi-qubit gates.
            cbit: Classical bit index for MEASURE operations.
            parameter: Rotation angle for parametric gates (e.g., RX, RY, RZ).
            dagger_flag: Whether the gate is daggered.  Selects the ``*dg``
                variant for S, SX and T and negates the angle of RX, RY, RZ;
                it has no effect on the self-inverse gates.

        Returns:
            The modified QuantumCircuit (same object as input).

        Raises:
            RuntimeError: If the operation is not supported by this adapter,
                including a daggered ISWAP.

        Note:
            Supported gates: RX, RY, RZ, H, X, Y, Z, S, SX, T, CZ, CNOT, SWAP,
            ISWAP, MEASURE, BARRIER. CREG and None operations are silently
            ignored.
        """
        if operation in self._ROTATION_GATES:
            # The inverse of a rotation is the rotation by the opposite angle.
            angle = -parameter if dagger_flag else parameter  # type: ignore[operator]
            getattr(qc, self._ROTATION_GATES[operation])(int(qubit), angle)  # type: ignore[arg-type]
        elif operation in self._FIXED_1Q_GATES:
            # H, X, Y, Z are self-inverse, so the dagger flag is irrelevant.
            getattr(qc, self._FIXED_1Q_GATES[operation])(int(qubit))  # type: ignore[arg-type]
        elif operation in self._DAGGERABLE_GATES:
            plain, inverse = self._DAGGERABLE_GATES[operation]
            getattr(qc, inverse if dagger_flag else plain)(int(qubit))  # type: ignore[arg-type]
        elif operation in self._TWO_QUBIT_GATES:
            if dagger_flag and operation == "ISWAP":
                # ISWAP is not self-inverse (unlike CZ, CNOT and SWAP) and no
                # inverse method is mapped here, so refuse rather than emit
                # the wrong gate.
                raise RuntimeError("Daggered ISWAP is not supported by the quafu adapter.")
            getattr(qc, self._TWO_QUBIT_GATES[operation])(int(qubit[0]), int(qubit[1]))  # type: ignore[index]
        elif operation == "MEASURE":
            qc.measure([int(qubit)], [int(cbit)])  # type: ignore[list-item]
        elif operation is None or operation in ("BARRIER", "CREG"):
            # BARRIER is a no-op for execution; CREG and blank lines carry no gate.
            pass
        else:
            raise RuntimeError(f"Unknown OriginIR operation in quafu adapter: {operation}.")
        return qc

    # -------------------------------------------------------------------------
    # Task submission
    # -------------------------------------------------------------------------

    def submit(
        self,
        circuit: "QuantumCircuit",  # noqa: UP037,F821
        *,
        shots: int = 10000,
        wait: bool = False,
        **kwargs: Any,
    ) -> str:
        """Submit a single circuit to Quafu.

        Args:
            circuit: Quafu QuantumCircuit object.
            shots: Number of measurement shots (default: 10000).
            wait: If True, block until the server acknowledges receipt (default: False).
            **kwargs: Additional options:
                - chip_id: Quafu chip ID (required, e.g. 'ScQ-P18')
                - auto_mapping: Enable automatic qubit mapping (default: True)
                - task_name: Optional task name string

        Returns:
            Task ID string.

        Raises:
            RuntimeError: If ``chip_id`` is missing or not a known chip.
        """
        chip_id: str | None = kwargs.get("chip_id")
        auto_mapping: bool = kwargs.get("auto_mapping", True)
        task_name: str | None = kwargs.get("task_name")

        self._require_valid_chip_id(chip_id)
        self._authenticated_user()

        task = self._Task()
        task.config(backend=chip_id, shots=shots, compile=auto_mapping)
        result = task.send(circuit, wait=wait, name=task_name)  # type: ignore[arg-type]
        return result.taskid

    def submit_batch(
        self,
        circuits: list["QuantumCircuit"],  # noqa: UP037,F821
        *,
        shots: int = 10000,
        wait: bool = False,
        **kwargs: Any,
    ) -> list[str]:
        """Submit multiple circuits as a group to Quafu.

        Args:
            circuits: List of Quafu QuantumCircuit objects.
            shots: Number of measurement shots per circuit (default: 10000).
            wait: If True, block until server acknowledges each submission (default: False).
            **kwargs: Additional options:
                - chip_id: Quafu chip ID (required)
                - auto_mapping: Enable automatic qubit mapping (default: True)
                - task_name: Base task name (suffixed with -0, -1, ...)
                - group_name: Group name for server-side batch tracking

        Returns:
            List of task ID strings.

        Raises:
            RuntimeError: If ``chip_id`` is missing or not a known chip.
        """
        chip_id: str | None = kwargs.get("chip_id")
        auto_mapping: bool = kwargs.get("auto_mapping", True)
        task_name: str | None = kwargs.get("task_name")
        group_name: str | None = kwargs.get("group_name")

        self._require_valid_chip_id(chip_id)
        self._authenticated_user()

        task = self._Task()
        task.config(backend=chip_id, shots=shots, compile=auto_mapping)

        taskids: list[str] = []
        for index, circuit in enumerate(circuits):
            result = task.send(
                circuit,
                wait=wait,
                name=f"{task_name}-{index}" if task_name else None,
                group=group_name,  # type: ignore[arg-type]
            )
            taskids.append(result.taskid)

        # Maintain history so query() can retrieve without caller-supplied history.
        # Apply FIFO eviction when the cap is reached.
        if group_name:
            if group_name not in self._task_history:
                if len(self._history_order) >= self._MAX_HISTORY_GROUPS:
                    oldest = self._history_order.pop(0)
                    self._task_history.pop(oldest, None)
                self._task_history[group_name] = {}
                self._history_order.append(group_name)
            group_history = self._task_history[group_name]
            for position, taskid in enumerate(taskids):
                group_history[taskid] = position

        return taskids

    # -------------------------------------------------------------------------
    # Task query
    # -------------------------------------------------------------------------

    def query(self, taskid: str) -> dict[str, Any]:
        """Query a single Quafu task's status via SDK ``Task.retrieve()``.

        Uses the internally maintained history dict so the caller does not
        need to pass any additional context.

        Returns:
            The normalized result dict produced by :meth:`_result_to_dict`.
        """
        self._authenticated_user()
        task = self._Task()

        # Build a minimal history dict from the first known group that
        # contains this task id.
        for group_name, id_to_idx in self._task_history.items():
            if taskid in id_to_idx:
                result = task.retrieve(taskid, history={group_name: {taskid: id_to_idx[taskid]}})
                return self._result_to_dict(result)

        # Fallback: try without history (may work if server accepts taskid alone)
        return self._result_to_dict(task.retrieve(taskid))

    def _result_to_dict(self, result) -> dict[str, Any]:
        """Convert a Quafu ExecResult to the adapter's standard result dict.

        Args:
            result: Object returned by ``Task.retrieve()``; its
                ``task_status`` and (on success) ``counts`` are read.

        Returns:
            dict with keys:
                - ``status``: the success / failed / running status constant
                - ``result``: present only on success — a dict mapping each
                  measured bitstring to its integer count

        Note:
            The Quafu status strings are mapped as follows:
            - 'Completed' -> success
            - 'Running', 'In Queue' -> running
            - 'Failed', 'Canceled' -> failed
            Any other status string is treated as running.
        """
        status_map = {
            "Completed": TASK_STATUS_SUCCESS,
            "Running": TASK_STATUS_RUNNING,
            "In Queue": TASK_STATUS_RUNNING,
            "Failed": TASK_STATUS_FAILED,
            "Canceled": TASK_STATUS_FAILED,
        }
        status = status_map.get(result.task_status, TASK_STATUS_RUNNING)

        if status != TASK_STATUS_SUCCESS:
            return {"status": status}

        # Quafu's bitstring convention is q[0]/c[0] as the LEFTMOST
        # character. uniqc convention (see docs/source/guide/
        # platform_conventions.md §2.6) is c[0] as the RIGHTMOST
        # character. Reverse each key on the way out.
        raw_counts = dict(result.counts)
        counts = {str(bits)[::-1]: int(count) for bits, count in raw_counts.items()}
        return {"status": status, "result": counts}

    def query_batch(self, taskids: list[str]) -> dict[str, Any]:
        """Query multiple Quafu tasks and merge results.

        Returns:
            dict with ``status`` (failed as soon as one task failed, running
            if any queried task is still running, success otherwise) and
            ``result``, the list of per-task results collected while every
            task seen so far had succeeded.  Only a ``success`` status
            guarantees one result per task id.
        """
        taskinfo: dict[str, Any] = {"status": TASK_STATUS_SUCCESS, "result": []}
        for taskid in taskids:
            result_i = self.query(taskid)
            if result_i["status"] == TASK_STATUS_FAILED:
                taskinfo["status"] = TASK_STATUS_FAILED
                break
            elif result_i["status"] == TASK_STATUS_RUNNING:
                taskinfo["status"] = TASK_STATUS_RUNNING
            if taskinfo["status"] == TASK_STATUS_SUCCESS:
                taskinfo["result"].append(result_i.get("result", {}))
        return taskinfo

    def query_sync(
        self,
        taskid: str | list[str],
        interval: float = 2.0,
        timeout: float = 60.0,
    ) -> list[dict[str, Any]]:
        """Poll task status until completion or timeout.

        The tasks are queried immediately and then once per ``interval``
        until none of them is running any more.

        Args:
            taskid: Task ID or list of task IDs.
            interval: Polling interval in seconds (default: 2.0).
            timeout: Maximum wait time in seconds (default: 60.0).

        Returns:
            List of result dicts (one per task ID when all tasks succeeded;
            possibly shorter when a task failed).

        Raises:
            TimeoutError: If timeout is reached before completion.
        """
        import time

        taskids = [taskid] if isinstance(taskid, str) else taskid
        # Monotonic clock: immune to wall-clock adjustments while waiting.
        deadline = time.monotonic() + timeout
        while True:
            # Poll before sleeping so finished tasks return without delay.
            batch_result = self.query_batch(taskids)
            if batch_result["status"] != TASK_STATUS_RUNNING:
                return batch_result.get("result", [])

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError(f"Task(s) timed out after {timeout}s.")
            # Never sleep past the deadline; the last poll happens at it.
            time.sleep(min(interval, remaining))

    # -------------------------------------------------------------------------
    # Chip characterization
    # -------------------------------------------------------------------------

    def get_chip_characterization(self, chip_name: str) -> ChipCharacterization | None:
        """Return per-qubit and per-pair calibration data for a Quafu chip.

        Parameters
        ----------
        chip_name:
            Quafu chip ID, e.g. ``"ScQ-P18"``.

        Returns
        -------
        ChipCharacterization or None
            ``None`` when the chip is not among the available backends or its
            chip info cannot be fetched.
        """
        user = self._authenticated_user()
        backend = user.get_available_backends().get(chip_name)
        if backend is None:
            return None

        try:
            chip_info = backend.get_chip_info()
        except Exception:  # noqa: BLE001
            return None
        if not isinstance(chip_info, dict):
            # Same guard as list_backends(): without a dict payload there is
            # no calibration to read, but qubit count and valid gates still
            # yield a usable (uncalibrated) characterization.
            chip_info = {}

        valid_gates = backend.get_valid_gates() if hasattr(backend, "get_valid_gates") else []
        calibration = _extract_quafu_calibration(
            chip_info,
            num_qubits=backend.qubit_num,
            valid_gates=valid_gates,
        )

        # Gate times — quafu backend may expose them via gate_time attribute
        sq_time: float | None = None
        tq_time: float | None = None
        try:
            if hasattr(backend, "gate_time"):
                gate_time = backend.gate_time
                if isinstance(gate_time, dict):
                    sq_time = gate_time.get("single", None)
                    tq_time = gate_time.get("double", None)
        except Exception:  # noqa: BLE001
            # Gate times are optional; leave them as None when unreadable.
            pass

        return ChipCharacterization(
            platform=Platform.QUAFU,
            chip_name=chip_name,
            full_id=f"quafu:{chip_name}",
            available_qubits=calibration.available_qubits,
            connectivity=calibration.connectivity,
            single_qubit_data=calibration.single_qubit_data,
            two_qubit_data=calibration.two_qubit_data,
            global_info=ChipGlobalInfo(
                single_qubit_gates=calibration.global_info.single_qubit_gates,
                two_qubit_gates=calibration.global_info.two_qubit_gates,
                single_qubit_gate_time=sq_time,
                two_qubit_gate_time=tq_time,
            ),
            calibrated_at=datetime.now(timezone.utc).isoformat(),
        )

    # -------------------------------------------------------------------------
    # Dry-run validation
    # -------------------------------------------------------------------------

    def dry_run(self, originir: str, *, shots: int = 10000, **kwargs: Any) -> DryRunResult:
        """Dry-run validation for Quafu backends.

        Validates offline by:
        1. Extracting qubit count from OriginIR QINIT line (no API call).
        2. Checking chip_id in VALID_CHIP_IDS (static check).
        3. Checking qubit count against CHIP_QUBIT_COUNTS (static map).
        4. Attempting translation via translate_circuit() (no API call —
           catches unsupported gates).
        5. Checking that ``shots`` is positive and within the Quafu maximum.

        Without a ``chip_id`` only step 1 runs and the result is a success
        carrying a warning.

        This method makes NO network calls.

        Note:
            Any dry-run success followed by actual submission failure is a
            critical bug. Please report it at the UnifiedQuantum issue tracker.
        """
        from uniqc.backend_adapter.task.adapters.base import _dry_run_failed, _dry_run_success

        chip_id: str | None = kwargs.get("chip_id")
        supported_gates = self._supported_gate_names()

        # Step 1: Extract qubit count from OriginIR QINIT line (no API call)
        circuit_qubits: int | None = None
        try:
            for line in originir.splitlines():
                line = line.strip()
                if line.startswith("QINIT"):
                    parts = line.split()
                    if len(parts) >= 2:
                        circuit_qubits = int(parts[1])
                    break
        except (AttributeError, TypeError, ValueError):
            # Malformed input: leave the count unknown; translation below
            # reports the actual problem.
            pass

        # Step 2: Validate chip_id against static VALID_CHIP_IDS
        if chip_id is None:
            return _dry_run_success(
                "Dry-run passed (no chip_id): OriginIR is syntactically valid. "
                "Specify --chip-id for full chip-level validation.",
                backend_name=None,
                circuit_qubits=circuit_qubits,
                warnings=("No chip_id provided — skipping chip-level validation.",),
            )

        if chip_id not in self.VALID_CHIP_IDS:
            return _dry_run_failed(
                f"chip_id '{chip_id}' is not in the known Quafu chip list",
                details=(f"Invalid chip_id. Valid chips: {', '.join(sorted(self.VALID_CHIP_IDS))}"),
                backend_name=chip_id,
            )

        # Step 3: Qubit count check against static chip map
        if circuit_qubits is not None:
            max_qubits = CHIP_QUBIT_COUNTS.get(chip_id)
            if max_qubits is not None and circuit_qubits > max_qubits:
                return _dry_run_failed(
                    f"circuit requires {circuit_qubits} qubits but chip '{chip_id}' has {max_qubits}",
                    details=f"Qubit count validation failed: circuit={circuit_qubits}, chip={max_qubits}",
                    backend_name=chip_id,
                )

        # Step 4: Attempt translation (catches unsupported gates)
        try:
            self.translate_circuit(originir)
        except RuntimeError as e:
            err_str = str(e)
            if "Unknown OriginIR operation" in err_str or "not supported" in err_str.lower():
                return _dry_run_failed(
                    err_str,
                    details=(
                        f"Circuit contains gates not supported by Quafu adapter. "
                        f"Supported gates: {', '.join(supported_gates)}. "
                        f"Error: {e}"
                    ),
                    backend_name=chip_id,
                )
            return _dry_run_failed(
                str(e),
                details=f"Translation failed with unexpected RuntimeError: {e}",
                backend_name=chip_id,
            )
        except Exception as e:  # noqa: BLE001
            return _dry_run_failed(
                str(e),
                details=f"Failed to translate OriginIR to Quafu QuantumCircuit: {e}",
                backend_name=chip_id,
            )

        # Step 5: Shots validation — Quafu default max is typically 100000
        MAX_QUAFU_SHOTS = 100000
        if shots <= 0:
            # A non-positive shot count can never be submitted successfully.
            return _dry_run_failed(
                f"shots ({shots}) must be a positive integer",
                details="Shot count validation failed.",
                backend_name=chip_id,
            )
        if shots > MAX_QUAFU_SHOTS:
            return _dry_run_failed(
                f"shots ({shots}) exceeds Quafu maximum ({MAX_QUAFU_SHOTS})",
                details="Shot count validation failed.",
                backend_name=chip_id,
            )

        return _dry_run_success(
            (
                f"Dry-run passed for '{chip_id}': circuit translates cleanly to "
                f"Quafu QuantumCircuit. Qubits={circuit_qubits}, shots={shots}"
            ),
            backend_name=chip_id,
            circuit_qubits=circuit_qubits,
            supported_gates=supported_gates,
        )