"""Calibration result schemas for UnifiedQuantum.
All calibration results include a `calibrated_at` ISO-8601 timestamp
set by the calibration module. Results are saved to ~/.uniqc/calibration_cache/.
"""
from __future__ import annotations
import dataclasses
import json
import pathlib
from datetime import datetime, timezone
from typing import Any, Literal
__all__ = [
"CalibrationResult",
"XEBResult",
"ReadoutCalibrationResult",
]
def _now_iso() -> str:
"""Return current UTC time as ISO-8601 string."""
return datetime.now(timezone.utc).isoformat()
# ---------------------------------------------------------------------------
# Base
# ---------------------------------------------------------------------------
[docs]
@dataclasses.dataclass(frozen=True, slots=True)
class CalibrationResult:
"""Base class for all calibration results."""
calibrated_at: str # ISO-8601 UTC timestamp, set by calibration module
backend: str # e.g. "dummy:local:simulator", "originq:WK_C180"
type: str # "xeb_1q" | "xeb_2q" | "xeb_2q_parallel" | "readout_1q" | "readout_2q"
[docs]
def to_dict(self) -> dict[str, Any]:
return dataclasses.asdict(self)
[docs]
@classmethod
def from_dict(cls, d: dict[str, Any]) -> CalibrationResult:
raise NotImplementedError(f"from_dict not implemented for {cls.__name__}")
# ---------------------------------------------------------------------------
# XEB
# ---------------------------------------------------------------------------
[docs]
@dataclasses.dataclass(frozen=True, slots=True)
class XEBResult(CalibrationResult):
"""Result of a cross-entropy benchmarking experiment."""
qubit: int | None = None # None for parallel multi-pair XEB
pairs: list[tuple[int, int]] | None = None # for parallel XEB
type: Literal["xeb_1q", "xeb_2q", "xeb_2q_parallel"] = "xeb_1q"
# Exponential fit: F(m) = A * r^m + B
fidelity_per_layer: float = 0.0 # r from exponential fit (0 < r <= 1)
fidelity_std_error: float = 0.0
fit_a: float = 0.0
fit_b: float = 0.0
fit_r: float = 0.0
depths: tuple[int, ...] = ()
n_circuits: int = 0
shots: int = 0
[docs]
def to_dict(self) -> dict[str, Any]:
d = dataclasses.asdict(self)
# Convert tuples to lists for JSON
d["pairs"] = list(self.pairs) if self.pairs is not None else None
d["depths"] = list(self.depths)
return d
[docs]
@classmethod
def from_dict(cls, d: dict[str, Any]) -> XEBResult:
d = dict(d)
d["depths"] = tuple(d["depths"])
d["pairs"] = [(int(a), int(b)) for a, b in d["pairs"]] if d.get("pairs") else None
return cls(**d)
# ---------------------------------------------------------------------------
# Readout Calibration
# ---------------------------------------------------------------------------
[docs]
@dataclasses.dataclass(frozen=True, slots=True)
class ReadoutCalibrationResult(CalibrationResult):
"""Result of a readout calibration experiment.
Attributes:
type: "readout_1q" or "readout_2q"
qubit: int for 1q, tuple[int, int] for 2q
confusion_matrix:
1q: 2x2 matrix where rows=measured, cols=prepared.
``confusion_matrix[i][j] = P(measure=i | prep=j)``, so the
diagonal is ``[P(0|0), P(1|1)]``.
2q: 4x4 matrix where rows=measured, cols=prepared (|00⟩,|01⟩,|10⟩,|11⟩)
assignment_fidelity: average diagonal element = (p00+p11)/2 (1q) or avg(diagonal) (2q)
"""
type: Literal["readout_1q", "readout_2q"] = "readout_1q"
qubit: int | tuple[int, int] = 0
confusion_matrix: tuple[tuple[float, ...], ...] = () # 2x2 or 4x4
assignment_fidelity: float = 0.0
def __getitem__(self, name: str) -> Any:
"""Dict-like access for backward compatibility."""
if hasattr(self, name):
return getattr(self, name)
raise KeyError(name)
def __contains__(self, name: str) -> bool:
"""Support ``"key" in result`` for backward compatibility."""
return hasattr(self, name)
[docs]
def to_dict(self) -> dict[str, Any]:
d = dataclasses.asdict(self)
d["confusion_matrix"] = [list(row) for row in self.confusion_matrix]
return d
[docs]
@classmethod
def from_dict(cls, d: dict[str, Any]) -> ReadoutCalibrationResult:
d = dict(d)
d["confusion_matrix"] = tuple(tuple(row) for row in d["confusion_matrix"])
q = d["qubit"]
d["qubit"] = tuple(q) if isinstance(q, list) else int(q)
return cls(**d)
# ---------------------------------------------------------------------------
# Cache I/O helpers
# ---------------------------------------------------------------------------
_CALIBRATION_CACHE_DIR = pathlib.Path.home() / ".uniqc" / "calibration_cache"
[docs]
def ensure_cache_dir() -> pathlib.Path:
"""Ensure the calibration cache directory exists."""
_CALIBRATION_CACHE_DIR.mkdir(parents=True, exist_ok=True)
return _CALIBRATION_CACHE_DIR
[docs]
def save_calibration_result(
result: CalibrationResult,
*,
type_prefix: str,
cache_dir: pathlib.Path | str | None = None,
) -> str:
"""Save a calibration result to the cache and return the file path."""
if cache_dir is None:
cache_dir = ensure_cache_dir()
else:
cache_dir = pathlib.Path(cache_dir)
cache_dir.mkdir(parents=True, exist_ok=True)
qid = _qubit_identifier(result)
ts = result.calibrated_at.replace("+00:00", "Z").replace(":", "").replace("-", "")
filename = f"{type_prefix}_{result.backend}_{qid}_{ts}.json"
path = cache_dir / filename
with open(path, "w") as f:
json.dump(result.to_dict(), f, indent=2)
return str(path)
def _qubit_identifier(result: CalibrationResult) -> str:
if isinstance(result, XEBResult):
if result.pairs is not None:
return "pairs-" + "-".join(f"{a}-{b}" for a, b in result.pairs)
return f"q{result.qubit}"
if isinstance(result, ReadoutCalibrationResult):
if isinstance(result.qubit, tuple):
return f"pair-{result.qubit[0]}-{result.qubit[1]}"
return f"q{result.qubit}"
return "unknown"
[docs]
def load_calibration_result(path: str | pathlib.Path) -> CalibrationResult:
"""Load a calibration result from a cache file."""
with open(path) as f:
d = json.load(f)
t = d["type"]
if t.startswith("xeb"):
return XEBResult.from_dict(d)
return ReadoutCalibrationResult.from_dict(d)
[docs]
def find_cached_results(
backend: str,
result_type: str,
*,
max_age_hours: float | None = None,
cache_dir: pathlib.Path | str | None = None,
) -> list[pathlib.Path]:
"""Find cached calibration result files matching backend + type.
Optionally filters by age.
"""
if cache_dir is None:
cache_dir = ensure_cache_dir()
else:
cache_dir = pathlib.Path(cache_dir)
cache_dir.mkdir(parents=True, exist_ok=True)
prefix = f"{result_type}_{backend}_"
results = []
for p in cache_dir.iterdir():
if p.is_file() and p.name.startswith(prefix):
if max_age_hours is not None:
mtime = datetime.fromtimestamp(p.stat().st_mtime, tz=timezone.utc)
now = datetime.now(timezone.utc)
age_hours = (now - mtime).total_seconds() / 3600
if age_hours > max_age_hours:
continue
results.append(p)
return sorted(results)