Source code for artiq.coredevice.core

import os, sys, tempfile, subprocess, io
from functools import wraps
from math import floor, ceil
import numpy as np
from numpy import int32, int64, uint32, uint64, float64, bool_, str_, ndarray
from types import GenericAlias, ModuleType, SimpleNamespace
from typing import _GenericAlias, Generic, Literal, TypeVar

from artiq.language.core import *
from artiq.language.core import _ConstGenericMarker
from artiq.language import core as core_language
from artiq.language.units import *
from artiq.language.embedding_map import EmbeddingMap

from artiq.coredevice.comm_kernel import CommKernel, CommKernelDummy


@extern
def rtio_init():
    # Host-side placeholder for an external syscall: calling it on the host
    # always raises, because the syscall is not simulated.
    # NOTE(review): presumably bound to the device runtime's RTIO
    # initialisation routine when compiled; Core.reset() calls it before
    # repositioning the time cursor -- confirm against the runtime.
    raise NotImplementedError("syscall not simulated")

@extern
def rtio_get_destination_status(destination: int32) -> bool:
    # Host-side placeholder for an external syscall: calling it on the host
    # always raises, because the syscall is not simulated.
    # Core.get_rtio_destination_status() forwards to this function and
    # documents the result as "whether the specified RTIO destination is up".
    raise NotImplementedError("syscall not simulated")

@extern
def rtio_get_counter() -> int64:
    # Host-side placeholder for an external syscall: calling it on the host
    # always raises, because the syscall is not simulated.
    # Core.get_rtio_counter_mu(), Core.reset() and Core.break_realtime() use
    # the returned value as the current hardware RTIO counter, in machine
    # units.
    raise NotImplementedError("syscall not simulated")

@extern
def test_exception_id_sync(id: int32):
    # Host-side placeholder for an external syscall: calling it on the host
    # always raises, because the syscall is not simulated.
    # NOTE(review): not referenced elsewhere in this file; judging by the
    # name it is a test hook for checking exception ID synchronisation
    # between host and device -- confirm against the runtime.
    # The parameter name ``id`` shadows the builtin but is part of the
    # declared signature, so it is left unchanged.
    raise NotImplementedError("syscall not simulated")


