"""Qiskit backend adapter.
Translates OriginIR circuits to Qiskit QuantumCircuit objects and submits
via the ``qiskit`` / ``qiskit_ibm_runtime`` packages. No raw REST calls.
``qiskit`` is a core dependency of ``unified-quantum`` — no extra install
step is required.
"""
from __future__ import annotations
__all__ = ["QiskitAdapter"]
import time
from typing import TYPE_CHECKING, Any
from uniqc.backend_adapter.task.adapters.base import (
TASK_STATUS_FAILED,
TASK_STATUS_RUNNING,
TASK_STATUS_SUCCESS,
DryRunResult,
QuantumAdapter,
)
from uniqc.backend_adapter.task.optional_deps import MissingDependencyError, check_qiskit
from uniqc.config import load_ibm_config
if TYPE_CHECKING:
import qiskit
# Job status names that are mapped to TASK_STATUS_FAILED.
_QISKIT_TERMINAL_FAILED = {"ERROR", "CANCELLED", "FAILED"}


def _normalize_qiskit_status(status_name: str) -> str:
    """Translate a Qiskit job status name into a ``TASK_STATUS_*`` constant.

    Args:
        status_name: Status name as reported by the job (e.g. ``"DONE"``).

    Returns:
        ``TASK_STATUS_SUCCESS`` for ``"DONE"``/``"COMPLETED"``,
        ``TASK_STATUS_FAILED`` for any name in ``_QISKIT_TERMINAL_FAILED``,
        and ``TASK_STATUS_RUNNING`` for everything else.
    """
    if status_name == "DONE" or status_name == "COMPLETED":
        return TASK_STATUS_SUCCESS
    # Anything that is neither finished nor a known failure is treated as
    # still in flight.
    return TASK_STATUS_FAILED if status_name in _QISKIT_TERMINAL_FAILED else TASK_STATUS_RUNNING
[docs]
class QiskitAdapter(QuantumAdapter):
    """Adapter for IBM Quantum backends via Qiskit.

    Proxy Configuration:
        Proxies can be passed via the `proxy` parameter:
        - Dict with 'http' and/or 'https' keys
        - Or a single proxy URL string for both protocols

        If no proxy is configured:
        - Automatically detects and uses system proxy settings
        - If no system proxy, uses direct connection

        If proxy is configured:
        - The configured proxy is exported to the process environment
          (``HTTP_PROXY``/``HTTPS_PROXY`` and their lowercase variants) for
          each protocol it names; system proxy detection is skipped.
        - Environment variables for protocols the configuration does not
          name are left as they were.

    Raises:
        MissingDependencyError: If ``check_qiskit()`` reports that the Qiskit
            dependencies are unavailable.

    Example:
        >>> adapter = QiskitAdapter(proxy={
        ...     "http": "http://proxy.example.com:8080",
        ...     "https": "https://proxy.example.com:8080"
        ... })
    """

    # Adapter identifier (presumably the registry key for this platform —
    # verify against the adapter registry).
    name = "ibm"
    # Qiskit Runtime ``Sampler.run([circuits])`` accepts a list of PUBs
    # natively; one Sampler job covers many circuits and one queue slot.
    # ``100`` is a conservative default; raise via subclass override or
    # config if your access tier permits more.
    max_native_batch_size: int = 100
def __init__(self, proxy: dict[str, str] | str | None = None) -> None:
    """Create the adapter and open a Qiskit Runtime service session.

    Args:
        proxy: Optional proxy configuration — a dict with ``"http"`` and/or
            ``"https"`` keys, a single URL string applied to both
            protocols, or ``None`` to fall back to system proxy detection.

    Raises:
        MissingDependencyError: If ``check_qiskit()`` reports that Qiskit is
            unavailable.
    """
    qiskit_present = check_qiskit()
    if not qiskit_present:
        raise MissingDependencyError("qiskit", "qiskit")

    ibm_settings = load_ibm_config()
    self._api_token: str = ibm_settings["api_token"]
    self._proxy: dict[str, str] | str | None = proxy

    from qiskit_ibm_runtime import QiskitRuntimeService

    # Export the proxy settings to the environment before the service
    # object is created.
    self._setup_proxy(proxy)
    service_options = {
        "channel": "ibm_quantum_platform",
        "token": self._api_token,
    }
    self._service = QiskitRuntimeService(**service_options)

    # Maps native-batch job IDs to their circuit count so ``query()`` can
    # choose between a scalar and a list result.
    self._batch_job_sizes: dict[str, int] = {}
