Source code for uniqc.calibration.xeb.parallel_cz

"""Parallel-CZ XEB calibration: chip-pre-flight 2-qubit gate fidelity.

This module implements a *generic* parallel cross-entropy benchmarking
protocol useful as a pre-flight chip characterization step (i.e.
before running any larger experiment that depends on per-pair CZ
fidelity).

Protocol
--------
For a fixed CZ pattern (a matching of disjoint qubit pairs):

    cycle = U3(haar) on every region qubit, then CZ on every pair in the pattern

Repeat for ``depth`` cycles, then measure all region qubits. Sample
``shots`` bitstrings per circuit and ``K`` random circuit instances per
``(pattern, depth)`` combination.

For each pair in the matching, the full N-qubit circuit factorises as
a tensor product over pairs (1q ops are local; CZs are between
disjoint pairs). The 2-qubit marginal distribution on each pair is
exactly the 2-qubit pure-state distribution of that pair's
sub-circuit, so we can score every pair independently using its
2-qubit ``F_XEB(d)``. Fitting ``F(d) = beta · alpha^d`` per pair
yields the per-cycle CZ fidelity ``alpha``.

Public API
----------
* :class:`Schedule` — replayable per-cycle U3 angles + CZ pattern
* :func:`build_parallel_cz_xeb_circuit` — single circuit + Schedule
* :func:`build_parallel_cz_xeb_corpus` — full corpus
* :func:`pair_marginal_counts` — 2-qubit marginal of N-qubit counts
* :func:`pair_ideal_probs` — 2-qubit ideal probabilities for a pair
* :func:`per_pair_F_XEB` — per-(record, pair) F_XEB
* :func:`fit_pair_decays` — weighted log-LS fit ``F(d) = beta·alpha^d``
* :class:`ParallelCZBenchmarker` — end-to-end runner
"""

from __future__ import annotations

import dataclasses
import math
import time
from collections.abc import Iterable, Mapping, Sequence
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any

import numpy as np

from uniqc.calibration.results import (
    XEBResult,
    save_calibration_result,
)

if TYPE_CHECKING:
    from uniqc.backend_adapter.task.adapters.base import QuantumAdapter

PairKey = tuple[int, int]

__all__ = [
    "Schedule",
    "ProbeCircuit",
    "PairCircuitFit",
    "PairDecay",
    "build_parallel_cz_xeb_circuit",
    "build_parallel_cz_xeb_corpus",
    "pair_marginal_counts",
    "pair_ideal_probs",
    "per_pair_F_XEB",
    "fit_pair_decays",
    "ParallelCZBenchmarker",
]


# ---------------------------------------------------------------------------
# Schedule + circuit construction
# ---------------------------------------------------------------------------