@compile
class Core:
    """Core device driver.

    :param host: hostname or IP address of the core device.
    :param ref_period: period of the reference clock for the RTIO subsystem.
        On platforms that use clock multiplication and SERDES-based PHYs,
        this is the period after multiplication. For example, with a RTIO core
        clocked at 125MHz and a SERDES multiplication factor of 8, the
        reference period is ``1 ns``.
        The machine time unit (``mu``) is equal to this period.
    :param ref_multiplier: ratio between the RTIO fine timestamp frequency
        and the RTIO coarse timestamp frequency (e.g. SERDES multiplication
        factor).
    :param analyzer_proxy: name of the core device analyzer proxy to trigger
        (optional).
    :param analyze_at_run_end: automatically trigger the core device analyzer
        proxy after the Experiment's run stage finishes.
    :param report_invariants: report variables which are not changed inside
        kernels and are thus candidates for KernelInvariant annotation
    """
    ref_period: KernelInvariant[float]
    ref_multiplier: KernelInvariant[int32]
    coarse_ref_period: KernelInvariant[float]

    def __init__(self, dmgr, host, ref_period,
                 analyzer_proxy=None, analyze_at_run_end=False,
                 ref_multiplier=8,
                 target="rv32g", satellite_cpu_targets={},
                 report_invariants=False):
        # NOTE: ``satellite_cpu_targets`` is accepted but not referenced in
        # this constructor; its (mutable) default is therefore never mutated.
        import nac3artiq
        from scipy import special
        from scipy import linalg

        # Host-side objects handed to the NAC3 compiler, keyed by the names
        # the compiler looks them up under.
        nac3_builtins = {
            "int": int,
            "float": float,
            "bool": bool,
            "str": str,
            "list": list,
            "tuple": tuple,
            "Exception": Exception,
            "range": range,
            "enumerate": enumerate,
            "round": round,
            "round64": round64,
            "floor": floor,
            "floor64": floor64,
            "ceil": ceil,
            "ceil64": ceil64,
            "len": len,
            "min": min,
            "max": max,
            "abs": abs,
            "some": Some,
            "staticmethod": staticmethod,

            "types": {
                "GenericAlias": GenericAlias,
                "ModuleType": ModuleType,
            },

            "typing": {
                "_GenericAlias": _GenericAlias,
                "Generic": Generic,
                "TypeVar": TypeVar,
                "Literal": Literal,
            },

            "numpy": {
                "int32": int32,
                "int64": int64,
                "uint32": uint32,
                "uint64": uint64,
                "float64": float64,
                "bool_": bool_,
                "str_": str_,
                "ndarray": ndarray,

                "np_empty": np.empty,
                "np_zeros": np.zeros,
                "np_ones": np.ones,
                "np_full": np.full,
                "np_array": np.array,
                "np_eye": np.eye,
                "np_identity": np.identity,
                "np_size": np.size,
                "np_shape": np.shape,
                "np_broadcast_to": np.broadcast_to,
                "np_transpose": np.transpose,
                "np_reshape": np.reshape,
                "np_round": np.round,
                "np_floor": np.floor,
                "np_ceil": np.ceil,
                "np_min": np.min,
                "np_minimum": np.minimum,
                "np_max": np.max,
                "np_maximum": np.maximum,
                "np_argmin": np.argmin,
                "np_argmax": np.argmax,
                "np_isnan": np.isnan,
                "np_isinf": np.isinf,
                "np_sin": np.sin,
                "np_cos": np.cos,
                "np_exp": np.exp,
                "np_exp2": np.exp2,
                "np_log": np.log,
                "np_log10": np.log10,
                "np_log2": np.log2,
                "np_fabs": np.fabs,
                "np_sqrt": np.sqrt,
                "np_rint": np.rint,
                "np_tan": np.tan,
                "np_arcsin": np.arcsin,
                "np_arccos": np.arccos,
                "np_arctan": np.arctan,
                "np_sinh": np.sinh,
                "np_cosh": np.cosh,
                "np_tanh": np.tanh,
                "np_arcsinh": np.arcsinh,
                "np_arccosh": np.arccosh,
                "np_arctanh": np.arctanh,
                "np_expm1": np.expm1,
                "np_cbrt": np.cbrt,
                "sp_spec_erf": special.erf,
                "sp_spec_erfc": special.erfc,
                "sp_spec_gamma": special.gamma,
                "sp_spec_gammaln": special.gammaln,
                "sp_spec_j0": special.j0,
                "sp_spec_j1": special.j1,
                "np_arctan2": np.arctan2,
                "np_copysign": np.copysign,
                "np_fmax": np.fmax,
                "np_fmin": np.fmin,
                "np_ldexp": np.ldexp,
                "np_hypot": np.hypot,
                "np_nextafter": np.nextafter,
                "np_any": np.any,
                "np_all": np.all,
                "np_dot": np.dot,
                "np_linalg_cholesky": np.linalg.cholesky,
                "np_linalg_qr": np.linalg.qr,
                "np_linalg_svd": np.linalg.svd,
                "np_linalg_inv": np.linalg.inv,
                "np_linalg_pinv": np.linalg.pinv,
                "np_linalg_matrix_power": np.linalg.matrix_power,
                "np_linalg_det": np.linalg.det,
                "sp_linalg_lu": linalg.lu,
                "sp_linalg_schur": linalg.schur,
                "sp_linalg_hessenberg": linalg.hessenberg,
            },

            "artiq": {
                "Auto": Auto,
                "Kernel": Kernel,
                "KernelInvariant": KernelInvariant,
                "_ConstGenericMarker": _ConstGenericMarker,
                "none": none,
                "virtual": virtual,
                "Option": Option,

                # Decorator functions
                "compile": compile,
                "extern": extern,
                "kernel": kernel,
                "portable": portable,
                "rpc": rpc,
            },
        }

        self.ref_period = ref_period
        self.ref_multiplier = ref_multiplier
        self.coarse_ref_period = ref_period*ref_multiplier
        # Without a host, fall back to the dummy communication backend.
        if host is None:
            self.comm = CommKernelDummy()
        else:
            self.comm = CommKernel(host)
        self.first_run = True
        self.dmgr = dmgr
        self.core = self
        self.comm.core = self
        self.target = target
        # Set once the compiler has analyzed the registered functions and
        # classes (see compile()).
        self.analyzed = False
        self.compiler = nac3artiq.NAC3(target, nac3_builtins)
        self.analyzer_proxy_name = analyzer_proxy
        self.analyze_at_run_end = analyze_at_run_end
        # Resolved lazily through the device manager on first use.
        self.analyzer_proxy = None
        self.report_invariants = report_invariants

    def notify_run_end(self):
        """Trigger the analyzer proxy if ``analyze_at_run_end`` was set."""
        if self.analyze_at_run_end:
            self.trigger_analyzer_proxy()

    def close(self):
        """Disconnect core device and close sockets.
        """
        self.comm.close()

    def compile(self, method, args, kwargs, embedding_map,
                output_filename=None, debug_filename=None, target=None):
        """Compile a kernel with NAC3, to memory or to files.

        :param method: bound method or plain callable to compile.
        :param args: positional arguments forwarded to the compiler.
        :param kwargs: keyword arguments; not used by this method.
        :param embedding_map: embedding map passed to the compiler.
        :param output_filename: output file; must be given together with
            ``debug_filename`` or not at all.
        :param debug_filename: debug output file (see ``output_filename``).
        :param target: must be ``None``; any other value raises
            :exc:`NotImplementedError`.
        :return: the result of ``compile_method_to_mem`` when no filenames
            are given (unpacked by :meth:`run` as a
            ``(kernel_library, debug_object)`` pair), otherwise ``None``.
        :raises ValueError: if only one of the two filenames is given.
        """
        if target is not None:
            # NAC3TODO: subkernels
            raise NotImplementedError

        if (output_filename is None) != (debug_filename is None):
            raise ValueError("both output and debug filenames should be specified or unspecified")

        # The analysis pass is only run once per Core instance.
        if not self.analyzed:
            self.compiler.analyze(
                core_language._registered_functions,
                core_language._registered_classes,
                core_language._special_ids,
            )
            self.analyzed = True

        # Bound methods are compiled as (object, method name); anything else
        # is passed as the object itself with an empty name.
        if hasattr(method, "__self__"):
            obj = method.__self__
            name = method.__name__
        else:
            obj = method
            name = ""

        # NAC3TODO: handle self.report_invariants
        if output_filename is None and debug_filename is None:
            return self.compiler.compile_method_to_mem(obj, name, args, embedding_map)
        else:
            self.compiler.compile_method_to_file(obj, name, args, output_filename, debug_filename, embedding_map)

    def run(self, function, args, kwargs):
        """Compile ``function`` and execute it on the core device.

        :return: the kernel's return value if the embedding map reports that
            one is expected, otherwise ``None``.
        """
        embedding_map = EmbeddingMap()
        kernel_library, debug_object = self.compile(function, args, kwargs, embedding_map)
        self._run_compiled(kernel_library, debug_object, embedding_map)

        # set by NAC3
        if embedding_map.expects_return:
            return embedding_map.return_value

    def _run_compiled(self, kernel_library, debug_object, embedding_map):
        """Load a compiled kernel, run it, and serve its requests.

        The system information check is performed on the first run only.
        """
        if self.first_run:
            self.comm.check_system_info()
            self.first_run = False

        symbolizer = lambda addresses: symbolize(debug_object, addresses)

        self.comm.load(kernel_library)
        self.comm.run()
        self.comm.serve(embedding_map, symbolizer)

    def precompile(self, function, *args, **kwargs):
        """Precompile a kernel and return a callable that executes it on the
        core device at a later time.

        Arguments to the kernel are set at compilation time and passed to
        this function, as additional positional and keyword arguments.
        The returned callable accepts no arguments.

        Precompiled kernels may use RPCs and subkernels.

        Object attributes at the beginning of a precompiled kernel execution
        have the values they had at precompilation time. If up-to-date values
        are required, use RPC to read them.
        Similarly, modified values are not written back, and explicit RPC
        should be used to modify host objects.
        Carefully review the source code of driver calls used in precompiled
        kernels, as they may rely on host object attributes being transferred
        between kernel calls. Examples include code used to control DDS
        phase, and Urukul RF switch control via the CPLD register.

        The return value of the callable is the return value of the kernel,
        if any.

        The callable may be called several times.

        :raises ValueError: if ``function`` is not a kernel.
        """
        # A default is required here: ordinary callables have no
        # ``__artiq_kernel__`` attribute, and without it getattr() would
        # raise AttributeError instead of the intended ValueError.
        if not getattr(function, "__artiq_kernel__", False):
            raise ValueError("Argument is not a kernel")

        embedding_map = EmbeddingMap()
        # compile() returns a (kernel_library, debug_object) pair when
        # compiling to memory, exactly as unpacked in run().
        kernel_library, debug_object = self.compile(
            function, args, kwargs, embedding_map)

        @wraps(function)
        def run_precompiled():
            self._run_compiled(kernel_library, debug_object, embedding_map)
            # Mirror run(): hand back the kernel's return value, if any.
            if embedding_map.expects_return:
                return embedding_map.return_value
        return run_precompiled

    @portable
    def seconds_to_mu(self, seconds: float) -> int64:
        """Convert seconds to the corresponding number of machine units
        (fine RTIO cycles).

        :param seconds: time (in seconds) to convert.
        """
        return round64(seconds/self.ref_period)

    @portable
    def mu_to_seconds(self, mu: int64) -> float:
        """Convert machine units (fine RTIO cycles) to seconds.

        :param mu: cycle count to convert.
        """
        return float(mu)*self.ref_period

    @kernel
    def delay(self, dt: float):
        # Advance the time cursor by ``dt`` seconds, converted to machine
        # units with seconds_to_mu().
        delay_mu(self.seconds_to_mu(dt))

    @kernel
    def get_rtio_counter_mu(self) -> int64:
        """Retrieve the current value of the hardware RTIO timeline counter.

        As the timing of kernel code executed on the CPU is inherently
        non-deterministic, the return value is by necessity only a lower bound
        for the actual value of the hardware register at the instant when
        execution resumes in the caller.

        For a more detailed description of these concepts, see :doc:`rtio`.
        """
        return rtio_get_counter()

    @kernel
    def wait_until_mu(self, cursor_mu: int64):
        """Block execution until the hardware RTIO counter reaches the given
        value (see :meth:`get_rtio_counter_mu`).

        If the hardware counter has already passed the given time, the
        function returns immediately.
        """
        while self.get_rtio_counter_mu() < cursor_mu:
            pass

    @kernel
    def get_rtio_destination_status(self, destination: int32) -> bool:
        """Returns whether the specified RTIO destination is up.
        This is particularly useful in startup kernels to delay
        startup until certain DRTIO destinations are available."""
        return rtio_get_destination_status(destination)

    @kernel
    def reset(self):
        """Clear RTIO FIFOs, release RTIO PHY reset, and set the time cursor
        at the current value of the hardware RTIO counter plus a margin of
        125000 machine units."""
        rtio_init()
        at_mu(rtio_get_counter() + int64(125000))

    @kernel
    def break_realtime(self):
        """Set the time cursor after the current value of the hardware RTIO
        counter plus a margin of 125000 machine units.

        If the time cursor is already after that position, this function
        does nothing."""
        min_now = rtio_get_counter() + int64(125000)
        if now_mu() < min_now:
            at_mu(min_now)

    def trigger_analyzer_proxy(self):
        """Causes the core analyzer proxy to retrieve a dump from the device,
        and distribute it to all connected clients (typically dashboards).

        Returns only after the dump has been retrieved from the device.

        Raises :exc:`IOError` if no analyzer proxy has been configured, or if
        the analyzer proxy fails. In the latter case, more details would be
        available in the proxy log.
        """
        # Look the proxy up through the device manager on first use.
        if self.analyzer_proxy is None:
            if self.analyzer_proxy_name is not None:
                self.analyzer_proxy = self.dmgr.get(self.analyzer_proxy_name)
        if self.analyzer_proxy is None:
            raise IOError("No analyzer proxy configured")
        else:
            self.analyzer_proxy.trigger()
