Source code for uniqc.backend_adapter.task.adapters.qiskit_adapter

"""Qiskit backend adapter.

Translates OriginIR circuits to Qiskit QuantumCircuit objects and submits
via the ``qiskit`` / ``qiskit_ibm_runtime`` packages.  No raw REST calls.

``qiskit`` is a core dependency of ``unified-quantum`` — no extra install
step is required.
"""

from __future__ import annotations

__all__ = ["QiskitAdapter"]

import time
from typing import TYPE_CHECKING, Any

from uniqc.backend_adapter.task.adapters.base import (
    TASK_STATUS_FAILED,
    TASK_STATUS_RUNNING,
    TASK_STATUS_SUCCESS,
    DryRunResult,
    QuantumAdapter,
)
from uniqc.backend_adapter.task.optional_deps import MissingDependencyError, check_qiskit
from uniqc.config import load_ibm_config

if TYPE_CHECKING:
    import qiskit

_QISKIT_TERMINAL_FAILED = {"ERROR", "CANCELLED", "FAILED"}


def _normalize_qiskit_status(status_name: str) -> str:
    """Map a Qiskit Runtime JobStatus name to a TASK_STATUS_* constant."""
    if status_name in ("DONE", "COMPLETED"):
        return TASK_STATUS_SUCCESS
    if status_name in _QISKIT_TERMINAL_FAILED:
        return TASK_STATUS_FAILED
    return TASK_STATUS_RUNNING