[docs] @dataclasses.dataclass(frozen=True) class Schedule: """Per-cycle U3 angles and CZ pattern for a single XEB instance. ``cycles`` is a tuple of ``(angles_per_qubit, pattern)`` pairs. ``angles_per_qubit`` is ordered the same as ``region_qubits``. """ region_qubits: tuple[int, ...] measured_qubits: tuple[int, ...] cycles: tuple[ tuple[ tuple[tuple[float, float, float], ...], tuple[tuple[int, int], ...], ], ..., ] seed: int @property def depth(self) -> int: return len(self.cycles)
[docs] @dataclasses.dataclass(frozen=True) class ProbeCircuit: """One built circuit + its replayable schedule and metadata.""" circuit: Any # uniqc.Circuit schedule: Schedule pattern_idx: int pattern: tuple[PairKey, ...] depth: int instance: int record_id: str
def _haar_u3_angles(rng: np.random.Generator) -> tuple[float, float, float]: """Draw one Haar-random U3 Euler triple (theta, phi, lam). Sampled from the Haar measure on SU(2): cos(theta) ~ U[-1, 1] (i.e. theta = arccos(1 - 2u), u ~ U[0,1]) phi ~ U[0, 2π], lam ~ U[0, 2π] """ u = float(rng.random()) theta = float(math.acos(1.0 - 2.0 * u)) phi = float(rng.uniform(0.0, 2.0 * math.pi)) lam = float(rng.uniform(0.0, 2.0 * math.pi)) return theta, phi, lam def _pattern_id(pattern: Sequence[tuple[int, int]]) -> int: """Deterministic 32-bit fingerprint of a pattern (independent of Python's randomised tuple hashing).""" h = 0 for a, b in pattern: h = (h * 1_000_003 + int(a) * 65537 + int(b)) & 0xFFFF_FFFF return h
[docs] def build_parallel_cz_xeb_circuit( region_qubits: Sequence[int], pattern: Sequence[tuple[int, int]], depth: int, *, seed: int = 0, instance: int = 0, ) -> ProbeCircuit: """Build one parallel-CZ XEB probe circuit. Each cycle is ``U3(haar) on every region qubit, then CZ on every pair in pattern``. Single-qubit angles are drawn from a deterministic RNG seeded by ``(seed, depth, instance, pattern_id)`` so different patterns get statistically independent fillings. """ if depth <= 0: raise ValueError("depth must be positive") region = tuple(int(q) for q in region_qubits) qset = set(region) pat_t = tuple((int(a), int(b)) for a, b in pattern) for a, b in pat_t: if a not in qset or b not in qset: raise ValueError(f"pattern edge ({a},{b}) outside region {region}") from uniqc import Circuit # 0x50435A58 = "PCZX" — fixed salt, makes seeds disjoint from other XEB families ss = np.random.SeedSequence([int(seed), int(depth), int(instance), 0x50435A58, _pattern_id(pat_t)]) rng = np.random.default_rng(ss) cycles: list[tuple[tuple[tuple[float, float, float], ...], tuple[tuple[int, int], ...]]] = [] circuit = Circuit() for _ in range(depth): angles: list[tuple[float, float, float]] = [] for q in region: theta, phi, lam = _haar_u3_angles(rng) angles.append((theta, phi, lam)) circuit.u3(q, theta, phi, lam) for a, b in pat_t: circuit.cz(a, b) cycles.append((tuple(angles), pat_t)) for q in region: circuit.measure(q) schedule = Schedule( region_qubits=region, measured_qubits=region, cycles=tuple(cycles), seed=int(seed), ) return ProbeCircuit( circuit=circuit, schedule=schedule, pattern_idx=0, pattern=pat_t, depth=int(depth), instance=int(instance), record_id=f"P0_d{int(depth)}_inst{int(instance)}", )
[docs] def build_parallel_cz_xeb_corpus( region_qubits: Sequence[int], patterns: Sequence[Sequence[tuple[int, int]]], depths: Sequence[int], instances: int, *, seed: int = 2026, ) -> list[ProbeCircuit]: """Build the full XEB corpus. Returns ``len(patterns) * len(depths) * instances`` probes. Within a single ``(pattern, depth)`` slot, ``instances`` independent random U3 fillings are emitted. """ out: list[ProbeCircuit] = [] for p_idx, pat in enumerate(patterns): pat_t = tuple((int(a), int(b)) for a, b in pat) for d in depths: for inst in range(instances): pc = build_parallel_cz_xeb_circuit( region_qubits, pat_t, depth=int(d), seed=seed, instance=int(inst), ) rid = f"P{p_idx}_d{int(d)}_inst{int(inst)}" out.append( dataclasses.replace( pc, pattern_idx=p_idx, record_id=rid, ) ) return out
# --------------------------------------------------------------------------- # 2-qubit marginal counts + ideal probabilities # --------------------------------------------------------------------------- def _outcome_to_int(outcome: Any, width: int) -> int: """Normalise a counts dict key to a non-negative integer. Accepts ``int`` (used as-is), ``"0b…"``, or a binary string of any length (padded / truncated from the right to ``width`` bits via ``int(text[-width:], 2)`` to match standard left-padded conventions). """ if isinstance(outcome, int): return int(outcome) text = str(outcome).strip() if text.startswith("0b"): return int(text, 2) if set(text) <= {"0", "1"} and len(text) >= 1: return int(text[-width:] if width > 0 and len(text) >= width else text, 2) return int(text)
[docs] def pair_marginal_counts( counts: Mapping[Any, int], measured_qubits: Sequence[int], pair: PairKey, ) -> dict[int, int]: """Marginalise full N-qubit counts to a 2-qubit pair. ``measured_qubits`` defines the bit positions of the count keys: the integer index of an outcome has **bit ``k`` (LSB+k) = state of the ``k``-th measured qubit**. This matches ``Simulator``'s ``simulate_pmeasure`` ordering when ``format(idx, f'0{n}b')`` is used to render the count key as a binary string. Returns counts indexed by ``(bb << 1) | ba`` where ``ba`` (bit 0, LSB) is the measured outcome on ``pair[0]`` and ``bb`` (bit 1) is the outcome on ``pair[1]``. This matches the layout produced by the 2-qubit reference simulation in :func:`pair_ideal_probs`. """ a, b = int(pair[0]), int(pair[1]) measured = list(int(q) for q in measured_qubits) pos_a = measured.index(a) pos_b = measured.index(b) n = len(measured) out: dict[int, int] = {0: 0, 1: 0, 2: 0, 3: 0} for k, v in counts.items(): ki = _outcome_to_int(k, n) ba = (ki >> pos_a) & 1 bb = (ki >> pos_b) & 1 out[(bb << 1) | ba] += int(v) return {k: v for k, v in out.items() if v > 0}
[docs] def pair_ideal_probs(schedule: Schedule, pair: PairKey) -> np.ndarray: """Build and simulate the 2-qubit subcircuit on ``pair``. Only the cycle's 1q ops on the pair's two qubits and the CZ on that pair (when present in the cycle's pattern) are kept. Returns a length-4 probability vector indexed as ``(bb << 1) | ba`` where ``ba`` is ``pair[0]``'s outcome and ``bb`` is ``pair[1]``'s — i.e. the index has **bit 0 = pair[0]**, **bit 1 = pair[1]**, matching the layout produced by :func:`pair_marginal_counts`. """ from uniqc import Circuit from uniqc.simulator import Simulator a, b = int(pair[0]), int(pair[1]) region = schedule.region_qubits pos_a = region.index(a) pos_b = region.index(b) c = Circuit() for angles, pattern in schedule.cycles: ta, pa, la = angles[pos_a] tb, pb, lb = angles[pos_b] c.u3(0, ta, pa, la) c.u3(1, tb, pb, lb) if (a, b) in pattern or (b, a) in pattern: c.cz(0, 1) # measure(0) -> bit 0 of integer index = sim-q0 = pair[0] # measure(1) -> bit 1 = sim-q1 = pair[1] c.measure(0) c.measure(1) sim = Simulator(backend_type="statevector") return np.asarray(sim.simulate_pmeasure(c.originir), dtype=float)
# --------------------------------------------------------------------------- # Per-record / per-pair XEB + decay fit # ---------------------------------------------------------------------------
[docs] @dataclasses.dataclass(frozen=True) class PairCircuitFit: """Per-(record, pair) F_XEB computed on the 2-qubit marginal.""" record_id: str pair: PairKey pattern_idx: int depth: int instance: int F_XEB: float F_XEB_sigma: float shots: float
[docs] @dataclasses.dataclass(frozen=True) class PairDecay: """Per-pair weighted log-LS fit ``F(d) = beta · alpha^d``.""" pair: PairKey n_points: int alpha: float beta: float log_alpha: float log_beta: float sigma_log_alpha: float sigma_log_beta: float log_residual_std: float
def _record_view(probe: ProbeCircuit | dict, counts: Mapping[Any, int]) -> dict: """Normalise either a ``ProbeCircuit`` + counts pair, or a dict view, into the dict shape used by the per-pair pipeline.""" if isinstance(probe, ProbeCircuit): return { "record_id": probe.record_id, "schedule": probe.schedule, "pattern_idx": probe.pattern_idx, "pattern": probe.pattern, "depth": probe.depth, "instance": probe.instance, "counts": dict(counts), } out = dict(probe) if "counts" not in out: out["counts"] = dict(counts) return out
[docs] def per_pair_F_XEB( records: Iterable[dict | ProbeCircuit], pairs: Sequence[PairKey], *, counts_by_record: Mapping[str, Mapping[Any, int]] | None = None, ) -> list[PairCircuitFit]: """Compute per-pair F_XEB for every (record, pair) combination. Each ``record`` must carry a :class:`Schedule` (under key ``"schedule"``) plus ``record_id``, ``pattern_idx``, ``depth``, ``instance``. Counts may be embedded under ``"counts"`` or supplied via ``counts_by_record`` (keyed by ``record_id``). Uses the *normalised* (un-biased at small N) linear XEB estimator F = (<P_id>_meas - 1/D) / (Σ P_id² - 1/D) which equals ``1`` for the noiseless circuit and ``0`` for a fully depolarised pair, regardless of depth-dependent deviations from Porter–Thomas. Returns one :class:`PairCircuitFit` per (record, pair) where the pair is in the record's ``measured_qubits``. """ out: list[PairCircuitFit] = [] for r in records: if isinstance(r, ProbeCircuit): counts = counts_by_record.get(r.record_id, {}) if counts_by_record else {} d = _record_view(r, counts) else: counts = ( counts_by_record.get(r["record_id"], r.get("counts", {})) if counts_by_record else r.get("counts", {}) ) d = _record_view(r, counts) sched: Schedule = d["schedule"] if not isinstance(sched, Schedule): raise TypeError("record['schedule'] must be a Schedule instance") measured = sched.measured_qubits for pair in pairs: if pair[0] not in measured or pair[1] not in measured: continue ideal = pair_ideal_probs(sched, pair) marg = pair_marginal_counts(d["counts"], measured, pair) if not marg: continue shots = float(sum(marg.values())) keys = np.fromiter(marg.keys(), dtype=np.int64) wts = np.fromiter(marg.values(), dtype=np.float64) p_at_keys = ideal[keys] mean_p_meas = float(np.dot(wts, p_at_keys) / shots) D = float(ideal.size) # 4 for 2-qubit uniform_overlap = 1.0 / D ideal_contrast = float(np.dot(ideal, ideal)) - uniform_overlap if abs(ideal_contrast) < 1e-12: # Ideal distribution is uniform — F is undefined; skip. continue f_xeb = (mean_p_meas - uniform_overlap) / ideal_contrast var_p = float(np.dot(wts, (p_at_keys - mean_p_meas) ** 2) / shots) sigma = math.sqrt(max(var_p, 0.0) / shots) / abs(ideal_contrast) out.append( PairCircuitFit( record_id=str(d["record_id"]), pair=(int(pair[0]), int(pair[1])), pattern_idx=int(d["pattern_idx"]), depth=int(d["depth"]), instance=int(d["instance"]), F_XEB=f_xeb, F_XEB_sigma=sigma, shots=shots, ) ) return out
[docs] def fit_pair_decays( fits: Sequence[PairCircuitFit], *, log_floor: float = 1e-4, ) -> dict[PairKey, PairDecay]: """Per-pair weighted log-LS fit of ``F(d) = beta · alpha^d``. Drops circuits where ``F <= log_floor`` (cannot take log). Weights are ``1/sigma_logF^2`` with ``sigma_logF = sigma_F / F``. """ by_pair: dict[PairKey, list[PairCircuitFit]] = {} for f in fits: by_pair.setdefault(f.pair, []).append(f) out: dict[PairKey, PairDecay] = {} for pair, group in by_pair.items(): depths = np.array([g.depth for g in group], dtype=float) Fs = np.array([g.F_XEB for g in group], dtype=float) sig = np.array([max(g.F_XEB_sigma, 1e-9) for g in group], dtype=float) mask = Fs > log_floor if mask.sum() < 2: continue ds_m, fs_m, sig_m = depths[mask], Fs[mask], sig[mask] log_f = np.log(fs_m) sig_log = sig_m / fs_m w = 1.0 / np.maximum(sig_log, 1e-9) ** 2 A = np.vstack([ds_m, np.ones_like(ds_m)]).T AtWA = A.T @ (w[:, None] * A) AtWy = A.T @ (w * log_f) try: cov = np.linalg.inv(AtWA) except np.linalg.LinAlgError: continue sol = cov @ AtWy m, c = float(sol[0]), float(sol[1]) sigma_m = float(math.sqrt(max(cov[0, 0], 0.0))) sigma_c = float(math.sqrt(max(cov[1, 1], 0.0))) residuals = log_f - (m * ds_m + c) res_std = float(np.std(residuals, ddof=1)) if mask.sum() > 2 else 0.0 out[pair] = PairDecay( pair=pair, n_points=int(mask.sum()), alpha=float(np.exp(m)), beta=float(np.exp(c)), log_alpha=m, log_beta=c, sigma_log_alpha=sigma_m, sigma_log_beta=sigma_c, log_residual_std=res_std, ) return out
# --------------------------------------------------------------------------- # Benchmarker # ---------------------------------------------------------------------------
[docs] class ParallelCZBenchmarker: """End-to-end parallel-CZ XEB runner. Args: adapter: A ``QuantumAdapter`` supporting ``submit``/``query`` (and optionally ``submit_batch``/``query_batch``). DummyAdapter is also supported via its ``simulate_pmeasure`` fast path. shots: Number of shots per circuit. cache_dir: If supplied, per-pair :class:`XEBResult` objects are saved here under the standard naming convention. seed: Master seed for the corpus RNG. query_timeout: Per-circuit / per-batch poll timeout (seconds). query_interval: Poll interval (seconds). """ def __init__( self, adapter: QuantumAdapter, shots: int = 5000, cache_dir: str | None = None, seed: int | None = None, query_timeout: float = 600.0, query_interval: float = 2.0, ) -> None: self.adapter = adapter self.shots = int(shots) self.cache_dir = cache_dir self.seed = 2026 if seed is None else int(seed) self.query_timeout = float(query_timeout) self.query_interval = float(query_interval) # -- public API ------------------------------------------------------
[docs] def run( self, region_qubits: Sequence[int], patterns: Sequence[Sequence[tuple[int, int]]], depths: Sequence[int], instances: int = 20, ) -> dict: """Build a corpus, execute it, fit per-pair decays. Returns a dict with keys: * ``corpus``: list[ProbeCircuit] * ``counts_by_record``: dict[record_id, counts] * ``per_pair_fits``: list[PairCircuitFit] * ``per_pair_decays``: dict[pair, PairDecay] * ``per_pair_results``: dict[pair, XEBResult] """ corpus = build_parallel_cz_xeb_corpus( region_qubits, patterns, depths, instances, seed=self.seed, ) counts_by_record = self._execute(corpus) records = [_record_view(pc, counts_by_record.get(pc.record_id, {})) for pc in corpus] all_pairs: dict[PairKey, None] = {} for pat in patterns: for a, b in pat: all_pairs.setdefault((int(a), int(b)), None) pairs = list(all_pairs.keys()) fits = per_pair_F_XEB(records, pairs) decays = fit_pair_decays(fits) backend_name = getattr(self.adapter, "name", None) or type(self.adapter).__name__.lower() per_pair_results: dict[PairKey, XEBResult] = {} depth_tuple = tuple(int(d) for d in depths) ts = _utc_now() for pair, decay in decays.items(): r = XEBResult( calibrated_at=ts, backend=str(backend_name), type="xeb_2q_parallel", qubit=None, pairs=[pair], fidelity_per_layer=float(decay.alpha), fidelity_std_error=float(decay.alpha * decay.sigma_log_alpha), fit_a=float(decay.beta), fit_b=0.0, fit_r=float(decay.alpha), depths=depth_tuple, n_circuits=int(instances), shots=self.shots, ) per_pair_results[pair] = r if self.cache_dir is not None: save_calibration_result( r, type_prefix="xeb_2q_parallel", cache_dir=self.cache_dir, ) return { "corpus": corpus, "counts_by_record": counts_by_record, "per_pair_fits": fits, "per_pair_decays": decays, "per_pair_results": per_pair_results, }
# -- internal -------------------------------------------------------- def _execute(self, corpus: Sequence[ProbeCircuit]) -> dict[str, dict[Any, int]]: """Run the corpus on the adapter and return a {record_id: counts} map.""" # DummyAdapter fast path: exact pmeasure -> sample shots locally. if hasattr(self.adapter, "simulate_pmeasure"): return self._execute_via_pmeasure(corpus) # Cloud / generic adapters: prefer batch when available. if hasattr(self.adapter, "submit_batch") and hasattr(self.adapter, "query_batch"): return self._execute_via_batch(corpus) # Fallback: per-circuit submit/query loop. out: dict[str, dict[Any, int]] = {} for pc in corpus: counts = self._submit_and_wait_counts(pc.circuit.originir) out[pc.record_id] = counts return out def _execute_via_pmeasure(self, corpus: Sequence[ProbeCircuit]) -> dict[str, dict[Any, int]]: rng = np.random.default_rng(self.seed + 1) out: dict[str, dict[Any, int]] = {} for pc in corpus: probs = np.asarray( self.adapter.simulate_pmeasure(pc.circuit.originir), dtype=float, ) n_meas = len(pc.schedule.measured_qubits) if probs.size == 0: out[pc.record_id] = {} continue probs = probs / probs.sum() samples = rng.choice(probs.size, size=self.shots, p=probs) keys, vals = np.unique(samples, return_counts=True) out[pc.record_id] = {format(int(k), f"0{n_meas}b"): int(v) for k, v in zip(keys, vals)} return out def _execute_via_batch(self, corpus: Sequence[ProbeCircuit]) -> dict[str, dict[Any, int]]: originirs = [pc.circuit.originir for pc in corpus] task_ids = self.adapter.submit_batch(originirs, shots=self.shots) deadline = time.time() + max(self.query_timeout, self.query_timeout * len(originirs) / 20) while True: result = self.adapter.query_batch(task_ids) status = result.get("status", "") if status == "success": payload = result.get("result", []) if isinstance(payload, dict): payload = [payload] if not isinstance(payload, list) or len(payload) != len(corpus): raise RuntimeError( f"Unexpected batch result payload: " f"got {len(payload) if hasattr(payload, '__len__') else '?'}" f" results for {len(corpus)} circuits" ) return {pc.record_id: dict(p) for pc, p in zip(corpus, payload)} if status == "failed": raise RuntimeError(f"XEB batch failed: {result.get('result') or result.get('error')}") if time.time() >= deadline: raise TimeoutError(f"Timed out waiting for XEB batch task(s) {task_ids}") time.sleep(self.query_interval) def _submit_and_wait_counts(self, originir: str) -> dict[Any, int]: task_id = self.adapter.submit(originir, shots=self.shots) deadline = time.time() + self.query_timeout while True: result = self.adapter.query(task_id) status = result.get("status", "") if status == "success": raw = result.get("result", {}) if hasattr(raw, "counts"): return raw.counts if isinstance(raw, dict): return raw return {} if status == "failed": raise RuntimeError(f"XEB circuit failed: {result.get('result') or result.get('error')}") if time.time() >= deadline: raise TimeoutError(f"Timed out waiting for XEB task {task_id}") time.sleep(self.query_interval)
def _utc_now() -> str: return datetime.now(timezone.utc).isoformat()