def _setup_proxy(self, proxy: dict[str, str] | str | None) -> None:
    """Export proxy settings to the process environment.

    Selection rules:
        - ``proxy`` is ``None``: the non-empty entries returned by
          ``detect_system_proxy()`` are used.
        - ``proxy`` is a dict: only its truthy ``"http"`` / ``"https"``
          entries are used; system detection is skipped.
        - any other non-``None`` value: applied to both protocols.

    For every protocol that ends up with a proxy, both the upper-case
    (``HTTP_PROXY``) and lower-case (``http_proxy``) environment variables
    are set. Variables for protocols without a selected proxy are left
    untouched.

    Args:
        proxy: Proxy configuration dict, URL string, or None.
    """
    import os

    from uniqc.backend_adapter.network_utils import detect_system_proxy

    protocols = ("http", "https")

    selected: dict[str, str]
    if proxy is None:
        selected = {scheme: url for scheme, url in detect_system_proxy().items() if url}
    elif isinstance(proxy, dict):
        selected = {scheme: proxy[scheme] for scheme in protocols if proxy.get(scheme)}
    else:
        selected = dict.fromkeys(protocols, proxy)

    for scheme in protocols:
        if scheme not in selected:
            continue
        url = selected[scheme]
        os.environ[f"{scheme.upper()}_PROXY"] = url
        os.environ[f"{scheme}_proxy"] = url
[docs]
def is_available(self) -> bool:
    """Report whether Qiskit is importable and the IBM service object exists."""
    deps_ok = check_qiskit()
    if not deps_ok:
        return deps_ok
    # ``_service`` is only set once ``__init__`` has run to completion.
    return getattr(self, "_service", None) is not None
