Source code for uniqc.task.adapters.quafu_adapter

"""Quafu backend adapter.

Translates OriginIR circuits to Quafu QuantumCircuit objects and submits
via the ``quafu`` package (User / Task API).  No raw REST calls.

Installation:
    pip install unified-quantum[quafu]
"""

from __future__ import annotations

__all__ = ["QuafuAdapter"]

from typing import TYPE_CHECKING, Any

from uniqc.task.adapters.base import (
    TASK_STATUS_FAILED,
    TASK_STATUS_RUNNING,
    TASK_STATUS_SUCCESS,
    QuantumAdapter,
)
from uniqc.task.config import load_quafu_config
from uniqc.task.optional_deps import MissingDependencyError, check_quafu

if TYPE_CHECKING:
    pass  # type hints use string annotations via `from __future__ import annotations`


def _avg(values: list[float]) -> float | None:
    """Return the arithmetic mean of a list, or None if the list is empty."""
    return sum(values) / len(values) if values else None


def _compute_quafu_fidelity(chip_info: dict[str, Any]) -> dict[str, Any]:
    """Extract fidelity and coherence metrics from a Quafu get_chip_info() result.

    Available:
      - Avg. 2Q fidelity: from ``full_info.topological_structure[edge]['cz']['fidelity']``
      - Avg. T1 / T2: from ``full_info.qubits_info[q]['T1']`` / ``['T2']`` (microseconds)

    Not available from Quafu API:
      - Avg. 1Q gate fidelity
      - Avg. readout fidelity

    Returns:
        dict with keys: avg_1q_fidelity (None), avg_2q_fidelity, avg_readout_fidelity (None),
        coherence_t1, coherence_t2.
    """
    full_info: dict[str, Any] = chip_info.get("full_info") or {}

    # T1/T2 from qubits_info (values are in microseconds already)
    t1s, t2s = [], []
    qubits_info: dict[str, dict[str, Any]] = full_info.get("qubits_info") or {}
    for qdata in qubits_info.values():
        if (t1 := qdata.get("T1")) is not None:
            t1s.append(float(t1))
        if (t2 := qdata.get("T2")) is not None:
            t2s.append(float(t2))

    # 2Q fidelity from topological_structure (directed edges, take each once)
    seen: set[tuple[int, int]] = set()
    tq_fids: list[float] = []
    topo: dict[str, dict[str, Any]] = full_info.get("topological_structure") or {}
    for edge_key, gate_data in topo.items():
        parts = edge_key.split("_")
        if len(parts) == 2:
            u, v = (
                int(parts[0][1:]) if parts[0].startswith("Q") else int(parts[0]),
                int(parts[1][1:]) if parts[1].startswith("Q") else int(parts[1]),
            )
            key = (u, v)
            if key not in seen:
                seen.add(key)
                if (f := gate_data.get("cz", {}).get("fidelity")) is not None:
                    tq_fids.append(float(f))

    return {
        "avg_1q_fidelity": None,  # not available from Quafu API
        "avg_2q_fidelity": _avg(tq_fids) if tq_fids else None,
        "avg_readout_fidelity": None,  # not available from Quafu API
        "coherence_t1": _avg(t1s),
        "coherence_t2": _avg(t2s),
    }