class RunTool:
    """Context manager that runs an external tool on temporary files.

    :param pattern: command line as a list of strings; each element is
        passed through :meth:`str.format` with the temporary file names,
        keyed by the ``tempdata`` keyword names.
    :param tempdata: one temporary file per keyword. A ``bytes`` value is
        written to the file before the tool runs; ``None`` creates an empty
        file, which is opened for binary reading after the tool has run.

    Entering the context runs the tool and returns a dict of open files:
    ``"__stdout__"`` (a :class:`io.StringIO` with the tool's standard
    output) plus one binary file per ``None``-valued key. Leaving the
    context closes those files and deletes all temporary files.
    """

    def __init__(self, pattern, **tempdata):
        self._pattern = pattern
        self._tempdata = tempdata
        self._tempnames = {}
        self._tempfiles = {}

    def __enter__(self):
        # If anything fails in here, __exit__ is never invoked by the
        # ``with`` statement, so the temporary files must be removed
        # explicitly before propagating the error.
        try:
            self._create_tempfiles()
            stdout = self._invoke()

            self._tempfiles["__stdout__"] = io.StringIO(stdout)
            for key in self._tempdata:
                if self._tempdata[key] is None:
                    self._tempfiles[key] = open(self._tempnames[key], "rb")
        except BaseException:
            self._cleanup()
            self._tempnames.clear()
            self._tempfiles.clear()
            raise
        return self._tempfiles

    def __exit__(self, exc_typ, exc_value, exc_trace):
        self._cleanup()

    def _create_tempfiles(self):
        # Create one named temporary file per ``tempdata`` entry.
        for key, data in self._tempdata.items():
            if data is None:
                fd, filename = tempfile.mkstemp()
                os.close(fd)
                self._tempnames[key] = filename
            else:
                with tempfile.NamedTemporaryFile(delete=False) as f:
                    # Record the name first so that the file is still
                    # cleaned up if the write fails.
                    self._tempnames[key] = f.name
                    f.write(data)

    def _invoke(self):
        # Run the tool and return its standard output as text.
        cmdline = []
        for argument in self._pattern:
            cmdline.append(argument.format(**self._tempnames))

        # https://bugs.python.org/issue17023
        windows = os.name == "nt"
        process = subprocess.Popen(cmdline, stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   universal_newlines=True, shell=windows)
        stdout, stderr = process.communicate()
        if process.returncode != 0:
            raise Exception("{} invocation failed: {}".
                            format(cmdline[0], stderr))
        return stdout

    def _cleanup(self):
        # Close every opened file and delete every temporary file.
        for file in self._tempfiles.values():
            file.close()
        for filename in self._tempnames.values():
            os.unlink(filename)