# -------------------------------------------------------------------------
# Backend discovery
# -------------------------------------------------------------------------
[docs]
def list_backends(self) -> list[dict[str, Any]]:
    """Describe every backend returned by ``QiskitRuntimeService.backends()``.

    Each entry carries the backend name, simulator flag, a ``configuration``
    sub-dict, operational status and description, plus whatever
    ``_compute_ibm_fidelity`` returns. When chip characterization data is
    available, per-qubit and per-pair calibration details are attached as
    well, so consumers are not limited to the averaged metrics.

    Returns:
        One dict per backend, in the order the service reports them.
    """
    # Imported lazily to avoid a circular import at module load time:
    # ``ibm_adapter`` imports ``QiskitAdapter`` inside its own constructor.
    from uniqc.backend_adapter.task.adapters.ibm_adapter import (
        _backend_configuration,
        _backend_coupling_map,
        _backend_is_simulator,
        _backend_name,
        _chip_characterization_from_backend,
        _compute_ibm_fidelity,
    )

    def _prefer_backend(backend_obj: Any, cfg_obj: Any, attr: str, fallback: Any) -> Any:
        """Read ``attr`` from the backend, falling back to its configuration."""
        return getattr(backend_obj, attr, fallback) or getattr(cfg_obj, attr, fallback)

    described: list[dict[str, Any]] = []
    for backend in self._service.backends():
        backend_label = _backend_name(backend)
        legacy_cfg = _backend_configuration(backend)

        # Status lookup is best-effort; any failure is reported as "unknown".
        try:
            operational = backend.status().operational
            availability = "available" if operational else "unavailable"
        except Exception:
            availability = "unknown"

        # ``processor_type`` may be a dict, some other truthy value, or empty.
        try:
            raw_family = backend.processor_type
            if isinstance(raw_family, dict):
                family = raw_family.get("family", "")
            elif raw_family:
                family = str(raw_family)
            else:
                family = ""
        except Exception:
            family = ""

        edges = _backend_coupling_map(backend, legacy_cfg)
        is_simulator = _backend_is_simulator(backend)

        configuration: dict[str, Any] = {}
        configuration["num_qubits"] = getattr(backend, "num_qubits", 0)
        configuration["coupling_map"] = [list(edge) for edge in edges]
        configuration["basis_gates"] = list(_prefer_backend(backend, legacy_cfg, "basis_gates", []) or [])
        configuration["max_shots"] = _prefer_backend(backend, legacy_cfg, "max_shots", None)
        configuration["memory"] = _prefer_backend(backend, legacy_cfg, "memory", False)
        configuration["qobd"] = _prefer_backend(backend, legacy_cfg, "qobd", False)
        if hasattr(backend, "supported_instructions"):
            configuration["supported_instructions"] = list(getattr(backend, "supported_instructions", []) or [])
        else:
            configuration["supported_instructions"] = []
        configuration["processor_type"] = family

        entry: dict[str, Any] = {
            "name": backend_label,
            "simulator": is_simulator,
            "configuration": configuration,
            "status": availability,
            "description": getattr(backend, "description", ""),
        }

        # The online date is optional metadata; skip it if it cannot be read.
        try:
            went_online = backend.online_date
            if went_online:
                entry["online_date"] = str(went_online)
        except Exception:
            pass

        entry.update(_compute_ibm_fidelity(backend))

        chip = _chip_characterization_from_backend(backend, backend_name=backend_label)
        if chip is not None:
            entry["per_qubit_calibration"] = [item.to_dict() for item in chip.single_qubit_data]
            entry["per_pair_calibration"] = [item.to_dict() for item in chip.two_qubit_data]
            entry["global_info"] = chip.global_info.to_dict()
            entry["calibrated_at"] = chip.calibrated_at

        described.append(entry)
    return described
[docs]
def get_chip_characterization(self, backend_name: str):
    """Return calibration data for an IBM backend.

    Parameters
    ----------
    backend_name:
        IBM backend name, e.g. ``"ibm_brisbane"``.

    Returns
    -------
    The value produced by ``_chip_characterization_from_backend`` for the
    resolved backend, or ``None`` when the backend cannot be looked up.
    """
    from uniqc.backend_adapter.task.adapters.ibm_adapter import (
        _chip_characterization_from_backend,
    )

    try:
        resolved = self._service.backend(backend_name)
    except Exception:
        # Any lookup failure is reported as "no data" rather than raised.
        return None
    else:
        return _chip_characterization_from_backend(resolved, backend_name=backend_name)
# -------------------------------------------------------------------------
# Circuit translation
# -------------------------------------------------------------------------
[docs]
def translate_circuit(self, originir: str) -> qiskit.QuantumCircuit:
    """Convert an OriginIR program into a Qiskit ``QuantumCircuit``.

    The OriginIR text is parsed, re-emitted as a QASM string, and that
    string is loaded by Qiskit.

    Args:
        originir: OriginIR source text.

    Returns:
        The circuit built from the intermediate QASM string.
    """
    from qiskit import QuantumCircuit

    from uniqc.compile.originir import OriginIR_BaseParser

    originir_parser = OriginIR_BaseParser()
    originir_parser.parse(originir)
    return QuantumCircuit.from_qasm_str(originir_parser.to_qasm())