[docs] class QiskitAdapter(QuantumAdapter): """Adapter for IBM Quantum backends via Qiskit. Proxy Configuration: Proxies can be passed via the `proxy` parameter: - Dict with 'http' and/or 'https' keys - Or a single proxy URL string for both protocols If no proxy is configured: - Automatically detects and uses system proxy settings - If no system proxy, uses direct connection If proxy is configured: - Uses configured proxy in addition to any system proxy Raises: MissingDependencyError: If qiskit or qiskit_ibm_runtime is not installed. Example: >>> adapter = QiskitAdapter(proxy={ ... "http": "http://proxy.example.com:8080", ... "https": "https://proxy.example.com:8080" ... }) """ name = "ibm" # Qiskit Runtime ``Sampler.run([circuits])`` accepts a list of PUBs # natively; one Sampler job covers many circuits and one queue slot. # ``100`` is a conservative default; raise via subclass override or # config if your access tier permits more. max_native_batch_size: int = 100 def __init__(self, proxy: dict[str, str] | str | None = None) -> None: """Initialize the Qiskit adapter. Args: proxy: Optional proxy configuration. - Dict with 'http' and/or 'https' keys - Or a single proxy URL string - If None, uses system proxy if available, otherwise direct Raises: MissingDependencyError: If qiskit is not installed. """ if not check_qiskit(): raise MissingDependencyError("qiskit", "qiskit") config = load_ibm_config() self._api_token: str = config["api_token"] self._proxy: dict[str, str] | str | None = proxy from qiskit_ibm_runtime import QiskitRuntimeService # Setup proxy: merge configured proxy with system proxy self._setup_proxy(proxy) self._service = QiskitRuntimeService( channel="ibm_quantum_platform", token=self._api_token, ) # Track batch jobs so query() can decide between scalar/list result. self._batch_job_sizes: dict[str, int] = {} def _setup_proxy(self, proxy: dict[str, str] | str | None) -> None: """Configure proxy settings for Qiskit/IBM provider. Proxy behavior: - If proxy is configured: use it as the proxy (replacing system proxy) - If no proxy configured but system has proxy: use system proxy - If no proxy configured and no system proxy: direct connection Args: proxy: Proxy configuration dict, URL string, or None. """ import os from uniqc.backend_adapter.network_utils import detect_system_proxy # Determine which proxies to use if proxy is not None: # Configured proxy takes priority configured_proxies: dict[str, str] = {} if isinstance(proxy, dict): if proxy.get("http"): configured_proxies["http"] = proxy["http"] if proxy.get("https"): configured_proxies["https"] = proxy["https"] else: configured_proxies["http"] = proxy configured_proxies["https"] = proxy proxies_to_use = configured_proxies else: # No configured proxy, use system proxy system_proxies = detect_system_proxy() proxies_to_use = {k: v for k, v in system_proxies.items() if v} # Set environment variables if we have any proxies for proto in ("http", "https"): if proto in proxies_to_use: proxy_url = proxies_to_use[proto] os.environ[f"{proto.upper()}_PROXY"] = proxy_url os.environ[f"{proto}_proxy"] = proxy_url
[docs] def is_available(self) -> bool: """Check if the Qiskit adapter is available (IBM service initialized).""" return check_qiskit() and hasattr(self, "_service") and self._service is not None
# ------------------------------------------------------------------------- # Backend discovery # -------------------------------------------------------------------------
[docs] def list_backends(self) -> list[dict[str, Any]]: """List IBM backends through QiskitRuntimeService. The returned entries include both average metrics and per-qubit/per-edge calibration details from ``backend.target`` so the Gateway can color chip topologies without flattening every edge to the global average. """ # Lazy import to avoid a circular import at module load time — # ``ibm_adapter`` itself imports ``QiskitAdapter`` inside its # constructor for backwards-compatible delegation. from uniqc.backend_adapter.task.adapters.ibm_adapter import ( _backend_configuration, _backend_coupling_map, _backend_is_simulator, _backend_name, _chip_characterization_from_backend, _compute_ibm_fidelity, ) raw_backends: list[dict[str, Any]] = [] for b in self._service.backends(): name = _backend_name(b) cfg = _backend_configuration(b) try: status = "available" if b.status().operational else "unavailable" except Exception: status = "unknown" try: pt = b.processor_type processor_type = pt.get("family", "") if isinstance(pt, dict) else str(pt) if pt else "" except Exception: processor_type = "" coupling_map = _backend_coupling_map(b, cfg) entry: dict[str, Any] = { "name": name, "simulator": _backend_is_simulator(b), "configuration": { "num_qubits": getattr(b, "num_qubits", 0), "coupling_map": [list(edge) for edge in coupling_map], "basis_gates": list(getattr(b, "basis_gates", []) or getattr(cfg, "basis_gates", []) or []), "max_shots": getattr(b, "max_shots", None) or getattr(cfg, "max_shots", None), "memory": getattr(b, "memory", False) or getattr(cfg, "memory", False), "qobd": getattr(b, "qobd", False) or getattr(cfg, "qobd", False), "supported_instructions": list(getattr(b, "supported_instructions", []) or []) if hasattr(b, "supported_instructions") else [], "processor_type": processor_type, }, "status": status, "description": getattr(b, "description", ""), } try: od = b.online_date if od: entry["online_date"] = str(od) except Exception: pass fidelity = _compute_ibm_fidelity(b) entry.update(fidelity) chip = _chip_characterization_from_backend(b, backend_name=name) if chip is not None: entry["per_qubit_calibration"] = [item.to_dict() for item in chip.single_qubit_data] entry["per_pair_calibration"] = [item.to_dict() for item in chip.two_qubit_data] entry["global_info"] = chip.global_info.to_dict() entry["calibrated_at"] = chip.calibrated_at raw_backends.append(entry) return raw_backends
[docs] def get_chip_characterization(self, backend_name: str): """Return per-qubit and per-pair calibration data for an IBM backend. Parameters ---------- backend_name: IBM backend name, e.g. ``"ibm_brisbane"``. Returns ------- ChipCharacterization or None """ from uniqc.backend_adapter.task.adapters.ibm_adapter import ( _chip_characterization_from_backend, ) try: backend = self._service.backend(backend_name) except Exception: return None return _chip_characterization_from_backend(backend, backend_name=backend_name)
# ------------------------------------------------------------------------- # Circuit translation # -------------------------------------------------------------------------
[docs] def translate_circuit(self, originir: str) -> qiskit.QuantumCircuit: """Translate an OriginIR string to a Qiskit QuantumCircuit. The conversion path is OriginIR → QASM string → Qiskit QuantumCircuit. """ import qiskit from uniqc.compile.originir import OriginIR_BaseParser parser = OriginIR_BaseParser() parser.parse(originir) qasm_str = parser.to_qasm() return qiskit.QuantumCircuit.from_qasm_str(qasm_str)
# ------------------------------------------------------------------------- # Task submission # -------------------------------------------------------------------------
[docs] def submit(self, circuit: qiskit.QuantumCircuit, *, shots: int = 1000, **kwargs: Any) -> str: """Submit a single circuit to IBM Quantum.""" chip_id: str | None = kwargs.get("chip_id") auto_mapping: Any = kwargs.get("auto_mapping", False) circuit_optimize: bool = kwargs.get("circuit_optimize", True) task_name: str | None = kwargs.get("task_name") return self._submit_impl( circuits=[circuit], chip_id=chip_id, shots=shots, auto_mapping=auto_mapping, circuit_optimize=circuit_optimize, task_name=task_name, )
[docs] def submit_batch( self, circuits: list[qiskit.QuantumCircuit], *, shots: int = 1000, native_batch: bool = True, **kwargs: Any ) -> list[str]: """Submit multiple circuits as a batch. IBM Quantum (qiskit-runtime ``Sampler``) natively supports running a list of circuits inside a single job, which spends only one position in the queue. With ``native_batch=True`` (default), this returns a single-element list containing that one job ID; ``query()`` / ``wait_for_result()`` will then surface the per-circuit count distributions as a list. With ``native_batch=False``, each circuit is submitted as its own job (one ID per circuit) — useful when downstream code wants individually addressable task IDs. Returns: list[str]: ``[batch_job_id]`` for native batch, or one job ID per circuit otherwise. """ chip_id: str | None = kwargs.get("chip_id") auto_mapping: Any = kwargs.get("auto_mapping", False) circuit_optimize: bool = kwargs.get("circuit_optimize", True) task_name: str | None = kwargs.get("task_name") if not native_batch: return [ self._submit_impl( circuits=[c], chip_id=chip_id, shots=shots, auto_mapping=auto_mapping, circuit_optimize=circuit_optimize, task_name=task_name, ) for c in circuits ] job_id = self._submit_impl( circuits=circuits, chip_id=chip_id, shots=shots, auto_mapping=auto_mapping, circuit_optimize=circuit_optimize, task_name=task_name, ) if len(circuits) > 1: self._batch_job_sizes[job_id] = len(circuits) return [job_id]
def _submit_impl( self, circuits: list[qiskit.QuantumCircuit], *, chip_id: str | None, shots: int, auto_mapping: Any, circuit_optimize: bool, task_name: str | None, ) -> str: """Internal implementation shared by submit() and submit_batch().""" import qiskit backends_name = [b.name for b in self._service.backends()] if chip_id not in backends_name: raise ValueError(f"no such chip, should be one of {backends_name}") backend = self._service.backend(chip_id) max_shots = backend.configuration().max_shots if shots > max_shots: raise ValueError(f"maximum shots number exceeded, should less than {max_shots}") if circuit_optimize: circuits = qiskit.compiler.transpile(circuits, backend=backend, optimization_level=3) if auto_mapping is True: circuits = qiskit.compiler.transpile( circuits, backend=backend, layout_method="sabre", optimization_level=1, ) elif isinstance(auto_mapping, list): circuits = qiskit.compiler.transpile( circuits, backend=backend, initial_layout=auto_mapping, optimization_level=1, ) else: circuits = qiskit.compiler.transpile(circuits, backend=backend, optimization_level=1) from qiskit_ibm_runtime import Sampler sampler = Sampler(mode=backend) job = sampler.run(circuits, shots=shots) return job.job_id() # ------------------------------------------------------------------------- # Task query # -------------------------------------------------------------------------
[docs] def query(self, taskid: str) -> dict[str, Any]: """Query a single IBM Quantum job's status.""" job = self._service.job(taskid) status = job.status() status_name = status.name if hasattr(status, "name") else str(status) if status_name not in ("DONE", "COMPLETED"): return { "status": _normalize_qiskit_status(status_name), "value": status.value if hasattr(status, "value") else status_name, } raw_result = job.result() results = [] # Qiskit Runtime Sampler returns PrimitiveResult — iterate over pub results for pub_result in raw_result: data = pub_result.data # Each data field is a BitArray (one per measurement) counts: dict[str, int] = {} shots = None n_bits = None for field_name in dir(data): if field_name.startswith("_"): continue bit_array = getattr(data, field_name) if hasattr(bit_array, "num_shots"): shots = bit_array.num_shots n_bits = bit_array.num_bits arr = bit_array._array # shape: (shots, 1) or (shots,) arr = arr.flatten() for val in arr: val_int = int(val) # uniqc convention (see docs/source/guide/ # platform_conventions.md §2.6): c[0] is the RIGHTMOST # character of the bitstring (LSB). Qiskit's BitArray # packs the lowest-indexed classical bit (c[0]) into # bit 0 of the integer, so ``format(val, f"0{n}b")`` # already yields the expected ``c[N-1]…c[0]`` layout — # do NOT reverse. bitstring = format(val_int, f"0{n_bits}b") counts[bitstring] = counts.get(bitstring, 0) + 1 break if shots is None: counts = {} results.append(counts) return { "status": TASK_STATUS_SUCCESS, "result": ( results if (self._batch_job_sizes.get(taskid, 1) > 1 or len(results) > 1) else (results[0] if results else {}) ), "time": job.creation_date.strftime("%a %d %b %Y, %I:%M%p") if hasattr(job, "creation_date") else "", "backend_name": job.backend().name if hasattr(job, "backend") else "", }
[docs] def query_batch(self, taskids: list[str]) -> dict[str, Any]: """Query multiple IBM Quantum jobs and merge results. Each per-job ``query`` may return either a single counts ``dict`` (when one Sampler job covers exactly one PUB) or a ``list[dict]`` (when one Sampler job covers many PUBs — IBM's native batch execution). The merged ``result`` must always be a flat ``list[dict]`` so downstream code (``query_sync``, the task manager normalisers, integration tests like ``run_test_result_shape_batch``) can iterate per-circuit. """ taskinfo: dict[str, Any] = {"status": TASK_STATUS_SUCCESS, "result": []} for taskid in taskids: result_i = self.query(taskid) status = result_i.get("status", TASK_STATUS_RUNNING) if status == TASK_STATUS_FAILED: taskinfo["status"] = TASK_STATUS_FAILED break elif status == TASK_STATUS_RUNNING: taskinfo["status"] = TASK_STATUS_RUNNING if taskinfo["status"] == TASK_STATUS_SUCCESS: payload = result_i.get("result", {}) if isinstance(payload, list): taskinfo["result"].extend(payload) elif isinstance(payload, dict): taskinfo["result"].append(payload) else: taskinfo["result"].append(payload) return taskinfo
# ------------------------------------------------------------------------- # Synchronous wait # -------------------------------------------------------------------------
[docs] def query_sync( self, taskid: str | list[str], interval: float = 2.0, timeout: float = 60.0, retry: int = 5, ) -> list[dict[str, Any]]: """Poll task status until completion or timeout.""" starttime = time.time() taskids = [taskid] if isinstance(taskid, str) else taskid while True: elapsed = time.time() - starttime if elapsed > timeout: raise TimeoutError("Reach the maximum timeout.") taskinfo = self.query_batch(taskids) if taskinfo["status"] == TASK_STATUS_RUNNING: time.sleep(interval) continue if taskinfo["status"] == TASK_STATUS_SUCCESS: return taskinfo["result"] if taskinfo["status"] == TASK_STATUS_FAILED: raise RuntimeError(f"Failed to execute, errorinfo = {taskinfo.get('result')}") # Retry on transient errors if retry > 0: retry -= 1 time.sleep(interval) else: raise RuntimeError("Retry count exhausted.")
# ------------------------------------------------------------------------- # Dry-run validation # -------------------------------------------------------------------------
[docs] def dry_run(self, originir: str, *, shots: int = 1000, **kwargs: Any) -> DryRunResult: """Dry-run validation for IBM Quantum backends. Validates offline by: 1. Parsing OriginIR -> Qiskit QuantumCircuit. 2. Checking chip_id is in available backends (local config lookup). 3. Checking shots <= max_shots (local config lookup). 4. Checking qubit count against backend limits. 5. Attempting transpilation against the backend's basis_gates (purely local — catches unsupported gates). This method makes NO network calls. ``service.backend(chip_id)`` and ``backend.configuration()`` are local config reads. Note: Any dry-run success followed by actual submission failure is a critical bug. Please report it at the UnifiedQuantum issue tracker. """ import qiskit from uniqc.backend_adapter.task.adapters.base import _dry_run_failed, _dry_run_success chip_id: str | None = kwargs.get("chip_id") backend_name = chip_id or "simulator" # Step 1: Parse OriginIR -> Qiskit QuantumCircuit try: qiskit_circuit = self.translate_circuit(originir) except Exception as e: return _dry_run_failed( str(e), details=f"Failed to translate OriginIR to Qiskit QuantumCircuit: {e}", backend_name=backend_name, ) circuit_qubits = qiskit_circuit.num_qubits # Step 2: Determine backend configuration (local — no network call) try: if chip_id: backend = self._service.backend(chip_id) backend_config = backend.configuration() max_shots = backend_config.max_shots basis_gates = backend_config.basis_gates num_qubits = backend_config.num_qubits else: # Fall back to generic simulator basis gates if no chip_id given max_shots = 100000 basis_gates = [ "cx", "u1", "u2", "u3", "id", "x", "y", "z", "h", "s", "sdg", "t", "tdg", "reset", ] num_qubits = 127 except Exception as e: return _dry_run_failed( str(e), details=f"Failed to access backend configuration for '{chip_id}': {e}", backend_name=backend_name, ) # Step 3: Shots limit check if shots > max_shots: return _dry_run_failed( f"shots ({shots}) exceeds backend maximum ({max_shots})", details=f"Shot count validation failed: {shots} > {max_shots}", backend_name=backend_name, ) # Step 4: Qubit count check if circuit_qubits > num_qubits: return _dry_run_failed( f"circuit requires {circuit_qubits} qubits but backend '{chip_id}' has {num_qubits}", details=f"Qubit count validation failed: circuit={circuit_qubits}, backend={num_qubits}", backend_name=backend_name, ) # Step 5: Transpile against basis_gates (fully offline) # This catches unsupported gates without needing a real backend. try: from qiskit.transpiler import CouplingMap coupling_map = CouplingMap.from_heavy_hex(num_qubits) if chip_id else None qiskit.compiler.transpile( qiskit_circuit, basis_gates=basis_gates, coupling_map=coupling_map, optimization_level=0, ) transpile_warnings: tuple[str, ...] = () except Exception as e: return _dry_run_failed( f"transpilation failed: {e}", details=( f"Circuit uses gates not supported by '{chip_id or 'simulator'}' " f"basis gates. Basis gates: {basis_gates}. " f"Transpilation error: {e}" ), backend_name=backend_name, ) return _dry_run_success( ( f"Dry-run passed for '{chip_id or 'simulator'}': " f"circuit translates cleanly and transpiles to basis gates. " f"Qubits={circuit_qubits}, shots={shots}" ), backend_name=backend_name, circuit_qubits=circuit_qubits, warnings=transpile_warnings, )