"""Unified backend management for quantum computing platforms.
This module provides a centralized Backend management system with:
- Abstract base class QuantumBackend defining a unified interface
- Factory pattern for backend instance creation/retrieval
- Caching mechanism for backend instances
- Integration with existing adapters (OriginQ, Quafu, IBM)
Usage::
# Get or create a backend instance
backend = get_backend('originq')
# List all available backends
available = list_backends()
# Submit a circuit
task_id = backend.submit(circuit, shots=1000)
# Query task status
result = backend.query(task_id)
"""
from __future__ import annotations
__all__ = [
"QuantumBackend",
"OriginQBackend",
"QuafuBackend",
"IBMBackend",
"get_backend",
"list_backends",
"BACKENDS",
]
import abc
import json
import os
from pathlib import Path
from typing import TYPE_CHECKING, Any, ClassVar, Type
if TYPE_CHECKING:
from uniqc.circuit_builder.qcircuit import Circuit
from uniqc.task.adapters import (
OriginQAdapter,
QuafuAdapter,
QiskitAdapter,
QuantumAdapter,
TASK_STATUS_FAILED,
TASK_STATUS_RUNNING,
TASK_STATUS_SUCCESS,
)
# -----------------------------------------------------------------------------
# Cache Configuration
# -----------------------------------------------------------------------------
DEFAULT_CACHE_DIR = Path.home() / ".uniqc" / "cache"
CACHE_FILE_SUFFIX = "_backend.json"
def _get_cache_file_path(platform: str, cache_dir: Path | None = None) -> Path:
"""Get the cache file path for a specific platform.
Args:
platform: The backend platform identifier.
cache_dir: Optional custom cache directory. Uses default if None.
Returns:
Path to the cache file.
"""
cache_path = cache_dir or DEFAULT_CACHE_DIR
cache_path.mkdir(parents=True, exist_ok=True)
return cache_path / f"{platform}{CACHE_FILE_SUFFIX}"
# -----------------------------------------------------------------------------
# Abstract Base Backend
# -----------------------------------------------------------------------------
[docs]
class QuantumBackend(abc.ABC):
"""Abstract base class for quantum backend management.
This class provides a unified interface for all quantum computing backends,
wrapping the underlying adapters and providing caching capabilities.
Attributes:
name: The name of this backend instance.
platform: The platform identifier (e.g., 'originq', 'quafu', 'ibm').
adapter: The underlying quantum adapter instance.
config: Backend-specific configuration dictionary.
"""
# Class-level registry of backend instances
_instances: ClassVar[dict[str, "QuantumBackend"]] = {}
# Platform identifier (must be set by subclasses)
platform: ClassVar[str] = ""
# Adapter class (must be set by subclasses)
_adapter_class: ClassVar[Type[QuantumAdapter]] = QuantumAdapter
def __init__(
self,
name: str | None = None,
config: dict[str, Any] | None = None,
cache_dir: Path | str | None = None,
) -> None:
"""Initialize the backend.
Args:
name: Optional name for this backend instance. Uses platform name if None.
config: Optional configuration dictionary.
cache_dir: Optional custom cache directory path.
"""
self.name = name or self.platform
self.config = config or {}
self._cache_dir = Path(cache_dir) if cache_dir else DEFAULT_CACHE_DIR
self._adapter: QuantumAdapter | None = None
@property
def adapter(self) -> QuantumAdapter:
"""Get or create the underlying adapter instance.
Returns:
The quantum adapter for this backend.
Raises:
RuntimeError: If the adapter cannot be initialized.
"""
if self._adapter is None:
self._adapter = self._create_adapter()
return self._adapter
@abc.abstractmethod
def _create_adapter(self) -> QuantumAdapter:
"""Create and return the platform-specific adapter.
Returns:
A new adapter instance for this backend.
"""
...
# -------------------------------------------------------------------------
# Circuit Adapter
# -------------------------------------------------------------------------
[docs]
def get_circuit_adapter(self) -> QuantumAdapter:
"""Get the circuit adapter for translating circuits.
Returns:
The quantum adapter that handles circuit translation.
"""
return self.adapter
[docs]
def translate_circuit(self, originir: str) -> Any:
"""Translate an OriginIR circuit to the platform's native format.
Args:
originir: Circuit in OriginIR format.
Returns:
Provider-specific circuit object.
"""
return self.adapter.translate_circuit(originir)
# -------------------------------------------------------------------------
# Task Submission
# -------------------------------------------------------------------------
[docs]
def submit(self, circuit: Any, *, shots: int = 1000, **kwargs: Any) -> str:
"""Submit a circuit to the backend.
Args:
circuit: Provider-native circuit object or OriginIR string.
shots: Number of measurement shots.
**kwargs: Additional provider-specific parameters.
Returns:
Task ID assigned by the backend.
"""
return self.adapter.submit(circuit, shots=shots, **kwargs)
[docs]
def submit_batch(
self, circuits: list[Any], *, shots: int = 1000, **kwargs: Any
) -> str | list[str]:
"""Submit multiple circuits as a batch.
Args:
circuits: List of provider-native circuit objects or OriginIR strings.
shots: Number of measurement shots.
**kwargs: Additional provider-specific parameters.
Returns:
Task ID(s) assigned by the backend.
"""
return self.adapter.submit_batch(circuits, shots=shots, **kwargs)
# -------------------------------------------------------------------------
# Task Query
# -------------------------------------------------------------------------
[docs]
def query(self, task_id: str) -> dict[str, Any]:
"""Query a task's status and result.
Args:
task_id: Task identifier.
Returns:
Dict with keys:
- 'status': 'success' | 'failed' | 'running'
- 'result': Execution result (when status is 'success' or 'failed')
"""
return self.adapter.query(task_id)
[docs]
def query_batch(self, task_ids: list[str]) -> dict[str, Any]:
"""Query multiple tasks' status and merge results.
Args:
task_ids: List of task identifiers.
Returns:
Dict with keys: 'status', 'result' (list of results).
"""
return self.adapter.query_batch(task_ids)
# -------------------------------------------------------------------------
# Availability
# -------------------------------------------------------------------------
[docs]
def is_available(self) -> bool:
"""Check if this backend is available.
Returns:
True if the backend is properly configured and ready to use.
"""
try:
return self.adapter.is_available()
except Exception:
return False
# -------------------------------------------------------------------------
# Cache Management
# -------------------------------------------------------------------------
[docs]
def save_to_cache(self) -> None:
"""Save this backend instance configuration to cache."""
cache_file = _get_cache_file_path(self.platform, self._cache_dir)
cache_data = {
"name": self.name,
"platform": self.platform,
"config": self.config,
}
try:
with open(cache_file, "w", encoding="utf-8") as f:
json.dump(cache_data, f, indent=2, ensure_ascii=False)
except (IOError, OSError) as e:
# Cache saving is non-critical, log but don't fail
import warnings
warnings.warn(f"Failed to save backend cache: {e}")
[docs]
@classmethod
def load_from_cache(
cls,
cache_dir: Path | str | None = None,
) -> "QuantumBackend" | None:
"""Load a backend instance from cache.
Args:
cache_dir: Optional custom cache directory path.
Returns:
Loaded backend instance or None if cache doesn't exist or is invalid.
"""
cache_path = Path(cache_dir) if cache_dir else DEFAULT_CACHE_DIR
cache_file = _get_cache_file_path(cls.platform, cache_path)
if not cache_file.exists():
return None
try:
with open(cache_file, "r", encoding="utf-8") as f:
cache_data = json.load(f)
# Verify platform matches
if cache_data.get("platform") != cls.platform:
return None
instance = cls(
name=cache_data.get("name"),
config=cache_data.get("config", {}),
cache_dir=cache_path,
)
return instance
except (IOError, OSError, json.JSONDecodeError, KeyError):
return None
[docs]
def clear_cache(self) -> None:
"""Clear the cache for this backend instance."""
cache_file = _get_cache_file_path(self.platform, self._cache_dir)
if cache_file.exists():
try:
cache_file.unlink()
except (IOError, OSError):
pass
# -------------------------------------------------------------------------
# Class Methods for Instance Management
# -------------------------------------------------------------------------
[docs]
@classmethod
def get_instance(
cls,
name: str | None = None,
config: dict[str, Any] | None = None,
use_cache: bool = True,
cache_dir: Path | str | None = None,
) -> "QuantumBackend":
"""Get or create a backend instance (factory method).
Args:
name: Optional name for the instance.
config: Optional configuration dictionary.
use_cache: Whether to use/load cache. Defaults to True.
cache_dir: Optional custom cache directory.
Returns:
A backend instance.
"""
cache_key = f"{cls.platform}:{name or 'default'}"
# Check in-memory cache first
if cache_key in cls._instances:
return cls._instances[cache_key]
# Try loading from disk cache if enabled
if use_cache and config is None:
cached = cls.load_from_cache(cache_dir)
if cached is not None:
cls._instances[cache_key] = cached
return cached
# Create new instance
instance = cls(name=name, config=config, cache_dir=cache_dir)
# Save to cache if enabled
if use_cache:
instance.save_to_cache()
cls._instances[cache_key] = instance
return instance
[docs]
@classmethod
def list_available(cls) -> bool:
"""Check if this backend type is available.
Returns:
True if the backend can be instantiated and is configured.
"""
try:
instance = cls.get_instance(use_cache=False)
return instance.is_available()
except Exception:
return False
# -----------------------------------------------------------------------------
# Concrete Backend Implementations
# -----------------------------------------------------------------------------
[docs]
class OriginQBackend(QuantumBackend):
"""Backend for OriginQ Cloud (本源量子云).
This backend connects to the OriginQ Cloud service for executing
quantum circuits on OriginQ quantum computers and simulators.
"""
platform = "originq"
_adapter_class = OriginQAdapter
def _create_adapter(self) -> OriginQAdapter:
"""Create an OriginQ adapter.
Returns:
A configured OriginQAdapter instance.
"""
return OriginQAdapter()
[docs]
class QuafuBackend(QuantumBackend):
"""Backend for BAQIS Quafu (ScQ) quantum cloud platform.
This backend connects to the Quafu service for executing quantum
circuits on superconducting quantum computers.
"""
platform = "quafu"
_adapter_class = QuafuAdapter
# Valid chip IDs for Quafu
VALID_CHIP_IDS = frozenset(
{"ScQ-P10", "ScQ-P18", "ScQ-P136", "ScQ-P10C", "Dongling"}
)
def _create_adapter(self) -> QuafuAdapter:
"""Create a Quafu adapter.
Returns:
A configured QuafuAdapter instance.
"""
return QuafuAdapter()
[docs]
def validate_chip_id(self, chip_id: str) -> bool:
"""Validate if the chip ID is valid for Quafu.
Args:
chip_id: The chip identifier to validate.
Returns:
True if the chip ID is valid.
"""
return chip_id in self.VALID_CHIP_IDS
[docs]
class IBMBackend(QuantumBackend):
"""Backend for IBM Quantum via Qiskit.
This backend connects to IBM Quantum services for executing quantum
circuits on IBM quantum computers and simulators.
Proxy Configuration:
Proxies can be configured in multiple ways (in priority order):
1. Explicit config dict passed to constructor
2. Environment variables (HTTP_PROXY, HTTPS_PROXY)
3. uniqc.yml configuration file
Example:
>>> # Using config file
>>> backend = get_backend('ibm')
>>> # Check proxy availability
>>> backend.check_proxy()
True
>>> # Test IBM connectivity
>>> result = backend.test_connectivity()
>>> print(result['success'])
True
"""
platform = "ibm"
_adapter_class = QiskitAdapter
def __init__(
self,
name: str | None = None,
config: dict[str, Any] | None = None,
cache_dir: Path | str | None = None,
) -> None:
"""Initialize the IBM backend.
Args:
name: Optional name for this backend instance.
config: Optional configuration dictionary. Can include:
- token: IBM Quantum API token
- proxy: Dict with 'http' and/or 'https' keys
cache_dir: Optional custom cache directory path.
"""
super().__init__(name=name, config=config, cache_dir=cache_dir)
self._proxy_config: dict[str, str] | None = None
self._api_token: str | None = None
self._load_ibm_config()
def _load_ibm_config(self) -> None:
"""Load IBM configuration including proxy settings.
Loads from (in priority order):
1. Explicit config dict passed to constructor
2. Environment variables (HTTP_PROXY, HTTPS_PROXY)
3. uniqc.yml configuration file
"""
from uniqc.config import get_ibm_config
from uniqc.network_utils import (
detect_system_proxy,
get_ibm_proxy_from_config,
)
# Start with config file settings
try:
file_config = get_ibm_config()
self._api_token = file_config.get("token")
self._proxy_config = get_ibm_proxy_from_config(file_config)
except Exception:
file_config = {}
# Override with explicit config if provided
if self.config:
if "token" in self.config:
self._api_token = self.config["token"]
if "proxy" in self.config:
proxy = self.config["proxy"]
if isinstance(proxy, dict):
self._proxy_config = {
k: v for k, v in proxy.items()
if k in ("http", "https") and v
}
elif isinstance(proxy, str):
self._proxy_config = {"http": proxy, "https": proxy}
# Environment variables take highest priority, but merge with existing config
env_proxies = detect_system_proxy()
if env_proxies.get("http") or env_proxies.get("https"):
if self._proxy_config is None:
self._proxy_config = {}
# Merge: file config first, then override with env vars
if env_proxies.get("http"):
self._proxy_config["http"] = env_proxies["http"]
if env_proxies.get("https"):
self._proxy_config["https"] = env_proxies["https"]
def _create_adapter(self) -> QiskitAdapter:
"""Create an IBM Qiskit adapter.
Returns:
A configured QiskitAdapter instance.
"""
# Pass proxy configuration to adapter if available
if self._proxy_config:
return QiskitAdapter(proxy=self._proxy_config)
return QiskitAdapter()
[docs]
def check_proxy(self) -> bool:
"""Check if the configured proxy is available.
Returns:
True if proxy is configured and reachable, False otherwise.
Note:
If no proxy is configured, returns True (direct connection).
"""
from uniqc.network_utils import check_proxy_connectivity
if not self._proxy_config:
return True # No proxy configured, direct connection
# Check HTTPS proxy first (preferred for IBM Quantum)
https_proxy = self._proxy_config.get("https")
if https_proxy:
return check_proxy_connectivity(https_proxy)
# Fall back to HTTP proxy
http_proxy = self._proxy_config.get("http")
if http_proxy:
return check_proxy_connectivity(http_proxy)
return False
[docs]
def test_connectivity(self) -> dict[str, Any]:
"""Test connectivity to IBM Quantum services.
Returns:
dict with connectivity test results. Keys:
- ``success`` (``bool``)
- ``message`` (``str``)
- ``proxy_used`` (``dict | None``)
- ``response_time_ms`` (``float | None``)
"""
from uniqc.network_utils import test_ibm_connectivity
return test_ibm_connectivity(
token=self._api_token,
proxy=self._proxy_config,
)
[docs]
def get_proxy_config(self) -> dict[str, str] | None:
"""Get the current proxy configuration.
Returns:
Dict with 'http' and/or 'https' proxy URLs, or None if not configured.
"""
return self._proxy_config.copy() if self._proxy_config else None
# -----------------------------------------------------------------------------
# Backend Registry
# -----------------------------------------------------------------------------
BACKENDS: dict[str, Type[QuantumBackend]] = {
"originq": OriginQBackend,
"quafu": QuafuBackend,
"ibm": IBMBackend,
}
# -----------------------------------------------------------------------------
# Public API Functions
# -----------------------------------------------------------------------------
[docs]
def get_backend(
name: str,
*,
config: dict[str, Any] | None = None,
use_cache: bool = True,
cache_dir: Path | str | None = None,
) -> QuantumBackend:
"""Get or create a backend instance by name.
This is the main factory function for obtaining backend instances.
It uses the BACKENDS registry to look up the appropriate backend class
and returns a configured instance.
Args:
name: The platform name ('originq', 'quafu', or 'ibm').
config: Optional configuration dictionary for the backend.
use_cache: Whether to use cache. Defaults to True.
cache_dir: Optional custom cache directory path.
Returns:
A configured QuantumBackend instance.
Raises:
ValueError: If the backend name is not recognized.
RuntimeError: If the backend cannot be initialized.
Example:
>>> backend = get_backend('originq')
>>> task_id = backend.submit(circuit, shots=1000)
"""
if name not in BACKENDS:
available = ", ".join(BACKENDS.keys())
raise ValueError(
f"Unknown backend '{name}'. Available backends: {available}"
)
backend_class = BACKENDS[name]
return backend_class.get_instance(
name=None,
config=config,
use_cache=use_cache,
cache_dir=cache_dir,
)
[docs]
def list_backends() -> dict[str, dict[str, Any]]:
"""List all available backends and their status.
Returns:
A dictionary mapping backend names to their information, e.g.::
{
'originq': {'available': True, 'platform': 'originq'},
'quafu': {'available': False, 'platform': 'quafu'},
...
}
Example:
>>> backends = list_backends()
>>> for name, info in backends.items():
... print(f"{name}: {'available' if info['available'] else 'unavailable'}")
"""
result = {}
for name, backend_class in BACKENDS.items():
result[name] = {
"platform": name,
"available": backend_class.list_available(),
"class": backend_class.__name__,
}
return result
[docs]
def register_backend(
name: str,
backend_class: Type[QuantumBackend],
allow_override: bool = False,
) -> None:
"""Register a custom backend class.
Args:
name: The platform name to register.
backend_class: The backend class to register.
allow_override: Whether to allow overriding existing registrations.
Raises:
ValueError: If the name is already registered and override is False.
Example:
>>> class MyBackend(QuantumBackend):
... platform = "my_platform"
... def _create_adapter(self):
... return MyAdapter()
...
>>> register_backend("my_platform", MyBackend)
"""
if name in BACKENDS and not allow_override:
raise ValueError(
f"Backend '{name}' is already registered. "
"Use allow_override=True to replace it."
)
# Validate the backend class
if not issubclass(backend_class, QuantumBackend):
raise TypeError(
f"Backend class must be a subclass of QuantumBackend, "
f"got {type(backend_class)}"
)
if not backend_class.platform:
raise ValueError(
f"Backend class {backend_class.__name__} must define a platform attribute"
)
BACKENDS[name] = backend_class
[docs]
def unregister_backend(name: str) -> None:
"""Unregister a backend.
Args:
name: The platform name to unregister.
Raises:
ValueError: If the backend is not registered.
"""
if name not in BACKENDS:
raise ValueError(f"Backend '{name}' is not registered")
del BACKENDS[name]
[docs]
def clear_backend_cache(cache_dir: Path | str | None = None) -> None:
"""Clear all backend caches.
Args:
cache_dir: Optional custom cache directory. Uses default if None.
"""
cache_path = Path(cache_dir) if cache_dir else DEFAULT_CACHE_DIR
if not cache_path.exists():
return
for cache_file in cache_path.glob(f"*{CACHE_FILE_SUFFIX}"):
try:
cache_file.unlink()
except (IOError, OSError):
pass