# -------------------------------------------------------------------------
# Task submission
# -------------------------------------------------------------------------
[docs]
def submit(self, circuit: qiskit.QuantumCircuit, *, shots: int = 1000, **kwargs: Any) -> str:
    """Submit one circuit to IBM Quantum and return its job ID.

    Args:
        circuit: Circuit to run.
        shots: Number of shots.
        **kwargs: Recognised keys are ``chip_id``, ``auto_mapping``
            (default ``False``), ``circuit_optimize`` (default ``True``)
            and ``task_name``; any other key is ignored.

    Returns:
        The job ID returned by ``_submit_impl``.
    """
    options = {
        "chip_id": kwargs.get("chip_id"),
        "auto_mapping": kwargs.get("auto_mapping", False),
        "circuit_optimize": kwargs.get("circuit_optimize", True),
        "task_name": kwargs.get("task_name"),
    }
    # A single circuit is just a batch of one for the shared implementation.
    return self._submit_impl(circuits=[circuit], shots=shots, **options)
[docs]
def submit_batch(
    self, circuits: list[qiskit.QuantumCircuit], *, shots: int = 1000, native_batch: bool = True, **kwargs: Any
) -> list[str]:
    """Submit multiple circuits as a batch.

    IBM Quantum (qiskit-runtime ``Sampler``) natively supports running a
    list of circuits inside a single job, which spends only one position
    in the queue. With ``native_batch=True`` (default), this returns a
    single-element list containing that one job ID; ``query()`` will then
    surface the per-circuit count distributions as a list.

    With ``native_batch=False``, each circuit is submitted as its own
    job (one ID per circuit) — useful when downstream code wants
    individually addressable task IDs.

    Args:
        circuits: Circuits to run. An empty list submits nothing.
        shots: Number of shots per circuit.
        native_batch: Whether to pack all circuits into one Sampler job.
        **kwargs: Recognised keys are ``chip_id``, ``auto_mapping``
            (default ``False``), ``circuit_optimize`` (default ``True``)
            and ``task_name``; any other key is ignored.

    Returns:
        list[str]: ``[batch_job_id]`` for native batch, one job ID per
        circuit otherwise, or ``[]`` when ``circuits`` is empty.
    """
    # Nothing to run: avoid contacting the service and submitting an empty
    # PUB list. This matches the non-native path, which already yields [].
    if len(circuits) == 0:
        return []

    options = {
        "chip_id": kwargs.get("chip_id"),
        "auto_mapping": kwargs.get("auto_mapping", False),
        "circuit_optimize": kwargs.get("circuit_optimize", True),
        "task_name": kwargs.get("task_name"),
    }

    if not native_batch:
        return [self._submit_impl(circuits=[single], shots=shots, **options) for single in circuits]

    job_id = self._submit_impl(circuits=circuits, shots=shots, **options)
    # Remember multi-circuit jobs so ``query()`` returns a list for them.
    if len(circuits) > 1:
        self._batch_job_sizes[job_id] = len(circuits)
    return [job_id]
def _submit_impl(
    self,
    circuits: list[qiskit.QuantumCircuit],
    *,
    chip_id: str | None,
    shots: int,
    auto_mapping: Any,
    circuit_optimize: bool,
    task_name: str | None,
) -> str:
    """Validate, transpile and submit circuits as one Sampler job.

    Shared by ``submit()`` and ``submit_batch()``.

    Args:
        circuits: Circuits to run in a single job.
        chip_id: Name of the target backend; must be one of the names
            reported by the service.
        shots: Number of shots per circuit.
        auto_mapping: ``True`` selects the ``"sabre"`` layout method; a
            list is used as the transpiler's ``initial_layout``; any other
            value leaves layout selection to the transpiler defaults.
        circuit_optimize: ``True`` transpiles at optimization level 3,
            ``False`` at level 1.
        task_name: Accepted for interface compatibility; not forwarded to
            the job by this implementation.

    Returns:
        The ID of the submitted job.

    Raises:
        ValueError: If ``chip_id`` is not a known backend or ``shots``
            exceeds the backend's ``max_shots``.
    """
    import qiskit

    backends_name = [b.name for b in self._service.backends()]
    if chip_id not in backends_name:
        raise ValueError(f"no such chip, should be one of {backends_name}")

    backend = self._service.backend(chip_id)
    max_shots = backend.configuration().max_shots
    # Some backends may not report a limit; only enforce one that exists.
    if max_shots is not None and shots > max_shots:
        raise ValueError(f"maximum shots number exceeded, should less than {max_shots}")

    # Transpile exactly once. Running an optimisation pass first and a
    # mapping pass afterwards would hand the second pass a circuit that is
    # already expanded to the backend's width, at which point a
    # user-supplied ``initial_layout`` (sized for the logical circuit) no
    # longer matches the circuit's qubit count.
    transpile_options: dict[str, Any] = {
        "backend": backend,
        "optimization_level": 3 if circuit_optimize else 1,
    }
    if auto_mapping is True:
        transpile_options["layout_method"] = "sabre"
    elif isinstance(auto_mapping, list):
        transpile_options["initial_layout"] = auto_mapping
    circuits = qiskit.compiler.transpile(circuits, **transpile_options)

    from qiskit_ibm_runtime import Sampler

    sampler = Sampler(mode=backend)
    job = sampler.run(circuits, shots=shots)
    return job.job_id()
