"""QuarkStudio / Quafu-SQC backend adapter.
QuarkStudio replaces the deprecated pyquafu client for the BAQIS Quafu-SQC
platform. Its public SDK accepts OpenQASM 2.0 text inside a task dictionary:
{"chip": "Baihua", "name": "...", "circuit": qasm2, "shots": 1024, ...}
This adapter keeps that SDK boundary intact and exposes the same
``QuantumAdapter`` interface used by the rest of UnifiedQuantum.
"""
from __future__ import annotations
__all__ = ["QuarkAdapter"]
import contextlib
import io
import os
import re
from typing import Any
from uniqc.backend_adapter.backend_info import QubitTopology
from uniqc.backend_adapter.task.adapters.base import (
TASK_STATUS_FAILED,
TASK_STATUS_RUNNING,
TASK_STATUS_SUCCESS,
DryRunResult,
QuantumAdapter,
_dry_run_failed,
_dry_run_success,
)
from uniqc.backend_adapter.task.optional_deps import MissingDependencyError, check_quark, check_quarkcircuit
from uniqc.cli.chip_info import ChipGlobalInfo, SingleQubitData, TwoQubitData, TwoQubitGateData
from uniqc.compile.converter import convert_oir_to_qasm
from uniqc.config import load_quark_config
# Chip and task name used when the caller does not supply one.
_DEFAULT_CHIP = "Baihua"
_DEFAULT_TASK_NAME = "UniqcQuantumTask"

# Lower-cased task status strings recognised as terminal outcomes. Status
# text found in neither set is reported as still running.
_TERMINAL_SUCCESS = {
    "finished",
    "success",
    "succeeded",
    "done",
    "completed",
}
_TERMINAL_FAILED = {
    "failed",
    "failure",
    "error",
    "cancelled",
    "canceled",
}

# Lower-cased backend status aliases mapped onto the canonical labels
# "available", "unavailable" and "maintenance".
_BACKEND_STATUS_MAP = {
    alias: canonical
    for canonical, aliases in (
        ("available", ("online", "available")),
        ("unavailable", ("offline", "unavailable")),
        ("maintenance", ("maintenance", "maintaining", "calibrating", "calibration")),
    )
    for alias in aliases
}
def _normalise_status(value: Any, *, has_counts: bool = False) -> str:
    """Map a QuarkStudio status payload onto a ``TASK_STATUS_*`` constant.

    Args:
        value: A status string, or a dict carrying the status under one of
            the keys ``status``, ``taskStatus`` or ``state``.
        has_counts: When true the task is reported as successful without
            inspecting ``value``.

    Returns:
        ``TASK_STATUS_SUCCESS``, ``TASK_STATUS_FAILED`` or
        ``TASK_STATUS_RUNNING``. Unrecognised or empty status text is
        reported as running.
    """
    if has_counts:
        return TASK_STATUS_SUCCESS

    if isinstance(value, dict):
        # The first status-like key that is present decides the outcome,
        # whatever its value is.
        for field in ("status", "taskStatus", "state"):
            if field in value:
                return _normalise_status(value[field])
        # A dict without a status field but with a counts field is a result.
        if any(field in value for field in ("count", "counts")):
            return TASK_STATUS_SUCCESS

    label = str(value or "").strip().lower()
    for known_labels, outcome in (
        (_TERMINAL_SUCCESS, TASK_STATUS_SUCCESS),
        (_TERMINAL_FAILED, TASK_STATUS_FAILED),
    ):
        if label in known_labels:
            return outcome
    return TASK_STATUS_RUNNING
def _counts_from_result(result: Any) -> dict[str, Any] | None:
    """Return the measurement-count mapping from a result payload.

    The payload is searched under ``"count"`` first and ``"counts"`` second.
    A key whose value is not a dict (for example ``None``) is skipped, so the
    other key is still considered.

    Args:
        result: The object returned by the SDK for a task result.

    Returns:
        The counts dict, or ``None`` when ``result`` is not a dict or neither
        key holds a dict.
    """
    if not isinstance(result, dict):
        return None
    for key in ("count", "counts"):
        candidate = result.get(key)
        if isinstance(candidate, dict):
            return candidate
    return None
