Source code for uniqc.backend_adapter.task.adapters.base

"""Base adapter interface for quantum cloud backends.

Every backend adapter must implement this interface, providing:
1. Translation from OriginIR string to the provider's native circuit type.
2. Task submission via the provider's Python SDK (not raw REST).
3. Task status query and result retrieval.
4. Dry-run validation without making any network calls.

The adapter layer replaces all direct ``requests`` REST calls within the
task modules.  Each adapter is a stateful object that holds the provider
session / client and configuration.

Dry-run Validation
------------------
Every adapter implements ``dry_run(originir, shots, **kwargs)`` to validate
a circuit offline before submission. This checks:
  1. OriginIR parses without error.
  2. All gates are supported by the target backend.
  3. Qubit count fits within the backend's limits.
  4. The native circuit object is structurally valid.

No cloud API calls are made. A dry-run success followed by actual submission
failure is a critical bug — please report it at the issue tracker.
"""

from __future__ import annotations

__all__ = [
    "QuantumAdapter",
    "DryRunResult",
    "TASK_STATUS_FAILED",
    "TASK_STATUS_SUCCESS",
    "TASK_STATUS_RUNNING",
    "_dry_run_success",
    "_dry_run_failed",
]

import abc
from typing import TYPE_CHECKING, Any

from uniqc.backend_adapter.task.result_types import DryRunResult

if TYPE_CHECKING:
    pass


TASK_STATUS_FAILED = "failed"
TASK_STATUS_SUCCESS = "success"
TASK_STATUS_RUNNING = "running"


def _dry_run_failed(
    error_message: str,
    *,
    details: str,
    backend_name: str | None = None,
    warnings: tuple[str, ...] = (),
) -> DryRunResult:
    """Build a failure DryRunResult from a caught exception."""
    return DryRunResult(
        success=False,
        details=details,
        error=error_message,
        warnings=warnings,
        backend_name=backend_name,
    )


def _dry_run_success(
    details: str,
    *,
    backend_name: str | None = None,
    circuit_qubits: int | None = None,
    supported_gates: tuple[str, ...] = (),
    warnings: tuple[str, ...] = (),
) -> DryRunResult:
    """Build a success DryRunResult."""
    return DryRunResult(
        success=True,
        details=details,
        backend_name=backend_name,
        circuit_qubits=circuit_qubits,
        supported_gates=supported_gates,
        warnings=warnings,
    )


[docs] class QuantumAdapter(abc.ABC): """Abstract base class for quantum cloud backend adapters. Subclass this for each backend (originq_cloud, quafu, ibm, ...). Each adapter is instantiated once per task module and reused. Class attributes: name: Adapter identifier. max_native_batch_size: Maximum number of circuits the adapter can pack into a single platform-side job. uniqc relies on this to auto-shard oversized batches; see :func:`uniqc.backend_adapter.task_manager.submit_batch`. The base default is ``1`` (no native batching, one platform job per circuit). Adapters that natively support grouped submission override this — see :class:`OriginQAdapter` (``200``, configurable via ``originq.task_group_size``) and :class:`QiskitAdapter` (``100``). """ name: str = "base" max_native_batch_size: int = 1 # ------------------------------------------------------------------------- # Circuit translation # -------------------------------------------------------------------------
[docs] @abc.abstractmethod def translate_circuit(self, originir: str) -> Any: """Translate an OriginIR circuit string to the provider's native circuit type. Args: originir: Circuit in OriginIR format. Returns: Provider-specific circuit object. """ ...
# ------------------------------------------------------------------------- # Task submission # -------------------------------------------------------------------------
[docs] @abc.abstractmethod def submit(self, circuit: Any, *, shots: int = 1000, **kwargs: Any) -> str: """Submit a circuit to the backend and return a task ID. Args: circuit: Provider-native circuit object (result of ``translate_circuit``). shots: Number of measurement shots. **kwargs: Additional provider-specific parameters (e.g. chip_id, auto_mapping, circuit_optimize). Returns: str: Task ID assigned by the backend. """ ...
[docs] @abc.abstractmethod def submit_batch(self, circuits: list[Any], *, shots: int = 1000, **kwargs: Any) -> list[str]: """Submit multiple circuits as a single batch. Args: circuits: List of provider-native circuit objects. shots: Number of measurement shots. **kwargs: Additional provider-specific parameters. Returns: list[str]: Task IDs (one per circuit), or a single task ID if the backend returns a group ID. """ ...
# ------------------------------------------------------------------------- # Task query # -------------------------------------------------------------------------
[docs] @abc.abstractmethod def query(self, taskid: str) -> dict[str, Any]: """Query a single task's status and result. Args: taskid: Task identifier. Returns: dict with keys: - ``status``: ``'success'`` | ``'failed'`` | ``'running'`` - ``result``: execution result (present when status is ``'success'`` or ``'failed'``) """ ...
[docs] @abc.abstractmethod def query_batch(self, taskids: list[str]) -> dict[str, Any]: """Query multiple tasks' status and merge results. Overall status is the worst case: ``'failed'`` > ``'running'`` > ``'success'``. Args: taskids: List of task identifiers. Returns: dict with keys: ``status``, ``result`` (list of results). """ ...
# ------------------------------------------------------------------------- # Utils # -------------------------------------------------------------------------
[docs] def is_available(self) -> bool: """Return True if the required packages / credentials are configured. Defaults to ``False`` so that subclasses must explicitly opt-in, avoiding the risk of an unconfigured adapter incorrectly reporting availability. """ return False
[docs] def list_backends(self) -> list[dict[str, Any]]: """Return raw backend metadata from the platform API. Returns a list of dicts with at least a ``"name"`` key. The dict shape is platform-specific; the caller is responsible for normalising the data into ``BackendInfo`` objects. Raises: Exception: If the platform API call fails (auth error, network error, etc.). Subclasses should propagate the original exception type so callers can distinguish auth failures from network failures. """ raise NotImplementedError(f"{self.__class__.__name__} does not implement list_backends")
# ------------------------------------------------------------------------- # Dry-run validation # -------------------------------------------------------------------------
[docs] def dry_run(self, originir: str, *, shots: int = 1000, **kwargs: Any) -> DryRunResult: """Validate a circuit without making network calls. Subclasses must implement this method. The default implementation raises NotImplementedError. Validation checklist (where determinable offline): 1. OriginIR parses without error. 2. All gates are supported by the target backend. 3. Qubit count fits within the backend's limits. 4. The native circuit object is structurally valid. This method must NOT make any cloud API calls. All checks must be local-only. Args: originir: Circuit in OriginIR format. shots: Number of measurement shots (for shots-limit validation). **kwargs: Adapter-specific options (e.g. chip_id for IBM/Quafu). Returns: DryRunResult with success=True/False, details, warnings, and metadata. Raises: NotImplementedError: Subclasses must implement this method. Note: Any dry-run success followed by actual submission failure is a critical bug. Please report it at the UnifiedQuantum issue tracker. """ raise NotImplementedError( f"{self.__class__.__name__} does not implement dry_run(). " "Subclasses of QuantumAdapter must implement dry_run()." )