# -------------------------------------------------------------------------
# Task query
# -------------------------------------------------------------------------
[docs]
def query(self, taskid: str) -> dict[str, Any]:
    """Query a single IBM Quantum job's status and, once done, its counts.

    Args:
        taskid: Job ID as returned by ``submit()`` / ``submit_batch()``.

    Returns:
        While the job is not finished: ``{"status": ..., "value": ...}``
        where ``status`` is the normalised ``TASK_STATUS_*`` constant and
        ``value`` the raw status value.

        Once finished: a dict with ``status`` (``TASK_STATUS_SUCCESS``),
        ``result``, ``time`` and ``backend_name``. ``result`` is a list of
        per-circuit counts dicts when the job was recorded as a native
        batch or produced more than one PUB result; otherwise it is the
        single counts dict (or ``{}`` when there is no result).
    """
    job = self._service.job(taskid)
    status = job.status()
    status_name = status.name if hasattr(status, "name") else str(status)
    if status_name not in ("DONE", "COMPLETED"):
        return {
            "status": _normalize_qiskit_status(status_name),
            "value": status.value if hasattr(status, "value") else status_name,
        }

    results: list[dict[str, int]] = []
    # Qiskit Runtime Sampler returns a PrimitiveResult — one entry per PUB.
    for pub_result in job.result():
        data = pub_result.data
        counts: dict[str, int] = {}
        # The data container exposes one attribute per classical register;
        # the first attribute that looks like a BitArray is used.
        for field_name in dir(data):
            if field_name.startswith("_"):
                continue
            bit_array = getattr(data, field_name)
            if hasattr(bit_array, "num_shots"):
                # Use the public ``get_counts()`` API instead of the private
                # ``_array`` buffer: that buffer stores each shot as packed
                # bytes, so flattening it is only correct for registers of
                # at most 8 bits. ``get_counts()`` returns bitstrings with
                # the highest-indexed classical bit first, i.e. c[0] is the
                # RIGHTMOST character — the uniqc convention — so no
                # reversal is needed.
                counts = dict(bit_array.get_counts())
                break
        results.append(counts)

    is_batch = self._batch_job_sizes.get(taskid, 1) > 1 or len(results) > 1
    if is_batch:
        result_payload: Any = results
    else:
        result_payload = results[0] if results else {}

    # Both values can be absent or None depending on the job object.
    created = getattr(job, "creation_date", None)
    job_backend = job.backend() if hasattr(job, "backend") else None
    return {
        "status": TASK_STATUS_SUCCESS,
        "result": result_payload,
        "time": created.strftime("%a %d %b %Y, %I:%M%p") if created is not None else "",
        "backend_name": job_backend.name if job_backend is not None else "",
    }