[docs] class QuafuAdapter(QuantumAdapter): """Adapter for the BAQIS Quafu (ScQ) quantum cloud platform. Raises: MissingDependencyError: If quafu package is not installed. """ name = "quafu" # Valid chip IDs VALID_CHIP_IDS = frozenset({"ScQ-P10", "ScQ-P18", "ScQ-P136", "ScQ-P10C", "Dongling"}) # Upper limit on the number of groups retained in _task_history. # Beyond this threshold the oldest entry is evicted to avoid unbounded # memory growth in long-running processes. _MAX_HISTORY_GROUPS: int = 100 @property def api_token(self) -> str: """Return the API token used for Quafu authentication. Returns: str: The Quafu API token. """ return self._api_token def __init__(self) -> None: # Check if quafu is available if not check_quafu(): raise MissingDependencyError("quafu", "quafu") config = load_quafu_config() self._api_token: str = config["api_token"] # Internal task history: group_name -> {taskid: task_index} # Updated on each submit_batch call so retrieve() can work without # requiring the caller to pass history. self._task_history: dict[str, dict[str, int]] = {} # Track insertion order so we can evict the oldest group when the # cap is reached (simple FIFO). self._history_order: list[str] = [] from quafu import QuantumCircuit, Task, User self._QuantumCircuit = QuantumCircuit self._Task = Task self._User = User
[docs] def is_available(self) -> bool: """Check if the Quafu adapter is available (quafu package installed). Returns: bool: True if the quafu package was successfully imported. """ return check_quafu()
[docs] def list_backends(self) -> list[dict[str, Any]]: """Return raw Quafu backend metadata. For hardware backends, fetches chip_info() to populate fidelity and coherence data. Returns: List of dicts with keys: ``name``, ``num_qubits``, ``status``, ``task_in_queue``, ``qv``, ``valid_gates``, plus fidelity/coherence fields (avg_1q_fidelity, avg_2q_fidelity, avg_readout_fidelity, coherence_t1, coherence_t2). """ user = self._User(api_token=self._api_token) user.save_apitoken() raw_backends = user.get_available_backends() result: list[dict[str, Any]] = [] for name, backend in raw_backends.items(): entry: dict[str, Any] = { "name": name, "num_qubits": backend.qubit_num, "status": backend.status, "task_in_queue": backend.task_in_queue, "qv": backend.qv, "system_id": backend.system_id, "valid_gates": backend.get_valid_gates(), } # Attempt to fetch chip info for fidelity / coherence try: chip_info = backend.get_chip_info() if isinstance(chip_info, dict) and chip_info.get("full_info"): entry.update(_compute_quafu_fidelity(chip_info)) except Exception: # noqa: BLE001 pass result.append(entry) return result
# ------------------------------------------------------------------------- # Circuit translation # -------------------------------------------------------------------------
[docs] def translate_circuit(self, originir: str) -> "QuantumCircuit": # noqa: UP037,F821 """Translate an OriginIR string to a Quafu QuantumCircuit.""" from uniqc.originir.originir_line_parser import OriginIR_LineParser lines = originir.splitlines() qc: "QuantumCircuit | None" = None # noqa: UP037,F821 for line in lines: try: operation, qubit, cbit, parameter, dagger_flag, control_qubits = OriginIR_LineParser.parse_line(line) except NotImplementedError: raise RuntimeError(f"Unknown OriginIR operation in quafu adapter: {line.strip()}") from None if operation == "QINIT": qc = self._QuantumCircuit(int(qubit)) # type: ignore[arg-type] continue if qc is None: raise RuntimeError("QINIT must appear before any gate operation.") qc = self._reconstruct_qasm(qc, operation, qubit, cbit, parameter) if qc is None: raise RuntimeError("OriginIR string produced no circuit.") return qc
def _reconstruct_qasm( self, qc: "QuantumCircuit", # noqa: UP037,F821 operation: str | None, qubit: int | list[int], cbit: int | None, parameter: float | list[float] | None, ) -> "QuantumCircuit": # noqa: UP037,F821 """Append a single gate to a Quafu QuantumCircuit based on parsed OriginIR. This method is called internally by translate_circuit() for each line of OriginIR after QINIT. It maps OriginIR gate names to Quafu QuantumCircuit method calls. Args: qc: The Quafu QuantumCircuit to modify (modified in-place). operation: The gate operation name (e.g., 'RX', 'H', 'CNOT'). qubit: Target qubit index or list of indices for multi-qubit gates. cbit: Classical bit index for MEASURE operations. parameter: Rotation angle for parametric gates (e.g., RX, RY, RZ). Returns: The modified QuantumCircuit (same object as input). Raises: RuntimeError: If the operation is not supported by this adapter. Note: Supported gates: RX, RY, RZ, H, X, CZ, CNOT, MEASURE. CREG and None operations are silently ignored. """ if operation == "RX": qc.rx(int(qubit), parameter) # type: ignore[arg-type] elif operation == "RY": qc.ry(int(qubit), parameter) # type: ignore[arg-type] elif operation == "RZ": qc.rz(int(qubit), parameter) # type: ignore[arg-type] elif operation == "H": qc.h(int(qubit)) # type: ignore[arg-type] elif operation == "X": qc.x(int(qubit)) # type: ignore[arg-type] elif operation == "CZ": qc.cz(int(qubit[0]), int(qubit[1])) # type: ignore[index] elif operation == "CNOT": qc.cnot(int(qubit[0]), int(qubit[1])) # type: ignore[index] elif operation == "MEASURE": qc.measure([int(qubit)], [int(cbit)]) # type: ignore[list-item] elif operation is None or operation == "CREG": pass else: raise RuntimeError(f"Unknown OriginIR operation in quafu adapter: {operation}.") return qc # ------------------------------------------------------------------------- # Task submission # -------------------------------------------------------------------------
[docs] def submit( self, circuit: "QuantumCircuit", # noqa: UP037,F821 *, shots: int = 10000, **kwargs: Any, ) -> str: """Submit a single circuit to Quafu.""" chip_id: str | None = kwargs.get("chip_id") auto_mapping: bool = kwargs.get("auto_mapping", True) task_name: str | None = kwargs.get("task_name") if chip_id not in self.VALID_CHIP_IDS: raise RuntimeError( r"Invalid chip_id. " r"Current quafu chip_id list: " r"['ScQ-P10','ScQ-P18','ScQ-P136', 'ScQ-P10C', 'Dongling']" ) user = self._User(api_token=self._api_token) user.save_apitoken() task = self._Task() task.config(backend=chip_id, shots=shots, compile=auto_mapping) result = task.send(circuit, wait=False, name=task_name) # type: ignore[arg-type] return result.taskid
[docs] def submit_batch( self, circuits: list["QuantumCircuit"], # noqa: UP037,F821 *, shots: int = 10000, **kwargs: Any, ) -> list[str]: """Submit multiple circuits as a group to Quafu.""" chip_id: str | None = kwargs.get("chip_id") auto_mapping: bool = kwargs.get("auto_mapping", True) task_name: str | None = kwargs.get("task_name") group_name: str | None = kwargs.get("group_name") if chip_id not in self.VALID_CHIP_IDS: raise RuntimeError( r"Invalid chip_id. " r"Current quafu chip_id list: " r"['ScQ-P10','ScQ-P18','ScQ-P136', 'ScQ-P10C', 'Dongling']" ) user = self._User(api_token=self._api_token) user.save_apitoken() task = self._Task() task.config(backend=chip_id, shots=shots, compile=auto_mapping) taskids: list[str] = [] for index, c in enumerate(circuits): result = task.send( c, wait=False, name=f"{task_name}-{index}", group=group_name, # type: ignore[arg-type] ) taskids.append(result.taskid) # Maintain history so query() can retrieve without caller-supplied history. # Apply FIFO eviction when the cap is reached. if group_name: if group_name not in self._task_history: if len(self._history_order) >= self._MAX_HISTORY_GROUPS: oldest = self._history_order.pop(0) self._task_history.pop(oldest, None) self._task_history[group_name] = {} self._history_order.append(group_name) for i, taskid in enumerate(taskids): self._task_history[group_name][taskid] = i return taskids
# ------------------------------------------------------------------------- # Task query # -------------------------------------------------------------------------
[docs] def query(self, taskid: str) -> dict[str, Any]: """Query a single Quafu task's status via SDK ``Task.retrieve()``. Uses the internally maintained history dict so the caller does not need to pass any additional context. """ user = self._User(api_token=self._api_token) user.save_apitoken() task = self._Task() # Build a minimal history dict: try all known groups. # Task.retrieve(taskid, history) will look up the taskid in history. for group_name, id_to_idx in self._task_history.items(): if taskid in id_to_idx: result = task.retrieve(taskid, history={group_name: {taskid: id_to_idx[taskid]}}) return self._result_to_dict(result) # Fallback: try without history (may work if server accepts taskid alone) result = task.retrieve(taskid) return self._result_to_dict(result)
def _result_to_dict(self, result) -> dict[str, Any]: """Convert a Quafu ExecResult to the adapter's standard result dict. This method normalizes Quafu's task status strings and extracts measurement results when the task has completed successfully. Args: result: A quafu.ExecResult object from Task.retrieve(). Returns: dict with keys: - ``status``: ``'success'`` | ``'failed'`` | ``'running'`` - ``result``: dict with ``counts`` and ``probabilities`` (when success) Note: The Quafu status strings are mapped as follows: - 'Completed' -> 'success' - 'Running', 'In Queue' -> 'running' - 'Failed', 'Canceled' -> 'failed' """ status_map = { "Completed": TASK_STATUS_SUCCESS, "Running": TASK_STATUS_RUNNING, "In Queue": TASK_STATUS_RUNNING, "Failed": TASK_STATUS_FAILED, "Canceled": TASK_STATUS_FAILED, } status_str = result.task_status status = status_map.get(status_str, TASK_STATUS_RUNNING) if status == TASK_STATUS_SUCCESS: return { "status": status, "result": { "counts": result.counts, "probabilities": result.probabilities, }, } return {"status": status}
[docs] def query_batch(self, taskids: list[str]) -> dict[str, Any]: """Query multiple Quafu tasks and merge results.""" taskinfo: dict[str, Any] = {"status": TASK_STATUS_SUCCESS, "result": []} for taskid in taskids: result_i = self.query(taskid) if result_i["status"] == TASK_STATUS_FAILED: taskinfo["status"] = TASK_STATUS_FAILED break elif result_i["status"] == TASK_STATUS_RUNNING: taskinfo["status"] = TASK_STATUS_RUNNING if taskinfo["status"] == TASK_STATUS_SUCCESS: taskinfo["result"].append(result_i.get("result", {})) return taskinfo