Source code for uniqc.task.adapters.qiskit_adapter

"""Qiskit backend adapter.

Translates OriginIR circuits to Qiskit QuantumCircuit objects and submits
via the ``qiskit`` / ``qiskit_ibm_provider`` packages.  No raw REST calls.

Installation:
    pip install unified-quantum[qiskit]
"""

from __future__ import annotations

__all__ = ["QiskitAdapter"]

import time
from typing import TYPE_CHECKING, Any

from uniqc.task.adapters.base import (
    TASK_STATUS_FAILED,
    TASK_STATUS_RUNNING,
    TASK_STATUS_SUCCESS,
    QuantumAdapter,
)
from uniqc.task.config import load_ibm_config
from uniqc.task.optional_deps import MissingDependencyError, check_qiskit

if TYPE_CHECKING:
    import qiskit
    import qiskit_ibm_provider  # noqa: F401


[docs] class QiskitAdapter(QuantumAdapter): """Adapter for IBM Quantum backends via Qiskit. Proxy Configuration: Proxies can be passed via the `proxy` parameter: - Dict with 'http' and/or 'https' keys - Or a single proxy URL string for both protocols Raises: MissingDependencyError: If qiskit or qiskit_ibm_provider is not installed. Example: >>> adapter = QiskitAdapter(proxy={ ... "http": "http://proxy.example.com:8080", ... "https": "https://proxy.example.com:8080" ... }) """ name = "ibm" def __init__(self, proxy: dict[str, str] | str | None = None) -> None: """Initialize the Qiskit adapter. Args: proxy: Optional proxy configuration. - Dict with 'http' and/or 'https' keys - Or a single proxy URL string Raises: MissingDependencyError: If qiskit is not installed. """ # Check if qiskit is available if not check_qiskit(): raise MissingDependencyError("qiskit", "qiskit") config = load_ibm_config() self._api_token: str = config["api_token"] self._proxy: dict[str, str] | str | None = proxy import qiskit_ibm_provider # Set up proxy for qiskit-ibm-provider if proxy is configured if proxy: self._setup_proxy(proxy) qiskit_ibm_provider.IBMProvider.save_account(self._api_token) self._provider = qiskit_ibm_provider.IBMProvider(instance="ibm-q/open/main") self._backends = self._provider.backends() def _setup_proxy(self, proxy: dict[str, str] | str) -> None: """Configure proxy settings for Qiskit/IBM provider. Args: proxy: Proxy configuration dict or URL string. """ import os # Convert dict proxy to URL format if needed if isinstance(proxy, dict): https_proxy = proxy.get("https") http_proxy = proxy.get("http") # Prefer HTTPS proxy for IBM Quantum proxy_url = https_proxy or http_proxy else: proxy_url = proxy if proxy_url: # Set environment variables for qiskit and underlying libraries os.environ["HTTP_PROXY"] = proxy_url os.environ["HTTPS_PROXY"] = proxy_url os.environ["http_proxy"] = proxy_url os.environ["https_proxy"] = proxy_url
[docs] def is_available(self) -> bool: """Check if the Qiskit adapter is available (IBM provider initialized). Returns: bool: True if the IBM provider was successfully initialized. """ return check_qiskit() and hasattr(self, '_provider') and self._provider is not None
# ------------------------------------------------------------------------- # Circuit translation # -------------------------------------------------------------------------
[docs] def translate_circuit(self, originir: str) -> "qiskit.QuantumCircuit": """Translate an OriginIR string to a Qiskit QuantumCircuit. The conversion path is OriginIR → QASM string → Qiskit QuantumCircuit. This is the most compatible route given the current API surface of ``uniqc.circuit_builder.qcircuit`` (which exposes QASM export but not a direct Qiskit-native constructor). An optimised direct path can be evaluated in a future iteration if needed. """ import qiskit from uniqc.circuit_builder.qcircuit import Circuit circuit = Circuit() circuit.load_originir(originir) qasm_str = circuit.qasm return qiskit.QuantumCircuit.from_qasm_str(qasm_str)
# ------------------------------------------------------------------------- # Task submission # -------------------------------------------------------------------------
[docs] def submit( self, circuit: "qiskit.QuantumCircuit", *, shots: int = 1000, **kwargs: Any ) -> str: """Submit a single circuit to IBM Quantum.""" chip_id: str | None = kwargs.get("chip_id") auto_mapping: Any = kwargs.get("auto_mapping", False) circuit_optimize: bool = kwargs.get("circuit_optimize", True) task_name: str | None = kwargs.get("task_name") return self._submit_impl( circuits=[circuit], chip_id=chip_id, shots=shots, auto_mapping=auto_mapping, circuit_optimize=circuit_optimize, task_name=task_name, )
[docs] def submit_batch( self, circuits: list["qiskit.QuantumCircuit"], *, shots: int = 1000, **kwargs: Any ) -> str: """Submit multiple circuits as a batch. Returns a single job ID.""" chip_id: str | None = kwargs.get("chip_id") auto_mapping: Any = kwargs.get("auto_mapping", False) circuit_optimize: bool = kwargs.get("circuit_optimize", True) task_name: str | None = kwargs.get("task_name") return self._submit_impl( circuits=circuits, chip_id=chip_id, shots=shots, auto_mapping=auto_mapping, circuit_optimize=circuit_optimize, task_name=task_name, )
def _submit_impl( self, circuits: list["qiskit.QuantumCircuit"], *, chip_id: str | None, shots: int, auto_mapping: Any, circuit_optimize: bool, task_name: str | None, ) -> str: """Internal implementation shared by submit() and submit_batch(). Handles the common logic for submitting circuits to IBM Quantum: 1. Validates the backend exists 2. Checks shot count against backend limits 3. Applies circuit optimization 4. Handles qubit mapping/auto-mapping 5. Executes and returns job ID Args: circuits: List of Qiskit QuantumCircuit objects to submit. chip_id: Backend name (e.g., 'ibmq_qasm_simulator'). shots: Number of measurement shots. auto_mapping: Qubit mapping strategy: - True: Use SABRE layout method for automatic qubit mapping - list: Use as initial_layout for manual qubit mapping - False/None: Default transpilation without special mapping circuit_optimize: Whether to apply optimization level 3. task_name: Unused but kept for API compatibility with other adapters. Returns: str: The IBM Quantum job ID for result retrieval. Raises: ValueError: If chip_id is not in available backends, or if shots exceeds backend max_shots limit. """ import qiskit backends_name = [b.name for b in self._backends] if chip_id not in backends_name: raise ValueError(f"no such chip, should be one of {backends_name}") backend = self._provider.get_backend(chip_id) max_shots = backend.max_shots if shots > max_shots: raise ValueError( f"maximum shots number exceeded, should less than {max_shots}" ) if circuit_optimize: circuits = qiskit.compiler.transpile( circuits, backend=backend, optimization_level=3 ) if auto_mapping is True: circuits = qiskit.compiler.transpile( circuits, backend=backend, layout_method="sabre", optimization_level=1, ) elif isinstance(auto_mapping, list): circuits = qiskit.compiler.transpile( circuits, backend=backend, initial_layout=auto_mapping, optimization_level=1, ) else: circuits = qiskit.compiler.transpile( circuits, backend=backend, optimization_level=1 ) job = qiskit.execute(experiments=circuits, backend=backend, shots=shots) return job.job_id() # ------------------------------------------------------------------------- # Task query # -------------------------------------------------------------------------
[docs] def query(self, taskid: str) -> dict[str, Any]: """Query a single IBM Quantum job's status.""" job = self._provider.retrieve_job(job_id=taskid) status = job.status().name if status not in ("DONE",): return {"status": status, "value": job.status().value} taskinfo = job.result().to_dict() results = [] for single_result in taskinfo["results"]: results.append(single_result["data"]["counts"]) return { "status": TASK_STATUS_SUCCESS, "result": results, "time": taskinfo["date"].strftime("%a %d %b %Y, %I:%M%p"), "backend_name": taskinfo["backend_name"], }
[docs] def query_batch(self, taskids: list[str]) -> dict[str, Any]: """Query multiple IBM Quantum jobs and merge results.""" taskinfo: dict[str, Any] = {"status": TASK_STATUS_SUCCESS, "result": []} for taskid in taskids: result_i = self.query(taskid) if result_i["status"] in ("ERROR", "CANCELLED"): taskinfo["status"] = TASK_STATUS_FAILED break elif result_i["status"] in ( "INITIALIZING", "QUEUED", "VALIDATING", "RUNNING", ): taskinfo["status"] = TASK_STATUS_RUNNING if taskinfo["status"] == TASK_STATUS_SUCCESS: taskinfo["result"].extend(result_i.get("result", [])) return taskinfo
# ------------------------------------------------------------------------- # Synchronous wait # -------------------------------------------------------------------------
[docs] def query_sync( self, taskid: str | list[str], interval: float = 2.0, timeout: float = 60.0, retry: int = 5, ) -> list[dict[str, Any]]: """Poll task status until completion or timeout.""" starttime = time.time() taskids = [taskid] if isinstance(taskid, str) else taskid while True: elapsed = time.time() - starttime if elapsed > timeout: raise TimeoutError("Reach the maximum timeout.") taskinfo = self.query_batch(taskids) if taskinfo["status"] == TASK_STATUS_RUNNING: time.sleep(interval) continue if taskinfo["status"] == TASK_STATUS_SUCCESS: return taskinfo["result"] if taskinfo["status"] == TASK_STATUS_FAILED: raise RuntimeError( f"Failed to execute, errorinfo = {taskinfo.get('result')}" ) # Retry on transient errors if retry > 0: retry -= 1 time.sleep(interval) else: raise RuntimeError("Retry count exhausted.")