[docs]
def query_batch(self, taskids: list[str]) -> dict[str, Any]:
    """Query several IBM Quantum jobs and merge their results.

    A per-job ``query`` may return either one counts ``dict`` or a
    ``list[dict]`` (a native batch job). Lists are flattened into the
    merged ``result`` and anything else is appended, so ``result`` is one
    flat list that callers can iterate per circuit.

    Status aggregation: the first failed job stops the scan and marks the
    whole batch failed; a running job marks the batch running but the scan
    continues. Results are only collected while the aggregate status is
    still successful.

    Args:
        taskids: Job IDs to query, in order.

    Returns:
        ``{"status": <aggregate status>, "result": <merged list>}``.
    """
    merged: list[Any] = []
    overall = TASK_STATUS_SUCCESS
    for job_id in taskids:
        job_info = self.query(job_id)
        job_status = job_info.get("status", TASK_STATUS_RUNNING)
        if job_status == TASK_STATUS_FAILED:
            overall = TASK_STATUS_FAILED
            break
        if job_status == TASK_STATUS_RUNNING:
            overall = TASK_STATUS_RUNNING
        if overall != TASK_STATUS_SUCCESS:
            continue
        payload = job_info.get("result", {})
        if isinstance(payload, list):
            merged.extend(payload)
        else:
            merged.append(payload)
    return {"status": overall, "result": merged}
# -------------------------------------------------------------------------
# Synchronous wait
# -------------------------------------------------------------------------
[docs]
def query_sync(
    self,
    taskid: str | list[str],
    interval: float = 2.0,
    timeout: float = 60.0,
    retry: int = 5,
) -> list[dict[str, Any]]:
    """Poll job status until completion, failure, or timeout.

    Args:
        taskid: One job ID or a list of job IDs.
        interval: Seconds to sleep between polls.
        timeout: Maximum seconds to keep polling, checked before each poll.
        retry: Number of extra polls allowed when ``query_batch`` reports a
            status that is none of running / success / failed. Exceptions
            raised while querying are not retried.

    Returns:
        The merged ``result`` list from ``query_batch``.

    Raises:
        TimeoutError: If ``timeout`` seconds elapse before completion.
        RuntimeError: If the batch is reported as failed, or the retry
            budget for unrecognised statuses is used up.
    """
    job_ids = [taskid] if isinstance(taskid, str) else taskid
    began = time.time()
    retries_left = retry
    while True:
        if time.time() - began > timeout:
            raise TimeoutError("Reach the maximum timeout.")

        snapshot = self.query_batch(job_ids)
        state = snapshot["status"]
        if state == TASK_STATUS_SUCCESS:
            return snapshot["result"]
        if state == TASK_STATUS_FAILED:
            raise RuntimeError(f"Failed to execute, errorinfo = {snapshot.get('result')}")
        if state != TASK_STATUS_RUNNING:
            # Unrecognised status: spend one retry before polling again.
            if retries_left <= 0:
                raise RuntimeError("Retry count exhausted.")
            retries_left -= 1
        time.sleep(interval)