def _task_id(value: Any) -> str:
    """Return the task identifier contained in ``value`` as a string.

    For a dict, the first key present among ``tid``, ``task_id``, ``taskId``
    and ``id`` supplies the identifier. Any other value, or a dict with none
    of those keys, is stringified as a whole.
    """
    payload = value
    if isinstance(value, dict):
        present = [name for name in ("tid", "task_id", "taskId", "id") if name in value]
        if present:
            payload = value[present[0]]
    return str(payload)
def _qasm_qubit_count(qasm: str) -> int | None:
    """Return the total number of qubits declared in OpenQASM 2.0 text.

    The sizes of all ``qreg`` declarations are summed, so a program that
    splits its qubits over several registers is counted in full. ``//`` line
    comments are removed first so a commented-out declaration is not counted.

    Args:
        qasm: OpenQASM 2.0 program text.

    Returns:
        The summed register size, or ``None`` when no ``qreg`` is declared.
    """
    code = re.sub(r"//[^\n]*", "", qasm)
    sizes = re.findall(r"\bqreg\s+\w+\s*\[\s*(\d+)\s*\]\s*;", code)
    if not sizes:
        return None
    return sum(int(size) for size in sizes)
def _number(value: Any) -> float | None:
    """Convert ``value`` to ``float``, or return ``None`` if that fails.

    ``None`` input and values for which ``float()`` raises ``TypeError`` or
    ``ValueError`` both give ``None``.
    """
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None
def _avg(values: list[float]) -> float | None:
    """Return the arithmetic mean of ``values``, or ``None`` when it is empty."""
    if not values:
        return None
    return sum(values) / len(values)
def _seconds_to_nanoseconds(value: Any) -> float | None:
    """Convert a duration to nanoseconds using a magnitude heuristic.

    A numeric value whose magnitude is below 1 is multiplied by 1e9, i.e.
    treated as seconds. Any other numeric value is returned unchanged.
    NOTE(review): presumably such values are already in nanoseconds; confirm
    against the units quarkcircuit reports.

    Returns:
        The duration as a float, or ``None`` when ``value`` is not numeric.
    """
    duration = _number(value)
    if duration is None:
        return None
    if abs(duration) < 1:
        return duration * 1_000_000_000
    return duration
def _parse_quark_qubit_id(value: Any) -> int | None:
    """Extract an integer qubit index from a Quark qubit identifier.

    Integers are returned unchanged. Anything else is stringified and the
    first run of decimal digits is used, so ``"Q12"`` gives ``12`` and
    ``"12"`` gives ``12``.

    The regex is the only parsing path on purpose: ``str.isdigit()`` accepts
    characters such as superscript digits that ``int()`` cannot parse, which
    would raise ``ValueError`` on unusual labels.

    Returns:
        The parsed index, or ``None`` when no decimal digits are present.
    """
    if isinstance(value, int):
        return value
    match = re.search(r"\d+", str(value))
    return int(match.group(0)) if match else None
def _quark_gate_basis(chip_info: dict[str, Any]) -> tuple[list[str], str]:
    """Return the chip's basis gates and its two-qubit basis gate.

    Gate names are stripped and lower-cased; blank names are dropped. The
    two-qubit gate is read from ``global_info["two_qubit_gate_basis"]`` and
    appended to the basis list when it is not already there.

    Args:
        chip_info: Chip metadata dict. A ``global_info`` entry that is missing
            or is not a dict is treated as empty rather than raising.

    Returns:
        ``(basis_gates, two_qubit_gate)``. The two-qubit gate defaults to
        ``"cz"`` when the metadata does not name one.
    """
    basis = [str(g).strip().lower() for g in chip_info.get("basis_gates") or [] if str(g).strip()]

    global_info = chip_info.get("global_info")
    if not isinstance(global_info, dict):
        global_info = {}
    two_qubit_basis = str(global_info.get("two_qubit_gate_basis") or "").strip().lower()

    if two_qubit_basis and two_qubit_basis not in basis:
        basis.append(two_qubit_basis)
    return basis, two_qubit_basis or "cz"
