Source code for uniqc.calibration.readout.calibrator

"""Readout calibrator for single-qubit and two-qubit measurement errors.

Generates calibration circuits for all computational basis states,
executes them on the given adapter, builds confusion matrices,
and saves results to the calibration cache.
"""

from __future__ import annotations

import pathlib
import time
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any

from uniqc.circuit_builder import Circuit

if TYPE_CHECKING:
    from uniqc.backend_adapter.task.adapters.base import QuantumAdapter
    from uniqc.calibration.results import ReadoutCalibrationResult

__all__ = ["ReadoutCalibrator"]


[docs] class ReadoutCalibrator: """Calibrates readout (measurement) errors for 1-qubit and 2-qubit systems. For 1-qubit: runs two circuits to build a 2×2 confusion matrix: [[P(0|0), P(1|0)], [P(0|1), P(1|1)]] For 2-qubit: runs four circuits to build a 4×4 confusion matrix: rows = measured outcome (00,01,10,11) cols = prepared state (00,01,10,11) Results are automatically saved to ``~/.uniqc/calibration_cache/`` with an ISO-8601 ``calibrated_at`` timestamp. Args: adapter: A ``QuantumAdapter`` instance (e.g. ``DummyAdapter``). Must implement ``submit`` and ``query`` methods. shots: Number of measurement shots per calibration circuit. cache_dir: Directory to save calibration results. Defaults to ``~/.uniqc/calibration_cache/``. """ def __init__( self, adapter: QuantumAdapter, shots: int = 1000, cache_dir: str | pathlib.Path | None = None, timeout: float = 300.0, poll_interval: float = 2.0, ) -> None: self.adapter = adapter self.shots = shots self.timeout = timeout self.poll_interval = poll_interval if cache_dir is None: cache_dir = pathlib.Path.home() / ".uniqc" / "calibration_cache" self.cache_dir = pathlib.Path(cache_dir) self.cache_dir.mkdir(parents=True, exist_ok=True) # ------------------------------------------------------------------------- # Public API # -------------------------------------------------------------------------
[docs] def calibrate_1q(self, qubit: int) -> ReadoutCalibrationResult: """Calibrate readout for a single qubit. Args: qubit: Qubit index. Returns: A :class:`ReadoutCalibrationResult` dataclass with fields: ``qubit``, ``type="readout_1q"``, ``confusion_matrix`` (tuple of tuples), ``assignment_fidelity``, ``calibrated_at``, ``backend``. Supports dict-like access (``result["confusion_matrix"]``) for backward compatibility. """ from uniqc.calibration.results import ReadoutCalibrationResult, save_calibration_result # Run calibration circuits counts_0 = self._run_prepared_state([], qubit) # |0⟩: identity counts_1 = self._run_prepared_state([("X", qubit)], qubit) # |1⟩ # Build 2×2 confusion matrix # Row 0: measured=0; Row 1: measured=1 # Col 0: prepared=0; Col 1: prepared=1 n0 = sum(counts_0.values()) n1 = sum(counts_1.values()) p00 = counts_0.get(0, 0) / n0 if n0 > 0 else 0.0 # P(meas=0|prep=0) p10 = counts_0.get(1, 0) / n0 if n0 > 0 else 0.0 # P(meas=1|prep=0) p01 = counts_1.get(0, 0) / n1 if n1 > 0 else 0.0 # P(meas=0|prep=1) p11 = counts_1.get(1, 0) / n1 if n1 > 0 else 0.0 # P(meas=1|prep=1) confusion = [[p00, p01], [p10, p11]] assignment_fid = (p00 + p11) / 2.0 result = ReadoutCalibrationResult( calibrated_at=_utc_now(), backend=getattr(self.adapter, "name", "unknown"), type="readout_1q", qubit=qubit, confusion_matrix=tuple(tuple(row) for row in confusion), assignment_fidelity=assignment_fid, ) save_calibration_result(result, type_prefix="readout_1q", cache_dir=self.cache_dir) return result
[docs] def calibrate_2q(self, qubit_u: int, qubit_v: int) -> ReadoutCalibrationResult: """Calibrate joint readout for a two-qubit pair. Args: qubit_u: First qubit index. qubit_v: Second qubit index. Returns: A :class:`ReadoutCalibrationResult` dataclass with fields: ``qubit`` (tuple), ``type="readout_2q"``, ``confusion_matrix`` (tuple of tuples), ``assignment_fidelity``, ``calibrated_at``, ``backend``. Supports dict-like access (``result["confusion_matrix"]``) for backward compatibility. """ from uniqc.calibration.results import ReadoutCalibrationResult, save_calibration_result # 4 basis states: |00⟩, |01⟩, |10⟩, |11⟩ # Index in confusion matrix: 0=|00⟩, 1=|01⟩, 2=|10⟩, 3=|11⟩ prep_circuits = [ ([], 0), # |00⟩ → idx 0 ([("X", qubit_v)], 2), # |01⟩ → idx 2 ([("X", qubit_u)], 1), # |10⟩ → idx 1 ([("X", qubit_u), ("X", qubit_v)], 3), # |11⟩ → idx 3 ] confusion = [[0.0] * 4 for _ in range(4)] ns = [0.0] * 4 for gates, prep_idx in prep_circuits: counts = self._run_prepared_state_2q(gates, qubit_u, qubit_v) n_total = sum(counts.values()) ns[prep_idx] = n_total for outcome, cnt in counts.items(): confusion[outcome][prep_idx] = cnt / n_total if n_total > 0 else 0.0 assignment_fid = sum(confusion[i][i] for i in range(4)) / 4.0 result = ReadoutCalibrationResult( calibrated_at=_utc_now(), backend=getattr(self.adapter, "name", "unknown"), type="readout_2q", qubit=(qubit_u, qubit_v), confusion_matrix=tuple(tuple(row) for row in confusion), assignment_fidelity=assignment_fid, ) save_calibration_result(result, type_prefix="readout_2q", cache_dir=self.cache_dir) return result
[docs] def calibrate_qubits(self, qubits: list[int]) -> dict[int, ReadoutCalibrationResult]: """Calibrate readout for multiple single qubits. Args: qubits: List of qubit indices. Returns: Dict mapping qubit index → :class:`ReadoutCalibrationResult`. """ return {q: self.calibrate_1q(q) for q in qubits}
[docs] def calibrate_pairs(self, pairs: list[tuple[int, int]]) -> dict[tuple[int, int], ReadoutCalibrationResult]: """Calibrate joint readout for multiple qubit pairs. Args: pairs: List of (qubit_u, qubit_v) tuples. Returns: Dict mapping ``(u, v)`` → :class:`ReadoutCalibrationResult`. """ return {(u, v): self.calibrate_2q(u, v) for u, v in pairs}
# ------------------------------------------------------------------------- # Internal helpers # ------------------------------------------------------------------------- def _run_prepared_state(self, prep_gates: list[tuple[str, int]], qubit: int) -> dict[int, int]: """Run a 1-qubit calibration circuit and return measurement counts. Args: prep_gates: List of (gate_name, qubit) gates to apply before measure. qubit: Qubit index. Returns: Dict mapping measured bit (0 or 1) → count. """ c = Circuit(1) for gate, q in prep_gates: c.add_gate(gate, qubits=[q]) c.measure(qubit) # measure qubit to next available classical bit return self._submit_and_measure(c) def _run_prepared_state_2q( self, prep_gates: list[tuple[str, int]], qubit_u: int, qubit_v: int, ) -> dict[int, int]: """Run a 2-qubit calibration circuit and return measurement counts. Args: prep_gates: List of (gate_name, qubit) gates. qubit_u: First qubit index. qubit_v: Second qubit index. Returns: Dict mapping measured bitstring (0-3) → count. Bitstring encoding: bit 0 = qubit_u (LSB), bit 1 = qubit_v. """ c = Circuit(2) for gate, q in prep_gates: c.add_gate(gate, qubits=[q]) c.measure(qubit_u) c.measure(qubit_v) return self._submit_and_measure_2q(c, qubit_u, qubit_v) def _submit_and_measure(self, circuit: Circuit) -> dict[int, int]: """Submit a 1-qubit circuit and return counts as ``{0: n0, 1: n1}``. Polls the cloud backend until the task reaches a terminal status, using ``self.timeout`` (default ``300.0`` s) as the maximum wait and ``self.poll_interval`` (default ``2.0`` s) between polls. Both can be overridden via the constructor's ``timeout=`` / ``poll_interval=`` keyword arguments. """ originir = circuit.originir task_id = self.adapter.submit(originir, shots=self.shots) timeout = self.timeout interval = self.poll_interval elapsed = 0.0 while elapsed < timeout: result = self.adapter.query(task_id) status = result.get("status", "") if status == "success": unified = result.get("result") if hasattr(unified, "counts"): counts = unified.counts elif isinstance(unified, dict): counts = unified else: counts = {} return {_outcome_to_int(k, 1): v for k, v in counts.items()} if status == "failed": raise RuntimeError(f"Readout calibration circuit failed: {result.get('result') or result.get('error')}") time.sleep(interval) elapsed += interval raise TimeoutError(f"Timed out waiting for readout calibration task {task_id}") def _submit_and_measure_2q(self, circuit: Circuit, qubit_u: int, qubit_v: int) -> dict[int, int]: """Submit a 2-qubit circuit and return counts as ``{0..3: n}``. Converts simulator outcome indices (``0="00"``, ``1="01"``, ``2="10"``, ``3="11"``) to integers where ``qubit_u`` is the LSB. Polls the cloud backend until the task reaches a terminal status, using ``self.timeout`` (default ``300.0`` s) and ``self.poll_interval`` (default ``2.0`` s); both are configurable via the constructor's ``timeout=`` / ``poll_interval=`` keyword arguments. """ originir = circuit.originir task_id = self.adapter.submit(originir, shots=self.shots) timeout = self.timeout interval = self.poll_interval elapsed = 0.0 while elapsed < timeout: result = self.adapter.query(task_id) status = result.get("status", "") if status == "success": unified = result.get("result") if hasattr(unified, "counts"): counts = unified.counts elif isinstance(unified, dict): counts = unified else: counts = {} out = {} for k, v in counts.items(): k_int = _outcome_to_int(k, 2) out[k_int] = v return out if status == "failed": raise RuntimeError(f"Readout calibration circuit failed: {result.get('result') or result.get('error')}") time.sleep(interval) elapsed += interval raise TimeoutError(f"Timed out waiting for readout calibration task {task_id}")
def _utc_now() -> str: return datetime.now(timezone.utc).isoformat() def _outcome_to_int(outcome: Any, width: int) -> int: if isinstance(outcome, int): return outcome text = str(outcome).strip() if text.startswith("0b"): return int(text, 2) if set(text) <= {"0", "1"}: return int(text[-width:] if width > 0 else text, 2) return int(text)