# -------------------------------------------------------------------------
# Dry-run validation
# -------------------------------------------------------------------------
[docs]
def dry_run(self, originir: str, *, shots: int = 1000, **kwargs: Any) -> DryRunResult:
    """Dry-run validation for IBM Quantum backends.

    Validates without submitting a job by:
    1. Parsing OriginIR -> Qiskit QuantumCircuit.
    2. Resolving ``chip_id`` through the service and reading the backend
       configuration (or using generic simulator defaults when no
       ``chip_id`` is given).
    3. Checking shots <= max_shots.
    4. Checking qubit count against backend limits.
    5. Transpiling against the backend's ``basis_gates`` and its reported
       coupling map (catches unsupported gates).

    NOTE(review): no job is submitted, but whether
    ``service.backend(chip_id)`` / ``backend.configuration()`` are served
    from a local cache or hit the network depends on the installed
    ``qiskit_ibm_runtime`` — confirm before relying on this being offline.

    Args:
        originir: OriginIR source text to validate.
        shots: Intended number of shots.
        **kwargs: ``chip_id`` selects the backend; other keys are ignored.

    Returns:
        A ``DryRunResult`` built by ``_dry_run_success`` or
        ``_dry_run_failed``; validation problems are reported in the
        result rather than raised.

    Note:
        Any dry-run success followed by actual submission failure is a
        critical bug. Please report it at the UnifiedQuantum issue tracker.
    """
    import qiskit

    from uniqc.backend_adapter.task.adapters.base import _dry_run_failed, _dry_run_success

    chip_id: str | None = kwargs.get("chip_id")
    backend_name = chip_id or "simulator"

    # Step 1: Parse OriginIR -> Qiskit QuantumCircuit
    try:
        qiskit_circuit = self.translate_circuit(originir)
    except Exception as e:
        return _dry_run_failed(
            str(e),
            details=f"Failed to translate OriginIR to Qiskit QuantumCircuit: {e}",
            backend_name=backend_name,
        )
    circuit_qubits = qiskit_circuit.num_qubits

    # Step 2: Determine backend configuration
    try:
        if chip_id:
            backend = self._service.backend(chip_id)
            backend_config = backend.configuration()
            max_shots = backend_config.max_shots
            basis_gates = backend_config.basis_gates
            num_qubits = backend_config.num_qubits
            # Use the device's real connectivity. A synthetic heavy-hex
            # map cannot be built from a qubit count: the generator takes
            # an odd code distance, not the number of qubits.
            coupling_map = getattr(backend_config, "coupling_map", None) or None
        else:
            # Fall back to generic simulator basis gates if no chip_id given
            max_shots = 100000
            basis_gates = [
                "cx",
                "u1",
                "u2",
                "u3",
                "id",
                "x",
                "y",
                "z",
                "h",
                "s",
                "sdg",
                "t",
                "tdg",
                "reset",
            ]
            num_qubits = 127
            coupling_map = None
    except Exception as e:
        return _dry_run_failed(
            str(e),
            details=f"Failed to access backend configuration for '{chip_id}': {e}",
            backend_name=backend_name,
        )

    # Step 3: Shots limit check (skipped when the backend reports no limit)
    if max_shots is not None and shots > max_shots:
        return _dry_run_failed(
            f"shots ({shots}) exceeds backend maximum ({max_shots})",
            details=f"Shot count validation failed: {shots} > {max_shots}",
            backend_name=backend_name,
        )

    # Step 4: Qubit count check
    if circuit_qubits > num_qubits:
        return _dry_run_failed(
            f"circuit requires {circuit_qubits} qubits but backend '{backend_name}' has {num_qubits}",
            details=f"Qubit count validation failed: circuit={circuit_qubits}, backend={num_qubits}",
            backend_name=backend_name,
        )

    # Step 5: Transpile against basis_gates and the reported coupling map.
    # Optimization level 0 keeps this cheap; the output is discarded — only
    # success or failure matters.
    try:
        qiskit.compiler.transpile(
            qiskit_circuit,
            basis_gates=basis_gates,
            coupling_map=coupling_map,
            optimization_level=0,
        )
        transpile_warnings: tuple[str, ...] = ()
    except Exception as e:
        return _dry_run_failed(
            f"transpilation failed: {e}",
            details=(
                f"Circuit uses gates not supported by '{chip_id or 'simulator'}' "
                f"basis gates. Basis gates: {basis_gates}. "
                f"Transpilation error: {e}"
            ),
            backend_name=backend_name,
        )

    return _dry_run_success(
        (
            f"Dry-run passed for '{chip_id or 'simulator'}': "
            f"circuit translates cleanly and transpiles to basis gates. "
            f"Qubits={circuit_qubits}, shots={shots}"
        ),
        backend_name=backend_name,
        circuit_qubits=circuit_qubits,
        warnings=transpile_warnings,
    )