def _extract_quark_backend_details(chip_info: dict[str, Any]) -> dict[str, Any]:
    """Normalize QuarkStudio ``quarkcircuit`` chip metadata.

    Reads the ``global_info``, ``qubits_info`` and ``couplers_info`` sections
    of ``chip_info`` (each treated as empty unless it is a dict) and returns a
    flat dict with qubit count, topology, available qubits, valid gates,
    per-qubit and per-pair calibration records, global gate information,
    calibration time and average fidelity / coherence figures.
    """

    def _section(name: str) -> dict[str, Any]:
        section = chip_info.get(name)
        return section if isinstance(section, dict) else {}

    global_info = _section("global_info")
    qubits_info = _section("qubits_info")
    couplers_info = _section("couplers_info")
    basis_gates, two_qubit_basis = _quark_gate_basis(chip_info)

    available_qubits: set[int] = set()

    # ---- per-qubit calibration -------------------------------------------
    per_qubit: list[SingleQubitData] = []
    readout_averages: list[float] = []
    for label, raw in qubits_info.items():
        is_record = isinstance(raw, dict)
        record = raw if is_record else {}
        # Prefer the explicit "index" field; fall back to digits in the key.
        qid = _parse_quark_qubit_id(record.get("index") if is_record else label)
        if qid is None:
            qid = _parse_quark_qubit_id(label)
            if qid is None:
                continue
        available_qubits.add(qid)

        readout_0 = _number(record.get("readout g_fidelity"))
        readout_1 = _number(record.get("readout e_fidelity"))
        avg_readout = _avg([r for r in (readout_0, readout_1) if r is not None])
        if avg_readout is not None:
            readout_averages.append(avg_readout)

        per_qubit.append(
            SingleQubitData(
                qubit_id=qid,
                t1=_number(record.get("T1")),
                t2=_number(record.get("T2")),
                single_gate_fidelity=_number(record.get("fidelity")),
                readout_fidelity_0=readout_0,
                readout_fidelity_1=readout_1,
                avg_readout_fidelity=avg_readout,
            )
        )

    # ---- couplers: topology and per-pair calibration ---------------------
    edges: list[QubitTopology] = []
    per_pair: list[TwoQubitData] = []
    pair_fidelities: list[float] = []
    visited: set[tuple[int, int]] = set()
    for coupler in couplers_info.values():
        if not isinstance(coupler, dict):
            continue
        endpoints = coupler.get("qubits_index") or coupler.get("qubits") or coupler.get("qubit")
        if not (isinstance(endpoints, (list, tuple)) and len(endpoints) == 2):
            continue
        u = _parse_quark_qubit_id(endpoints[0])
        v = _parse_quark_qubit_id(endpoints[1])
        if u is None or v is None or u == v:
            continue
        available_qubits.update((u, v))

        # Edges are undirected: store each pair once, lowest index first.
        low, high = sorted((u, v))
        if (low, high) in visited:
            continue
        visited.add((low, high))
        edges.append(QubitTopology(u=low, v=high))

        fidelity = _number(coupler.get("fidelity"))
        if fidelity is not None:
            if fidelity <= 0:
                # Non-positive fidelity is treated as "no data".
                fidelity = None
            else:
                pair_fidelities.append(fidelity)
        per_pair.append(
            TwoQubitData(
                qubit_u=low,
                qubit_v=high,
                gates=(TwoQubitGateData(gate=two_qubit_basis, fidelity=fidelity),),
            )
        )

    # With no per-qubit or coupler data, fall back to the declared count.
    if not available_qubits:
        declared = global_info.get("nqubits_available")
        if isinstance(declared, int) and declared > 0:
            available_qubits.update(range(declared))

    single_qubit_gates = tuple(g for g in basis_gates if g != two_qubit_basis)
    two_qubit_gates = (two_qubit_basis,) if two_qubit_basis else ()
    avg_1q = _number(global_info.get("single_qubit_gate_fidelity_average"))
    avg_2q = _number(global_info.get("two_qubit_gate_fidelity_average"))
    if avg_2q is None:
        # No reported average: derive one from the per-coupler fidelities.
        avg_2q = _avg(pair_fidelities)

    return {
        "num_qubits": int(global_info.get("nqubits_available") or len(available_qubits) or 0),
        "topology": [[e.u, e.v] for e in sorted(edges, key=lambda e: (e.u, e.v))],
        "available_qubits": sorted(available_qubits),
        "valid_gates": basis_gates,
        "per_qubit_calibration": [q.to_dict() for q in sorted(per_qubit, key=lambda q: q.qubit_id)],
        "per_pair_calibration": [
            p.to_dict() for p in sorted(per_pair, key=lambda p: (p.qubit_u, p.qubit_v))
        ],
        "global_info": ChipGlobalInfo(
            single_qubit_gates=single_qubit_gates,
            two_qubit_gates=two_qubit_gates,
            single_qubit_gate_time=_seconds_to_nanoseconds(global_info.get("one_qubit_gate_length")),
            two_qubit_gate_time=_seconds_to_nanoseconds(global_info.get("two_qubit_gate_length")),
        ).to_dict(),
        "calibrated_at": chip_info.get("calibration_time"),
        "avg_1q_fidelity": avg_1q,
        "avg_2q_fidelity": avg_2q,
        "avg_readout_fidelity": _avg(readout_averages),
        "coherence_t1": _number(global_info.get("T1_average")),
        "coherence_t2": _number(global_info.get("T2_average")),
    }