def symbolize(debug_library, addresses):
    """Turn a list of return addresses into a backtrace.

    :param debug_library: debug object passed to ``nac3artiq.symbolize``.
    :param addresses: list of return addresses.
    :return: list of ``(filename, line, column, function, address,
        inlined)`` tuples, one per non-inlined call record, where
        ``inlined`` is a list of ``(filename, line, column, function,
        address)`` tuples for the inlined records that follow it.
    """
    import nac3artiq

    if addresses == []:
        return []

    # We got a list of return addresses, i.e. addresses of instructions
    # just after the call. Offset them back to get an address somewhere
    # inside the call instruction (or its delay slot), since that's what
    # the backtrace entry should point at.
    last_inlined = None
    offset_addresses = [addr - 1 for addr in addresses]
    call_records = nac3artiq.symbolize(debug_library, offset_addresses)

    backtrace = []
    for record in call_records:
        address = record.address
        inlined = False
        if address is None:
            # A record without an address is treated as inlined and takes
            # the address of the most recent backtrace entry.
            # NOTE(review): this assumes the first record always carries an
            # address; otherwise ``backtrace`` is still empty here.
            address = backtrace[-1][4]  # inlined
            inlined = True
        else:
            # Undo the offset applied above.
            address += 1

        filename = record.file
        dirname = record.dir
        if dirname is not None:
            filename = dirname + "/" + filename
        function, line, column = record.name, record.line, record.column

        if inlined:
            last_inlined.append((filename, line, column, function, address))
        else:
            last_inlined = []
            backtrace.append((filename, line, column, function, address,
                              last_inlined))
    return backtrace