def _normalise_backend_status(value: Any) -> str:
    """Translate a backend status value into a canonical status label.

    Any integer is reported as ``"available"``; callers in this module pass
    the backend's queue value here when no explicit status is known. Other
    values are stringified, stripped and lower-cased, then looked up in
    ``_BACKEND_STATUS_MAP``. Unrecognised or empty text gives ``"unknown"``.
    """
    if isinstance(value, int):
        return "available"
    label = str(value or "").strip().lower()
    return _BACKEND_STATUS_MAP.get(label, "unknown")
class QuarkAdapter(QuantumAdapter):
    """Adapter for QuarkStudio's Quafu-SQC ``Task`` API."""

    name = "quark"

    def __init__(self, token: str | None = None, task_client: Any | None = None) -> None:
        """Resolve the API token and remember an optional pre-built client.

        The token is taken from ``token``, then from the ``QUARK_API_KEY`` or
        ``QPU_API_TOKEN`` environment variables, then from the ``api_token``
        entry of ``load_quark_config()``. A failure to load the config leaves
        the token unset.
        """
        resolved = token
        if resolved is None:
            resolved = os.getenv("QUARK_API_KEY") or os.getenv("QPU_API_TOKEN")
        if resolved is None:
            try:
                resolved = load_quark_config()["api_token"]
            except Exception:
                # Best effort: a missing or broken config means "no token".
                resolved = None
        self._token = resolved
        self._task_client = task_client

    def _get_task_client(self) -> Any:
        """Return the cached SDK ``Task`` client, creating it on first use.

        Raises:
            MissingDependencyError: ``check_quark()`` reports the SDK missing.
            ImportError: no API token is configured.
        """
        if self._task_client is not None:
            return self._task_client
        if not check_quark():
            raise MissingDependencyError("quarkstudio", "quark")
        if not self._token:
            raise ImportError(
                "QuarkStudio config not found. "
                "Run `uniqc config set quark.QUARK_API_KEY <TOKEN>` or set QUARK_API_KEY."
            )
        from quark import Task

        self._task_client = Task(self._token)
        return self._task_client

    def is_available(self) -> bool:
        """Return True when the SDK is importable and a token is configured."""
        sdk_ready = check_quark()
        return sdk_ready and bool(self._token)

    def translate_circuit(self, originir: str) -> str:
        """Translate OriginIR text to OpenQASM 2.0."""
        qasm = convert_oir_to_qasm(originir)
        # When Qiskit is installed, parse the result as a validity check,
        # because QuarkStudio hands this payload to OpenQASM2-capable tooling
        # on the server side. Without Qiskit the check is skipped.
        with contextlib.suppress(ImportError):
            from qiskit import qasm2

            qasm2.loads(qasm)
        return qasm

    def _task_payload(self, circuit: Any, *, shots: int, **kwargs: Any) -> dict[str, Any]:
        """Build the task dictionary passed to the SDK's ``run()``.

        The chip comes from ``chip_id``, ``backend_name`` or ``chip`` and the
        task name from ``task_name`` or ``name``, each with a module default.
        ``compiler``, ``correct``, ``open_dd`` and ``target_qubits`` keyword
        arguments are merged over any ``options`` dict. ``clientip`` is filled
        from ``CLIENT_REAL_IP`` when that variable is set and the option is
        absent. The ``options`` entry is omitted when it would be empty.
        """
        chip = kwargs.get("chip_id") or kwargs.get("backend_name") or kwargs.get("chip") or _DEFAULT_CHIP
        task_name = kwargs.get("task_name") or kwargs.get("name") or _DEFAULT_TASK_NAME
        payload: dict[str, Any] = {
            "chip": chip,
            "name": task_name,
            "circuit": str(circuit),
            "shots": int(shots),
            "compile": bool(kwargs.get("compile", True)),
        }

        options = dict(kwargs.get("options") or {})
        options.update(
            {key: kwargs[key] for key in ("compiler", "correct", "open_dd", "target_qubits") if key in kwargs}
        )
        if "clientip" not in options:
            client_ip = os.getenv("CLIENT_REAL_IP")
            if client_ip:
                options["clientip"] = client_ip
        if options:
            payload["options"] = options
        return payload

    def submit(self, circuit: Any, *, shots: int = 1024, **kwargs: Any) -> str:
        """Submit a single OpenQASM 2.0 circuit to QuarkStudio."""
        payload = self._task_payload(circuit, shots=shots, **kwargs)
        client = self._get_task_client()
        # One repeat per full block of 1024 shots, never fewer than one.
        repeat = max(1, int(shots) // 1024)
        return _task_id(client.run(payload, repeat=repeat))

    def submit_batch(self, circuits: list[Any], *, shots: int = 1024, **kwargs: Any) -> list[str]:
        """Submit circuits one-by-one because the public SDK exposes single-task run()."""
        task_ids: list[str] = []
        for circuit in circuits:
            task_ids.append(self.submit(circuit, shots=shots, **kwargs))
        return task_ids

    def query(self, taskid: str) -> dict[str, Any]:
        """Query task status and return normalised status/result fields."""
        client = self._get_task_client()
        # All-digit identifiers are passed to the SDK as integers.
        tid = int(taskid) if str(taskid).isdigit() else taskid

        result = client.result(tid)
        counts = _counts_from_result(result)
        if counts is not None:
            # Counts are present, so no separate status call is made.
            return {
                "status": _normalise_status(result, has_counts=True),
                "result": {"counts": counts, "raw_result": result},
            }

        status_payload = client.status(tid)
        status = _normalise_status(status_payload)
        response: dict[str, Any] = {"status": status}
        if status != TASK_STATUS_RUNNING:
            response["result"] = result if result else status_payload
        return response

    def query_batch(self, taskids: list[str]) -> dict[str, Any]:
        """Query multiple task IDs and merge their statuses."""
        outcomes = [self.query(taskid) for taskid in taskids]
        statuses = [outcome.get("status", TASK_STATUS_RUNNING) for outcome in outcomes]
        # Any failure fails the batch; otherwise any running task keeps it running.
        if TASK_STATUS_FAILED in statuses:
            merged = TASK_STATUS_FAILED
        elif TASK_STATUS_RUNNING in statuses:
            merged = TASK_STATUS_RUNNING
        else:
            merged = TASK_STATUS_SUCCESS
        return {"status": merged, "result": [outcome.get("result") for outcome in outcomes]}

    def list_backends(self) -> list[dict[str, Any]]:
        """Return backend status entries from ``Task.status()``."""
        raw = self._get_task_client().status()
        if isinstance(raw, dict):
            return [self._backend_entry(str(name), queue) for name, queue in raw.items()]
        if isinstance(raw, list):
            entries = []
            for item in raw:
                if isinstance(item, dict):
                    entries.append(
                        self._backend_entry(
                            str(item.get("name", "")),
                            item.get("task_in_queue", item.get("queue", 0)),
                            item,
                        )
                    )
                else:
                    # Non-dict list items are passed through unchanged.
                    entries.append(item)
            return entries
        return []

    def get_backend_info(self, chip: str = _DEFAULT_CHIP) -> dict[str, Any]:
        """Fetch detailed backend information when quarkcircuit is installed."""
        info = self._load_chip_basic_info(chip)
        if isinstance(info, dict):
            return info
        return {}

    def _backend_entry(self, name: str, queue: Any, base: dict[str, Any] | None = None) -> dict[str, Any]:
        """Build one backend entry, enriched with chip details when available.

        ``base`` is copied, then ``name``, ``status`` and ``task_in_queue`` are
        set. The status is normalised from ``base["status"]`` when present and
        from ``queue`` otherwise. ``backend_info_available`` is set only when
        chip metadata could be loaded.
        """
        entry = dict(base or {})
        # Read the raw status before it is overwritten below.
        status = _normalise_backend_status(entry.get("status", queue))
        entry["name"] = name
        entry["status"] = status
        entry["task_in_queue"] = queue

        chip_info = self._load_chip_basic_info(name)
        if isinstance(chip_info, dict):
            if chip_info.get("qubits_info"):
                entry.update(_extract_quark_backend_details(chip_info))
                entry["backend_info_available"] = True
            else:
                entry["backend_info_available"] = False
        return entry

    def _load_chip_basic_info(self, chip: str) -> dict[str, Any] | None:
        """Load chip metadata through quarkcircuit, or return ``None``.

        ``None`` is returned when ``check_quarkcircuit()`` is false, when the
        import or the loader raises, or when the loader returns a non-dict.
        Anything the loader writes to stdout is discarded.
        """
        if not check_quarkcircuit():
            return None
        try:
            from quark.circuit.backend import load_chip_basic_info

            sink = io.StringIO()
            with contextlib.redirect_stdout(sink):
                info = load_chip_basic_info(chip)
        except Exception:
            # Best effort: chip details are optional.
            return None
        return info if isinstance(info, dict) else None

    def dry_run(self, originir: str, *, shots: int = 1024, **kwargs: Any) -> DryRunResult:
        """Validate OriginIR -> OpenQASM 2.0 locally without network calls."""
        backend_name = kwargs.get("chip_id") or kwargs.get("backend_name") or kwargs.get("chip") or _DEFAULT_CHIP

        try:
            qasm = self.translate_circuit(originir)
        except Exception as exc:
            return _dry_run_failed(
                str(exc),
                details=f"Failed to translate OriginIR to OpenQASM 2.0 for QuarkStudio: {exc}",
                backend_name=backend_name,
            )

        if shots <= 0:
            return _dry_run_failed(
                "shots must be positive",
                details=f"Invalid shots value for QuarkStudio: {shots}",
                backend_name=backend_name,
            )

        warnings: tuple[str, ...] = ()
        if shots % 1024 != 0:
            warnings = ("QuarkStudio documentation recommends shots as an integer multiple of 1024.",)

        supported_gates = (
            "h", "x", "y", "z",
            "s", "sdg", "t", "tdg",
            "sx", "sxdg",
            "rx", "ry", "rz",
            "u1", "u2", "u3",
            "cx", "cz", "swap", "ccx",
            "measure", "barrier",
        )
        return _dry_run_success(
            "OriginIR translated to OpenQASM 2.0 and is structurally valid for QuarkStudio submission.",
            backend_name=backend_name,
            circuit_qubits=_qasm_qubit_count(qasm),
            supported_gates=supported_gates,
            warnings=warnings,
        )