import os, sys, tempfile, subprocess, io
from functools import wraps
from math import floor, ceil
import numpy as np
from numpy import int32, int64, uint32, uint64, float64, bool_, str_, ndarray
from types import GenericAlias, ModuleType, SimpleNamespace
from typing import _GenericAlias, Generic, Literal, TypeVar
from artiq.language.core import *
from artiq.language.core import _ConstGenericMarker
from artiq.language import core as core_language
from artiq.language.units import *
from artiq.language.embedding_map import EmbeddingMap
from artiq.coredevice.comm_kernel import CommKernel, CommKernelDummy
@extern
def rtio_init():
    # Declaration of an external syscall. The Python body is only a
    # placeholder: running it on the host raises, since the syscall is not
    # simulated.
    raise NotImplementedError("syscall not simulated")
@extern
def rtio_get_destination_status(destination: int32) -> bool:
    # Declaration of an external syscall taking a destination number and
    # returning a bool. The Python body is only a placeholder: running it on
    # the host raises, since the syscall is not simulated.
    raise NotImplementedError("syscall not simulated")
@extern
def rtio_get_counter() -> int64:
    # Declaration of an external syscall returning a 64-bit integer. The
    # Python body is only a placeholder: running it on the host raises, since
    # the syscall is not simulated.
    raise NotImplementedError("syscall not simulated")
@extern
def test_exception_id_sync(id: int32):
    # Declaration of an external syscall taking a 32-bit integer id. The
    # Python body is only a placeholder: running it on the host raises, since
    # the syscall is not simulated.
    raise NotImplementedError("syscall not simulated")
@compile
class Core:
    """Core device driver.

    :param dmgr: device manager; used to look up the analyzer proxy by name
        when the analyzer is triggered.
    :param host: hostname or IP address of the core device. If ``None``, a
        ``CommKernelDummy`` is used instead of a real connection.
    :param ref_period: period of the reference clock for the RTIO subsystem.
        On platforms that use clock multiplication and SERDES-based PHYs,
        this is the period after multiplication. For example, with a RTIO core
        clocked at 125MHz and a SERDES multiplication factor of 8, the
        reference period is ``1 ns``.
        The machine time unit (``mu``) is equal to this period.
    :param ref_multiplier: ratio between the RTIO fine timestamp frequency
        and the RTIO coarse timestamp frequency (e.g. SERDES multiplication
        factor).
    :param analyzer_proxy: name of the core device analyzer proxy to trigger
        (optional).
    :param analyze_at_run_end: automatically trigger the core device analyzer
        proxy after the Experiment's run stage finishes.
    :param target: target name handed to the NAC3 compiler.
    :param report_invariants: report variables which are not changed inside
        kernels and are thus candidates for KernelInvariant annotation
    """
    ref_period: KernelInvariant[float]
    ref_multiplier: KernelInvariant[int32]
    coarse_ref_period: KernelInvariant[float]

    def __init__(self, dmgr,
                 host, ref_period,
                 analyzer_proxy=None, analyze_at_run_end=False,
                 ref_multiplier=8,
                 target="rv32g", satellite_cpu_targets={},
                 report_invariants=False):
        # NOTE(review): ``satellite_cpu_targets`` is accepted but not read
        # anywhere in this class; its default is never mutated here.
        import nac3artiq
        from scipy import special
        from scipy import linalg

        # Host-side objects handed to the NAC3 compiler, keyed by the names
        # it looks them up under (top-level builtins, then one sub-table per
        # module).
        nac3_builtins = {
            "int": int,
            "float": float,
            "bool": bool,
            "str": str,
            "list": list,
            "tuple": tuple,
            "Exception": Exception,
            "range": range,
            "enumerate": enumerate,
            "round": round,
            "round64": round64,
            "floor": floor,
            "floor64": floor64,
            "ceil": ceil,
            "ceil64": ceil64,
            "len": len,
            "min": min,
            "max": max,
            "abs": abs,
            "some": Some,
            "staticmethod": staticmethod,

            "types": {
                "GenericAlias": GenericAlias,
                "ModuleType": ModuleType,
            },

            "typing": {
                "_GenericAlias": _GenericAlias,
                "Generic": Generic,
                "TypeVar": TypeVar,
                "Literal": Literal,
            },

            "numpy": {
                "int32": int32,
                "int64": int64,
                "uint32": uint32,
                "uint64": uint64,
                "float64": float64,
                "bool_": bool_,
                "str_": str_,
                "ndarray": ndarray,
                "np_empty": np.empty,
                "np_zeros": np.zeros,
                "np_ones": np.ones,
                "np_full": np.full,
                "np_array": np.array,
                "np_eye": np.eye,
                "np_identity": np.identity,
                "np_size": np.size,
                "np_shape": np.shape,
                "np_broadcast_to": np.broadcast_to,
                "np_transpose": np.transpose,
                "np_reshape": np.reshape,
                "np_round": np.round,
                "np_floor": np.floor,
                "np_ceil": np.ceil,
                "np_min": np.min,
                "np_minimum": np.minimum,
                "np_max": np.max,
                "np_maximum": np.maximum,
                "np_argmin": np.argmin,
                "np_argmax": np.argmax,
                "np_isnan": np.isnan,
                "np_isinf": np.isinf,
                "np_sin": np.sin,
                "np_cos": np.cos,
                "np_exp": np.exp,
                "np_exp2": np.exp2,
                "np_log": np.log,
                "np_log10": np.log10,
                "np_log2": np.log2,
                "np_fabs": np.fabs,
                "np_sqrt": np.sqrt,
                "np_rint": np.rint,
                "np_tan": np.tan,
                "np_arcsin": np.arcsin,
                "np_arccos": np.arccos,
                "np_arctan": np.arctan,
                "np_sinh": np.sinh,
                "np_cosh": np.cosh,
                "np_tanh": np.tanh,
                "np_arcsinh": np.arcsinh,
                "np_arccosh": np.arccosh,
                "np_arctanh": np.arctanh,
                "np_expm1": np.expm1,
                "np_cbrt": np.cbrt,
                "sp_spec_erf": special.erf,
                "sp_spec_erfc": special.erfc,
                "sp_spec_gamma": special.gamma,
                "sp_spec_gammaln": special.gammaln,
                "sp_spec_j0": special.j0,
                "sp_spec_j1": special.j1,
                "np_arctan2": np.arctan2,
                "np_copysign": np.copysign,
                "np_fmax": np.fmax,
                "np_fmin": np.fmin,
                "np_ldexp": np.ldexp,
                "np_hypot": np.hypot,
                "np_nextafter": np.nextafter,
                "np_any": np.any,
                "np_all": np.all,
                "np_dot": np.dot,
                "np_linalg_cholesky": np.linalg.cholesky,
                "np_linalg_qr": np.linalg.qr,
                "np_linalg_svd": np.linalg.svd,
                "np_linalg_inv": np.linalg.inv,
                "np_linalg_pinv": np.linalg.pinv,
                "np_linalg_matrix_power": np.linalg.matrix_power,
                "np_linalg_det": np.linalg.det,
                "sp_linalg_lu": linalg.lu,
                "sp_linalg_schur": linalg.schur,
                "sp_linalg_hessenberg": linalg.hessenberg,
            },

            "artiq": {
                "Auto": Auto,
                "Kernel": Kernel,
                "KernelInvariant": KernelInvariant,
                "_ConstGenericMarker": _ConstGenericMarker,
                "none": none,
                "virtual": virtual,
                "Option": Option,
                # Decorator functions
                "compile": compile,
                "extern": extern,
                "kernel": kernel,
                "portable": portable,
                "rpc": rpc,
            },
        }

        self.ref_period = ref_period
        self.ref_multiplier = ref_multiplier
        self.coarse_ref_period = ref_period*ref_multiplier
        if host is None:
            self.comm = CommKernelDummy()
        else:
            self.comm = CommKernel(host)
        self.first_run = True
        self.dmgr = dmgr
        self.core = self
        self.comm.core = self
        self.target = target
        self.analyzed = False
        self.compiler = nac3artiq.NAC3(target, nac3_builtins)
        self.analyzer_proxy_name = analyzer_proxy
        self.analyze_at_run_end = analyze_at_run_end
        # Resolved lazily from ``analyzer_proxy_name`` on first trigger.
        self.analyzer_proxy = None
        self.report_invariants = report_invariants

    def notify_run_end(self):
        """Trigger the analyzer proxy if ``analyze_at_run_end`` is set."""
        if self.analyze_at_run_end:
            self.trigger_analyzer_proxy()

    def close(self):
        """Disconnect core device and close sockets.
        """
        self.comm.close()

    def compile(self, method, args, kwargs, embedding_map, output_filename=None, debug_filename=None, target=None):
        """Compile ``method`` with the NAC3 compiler.

        The registered functions and classes are analyzed once, on the first
        call. ``kwargs`` is accepted but currently not forwarded to the
        compiler.

        :param method: a bound method (compiled as ``__self__``/``__name__``)
            or any other object, which is passed as-is with an empty name.
        :param output_filename: and ``debug_filename``: either both given, in
            which case the output is written to those files and ``None`` is
            returned, or both omitted, in which case the in-memory result of
            the compiler is returned (``run`` unpacks it as
            ``(kernel_library, debug_object)``).
        :raises NotImplementedError: if ``target`` is given.
        :raises ValueError: if only one of the two filenames is given.
        """
        if target is not None:
            # NAC3TODO: subkernels
            raise NotImplementedError
        if (output_filename is None) != (debug_filename is None):
            raise ValueError("both output and debug filenames should be specified or unspecified")

        if not self.analyzed:
            self.compiler.analyze(
                core_language._registered_functions,
                core_language._registered_classes,
                core_language._special_ids,
            )
            self.analyzed = True

        if hasattr(method, "__self__"):
            obj = method.__self__
            name = method.__name__
        else:
            obj = method
            name = ""

        # NAC3TODO: handle self.report_invariants
        if output_filename is None and debug_filename is None:
            return self.compiler.compile_method_to_mem(obj, name, args, embedding_map)
        else:
            self.compiler.compile_method_to_file(obj, name, args, output_filename, debug_filename, embedding_map)

    def run(self, function, args, kwargs):
        """Compile ``function`` and execute it on the core device.

        Returns ``embedding_map.return_value`` when the embedding map expects
        a return value, ``None`` otherwise.
        """
        embedding_map = EmbeddingMap()
        kernel_library, debug_object = self.compile(function, args, kwargs, embedding_map)
        self._run_compiled(kernel_library, debug_object, embedding_map)

        # set by NAC3
        if embedding_map.expects_return:
            return embedding_map.return_value

    def _run_compiled(self, kernel_library, debug_object, embedding_map):
        """Load ``kernel_library`` onto the device, run it and serve it.

        System information is checked on the first run only. Addresses are
        symbolized against ``debug_object``.
        """
        if self.first_run:
            self.comm.check_system_info()
            self.first_run = False

        def symbolizer(addresses):
            return symbolize(debug_object, addresses)

        self.comm.load(kernel_library)
        self.comm.run()
        self.comm.serve(embedding_map, symbolizer)

    def precompile(self, function, *args, **kwargs):
        """Precompile a kernel and return a callable that executes it on the core device
        at a later time.

        Arguments to the kernel are set at compilation time and passed to this function,
        as additional positional and keyword arguments.
        The returned callable accepts no arguments.

        Precompiled kernels may use RPCs and subkernels.

        Object attributes at the beginning of a precompiled kernel execution have the
        values they had at precompilation time. If up-to-date values are required,
        use RPC to read them.
        Similarly, modified values are not written back, and explicit RPC should be used
        to modify host objects.
        Carefully review the source code of driver calls used in precompiled kernels, as
        they may rely on host object attributes being transferred between kernel calls.
        Examples include code used to control DDS phase, and Urukul RF switch control
        via the CPLD register.

        The return value of the callable is the return value of the kernel, if any.

        The callable may be called several times.

        :raises ValueError: if ``function`` is not marked as a kernel.
        """
        # Default to False so that a plain function is reported with the
        # ValueError below rather than an AttributeError from getattr().
        if not getattr(function, "__artiq_kernel__", False):
            raise ValueError("Argument is not a kernel")

        embedding_map = EmbeddingMap()
        # compile() returns the library together with its debug object (see
        # run()); both are needed by _run_compiled().
        kernel_library, debug_object = self.compile(
            function, args, kwargs, embedding_map)

        @wraps(function)
        def run_precompiled():
            self._run_compiled(kernel_library, debug_object, embedding_map)
            # set by NAC3 (same convention as run())
            if embedding_map.expects_return:
                return embedding_map.return_value
        return run_precompiled

    @portable
    def seconds_to_mu(self, seconds: float) -> int64:
        """Convert seconds to the corresponding number of machine units
        (fine RTIO cycles).

        :param seconds: time (in seconds) to convert.
        """
        return round64(seconds/self.ref_period)

    @portable
    def mu_to_seconds(self, mu: int64) -> float:
        """Convert machine units (fine RTIO cycles) to seconds.

        :param mu: cycle count to convert.
        """
        return float(mu)*self.ref_period

    # Converts ``dt`` (seconds) to machine units and passes it to delay_mu().
    @kernel
    def delay(self, dt: float):
        delay_mu(self.seconds_to_mu(dt))

    @kernel
    def get_rtio_counter_mu(self) -> int64:
        """Retrieve the current value of the hardware RTIO timeline counter.

        As the timing of kernel code executed on the CPU is inherently
        non-deterministic, the return value is by necessity only a lower bound
        for the actual value of the hardware register at the instant when
        execution resumes in the caller.

        For a more detailed description of these concepts, see :doc:`rtio`.
        """
        return rtio_get_counter()

    @kernel
    def wait_until_mu(self, cursor_mu: int64):
        """Block execution until the hardware RTIO counter reaches the given
        value (see :meth:`get_rtio_counter_mu`).

        If the hardware counter has already passed the given time, the function
        returns immediately.
        """
        while self.get_rtio_counter_mu() < cursor_mu:
            pass

    @kernel
    def get_rtio_destination_status(self, destination: int32) -> bool:
        """Returns whether the specified RTIO destination is up.
        This is particularly useful in startup kernels to delay
        startup until certain DRTIO destinations are available."""
        return rtio_get_destination_status(destination)

    @kernel
    def reset(self):
        """Clear RTIO FIFOs, release RTIO PHY reset, and set the time cursor
        at the current value of the hardware RTIO counter plus a margin of
        125000 machine units."""
        rtio_init()
        at_mu(rtio_get_counter() + int64(125000))

    @kernel
    def break_realtime(self):
        """Set the time cursor after the current value of the hardware RTIO
        counter plus a margin of 125000 machine units.

        If the time cursor is already after that position, this function
        does nothing."""
        min_now = rtio_get_counter() + int64(125000)
        if now_mu() < min_now:
            at_mu(min_now)

    def trigger_analyzer_proxy(self):
        """Causes the core analyzer proxy to retrieve a dump from the device,
        and distribute it to all connected clients (typically dashboards).

        Returns only after the dump has been retrieved from the device.

        Raises :exc:`IOError` if no analyzer proxy has been configured, or if the
        analyzer proxy fails. In the latter case, more details would be
        available in the proxy log.
        """
        # Resolve the proxy from the device manager on first use.
        if self.analyzer_proxy is None:
            if self.analyzer_proxy_name is not None:
                self.analyzer_proxy = self.dmgr.get(self.analyzer_proxy_name)
        if self.analyzer_proxy is None:
            raise IOError("No analyzer proxy configured")
        else:
            self.analyzer_proxy.trigger()
class RunTool:
    """Context manager that runs an external tool on temporary files.

    :param pattern: command line as a list of strings. Each element is passed
        through ``str.format`` with the temporary file names, so ``{key}``
        is replaced by the path of the temporary file for ``key``.
    :param tempdata: one entry per temporary file. A value of ``None``
        creates an empty file that is opened in binary mode after the tool
        has run; any other value is written to the file before the tool runs.

    Entering the context runs the tool and returns a dict holding the opened
    ``None``-valued files plus ``"__stdout__"`` (a ``StringIO`` of the
    tool's standard output). Leaving the context closes those files and
    deletes all temporary files. If entering fails, the temporary files
    created so far are deleted as well, since ``__exit__`` is not invoked in
    that case.

    :raises Exception: if the tool exits with a non-zero return code.
    """

    def __init__(self, pattern, **tempdata):
        self._pattern = pattern
        self._tempdata = tempdata
        self._tempnames = {}
        self._tempfiles = {}

    def __enter__(self):
        try:
            self._create_tempfiles()
            stdout = self._invoke()
            self._tempfiles["__stdout__"] = io.StringIO(stdout)
            for key, data in self._tempdata.items():
                if data is None:
                    self._tempfiles[key] = open(self._tempnames[key], "rb")
        except BaseException:
            # __exit__ is not called when __enter__ raises, so release
            # everything here to avoid leaking temporary files.
            self._cleanup()
            raise
        return self._tempfiles

    def __exit__(self, exc_typ, exc_value, exc_trace):
        self._cleanup()

    def _create_tempfiles(self):
        # Create one temporary file per ``tempdata`` entry and record its name.
        for key, data in self._tempdata.items():
            if data is None:
                fd, filename = tempfile.mkstemp()
                os.close(fd)
                self._tempnames[key] = filename
            else:
                with tempfile.NamedTemporaryFile(delete=False) as f:
                    # Record the name first so that the file is still
                    # removed if the write below fails.
                    self._tempnames[key] = f.name
                    f.write(data)

    def _invoke(self):
        # Run the tool and return its standard output; raise on failure.
        cmdline = [argument.format(**self._tempnames)
                   for argument in self._pattern]

        # https://bugs.python.org/issue17023
        windows = os.name == "nt"
        process = subprocess.Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                   universal_newlines=True, shell=windows)
        stdout, stderr = process.communicate()
        if process.returncode != 0:
            raise Exception("{} invocation failed: {}".
                            format(cmdline[0], stderr))
        return stdout

    def _cleanup(self):
        # Close every opened file, then delete every temporary file. Names
        # are forgotten as they are deleted so a second call is harmless.
        try:
            for file in self._tempfiles.values():
                file.close()
        finally:
            while self._tempnames:
                _, filename = self._tempnames.popitem()
                os.unlink(filename)
def symbolize(debug_library, addresses):
    """Turn a list of return addresses into backtrace entries.

    Each non-inlined entry of the result is a tuple
    ``(filename, line, column, function, address, inlined)``, where
    ``inlined`` is a list of ``(filename, line, column, function, address)``
    tuples for the inlined records that follow it. A record is treated as
    inlined when its ``address`` is ``None``; it then takes the address of
    the most recent non-inlined entry.

    Returns an empty list if ``addresses`` is an empty list.
    """
    import nac3artiq

    if addresses == []:
        return []

    # We got a list of return addresses, i.e. addresses of instructions
    # just after the call. Offset them back to get an address somewhere
    # inside the call instruction (or its delay slot), since that's what
    # the backtrace entry should point at.
    shifted = [return_address - 1 for return_address in addresses]
    records = nac3artiq.symbolize(debug_library, shifted)

    frames = []
    inlined_frames = None
    for rec in records:
        raw_address = rec.address
        is_inlined = raw_address is None
        if is_inlined:
            # Inlined record: reuse the address of the enclosing frame.
            frame_address = frames[-1][4]
        else:
            # Undo the offset applied before symbolization.
            frame_address = raw_address + 1

        path = rec.file
        directory = rec.dir
        if directory is not None:
            path = directory + "/" + path

        function, line, column = rec.name, rec.line, rec.column
        if is_inlined:
            inlined_frames.append((path, line, column, function, frame_address))
            continue

        # Start a fresh list that later inlined records will be added to.
        inlined_frames = []
        frames.append((path, line, column, function, frame_address,
                       inlined_frames))
    return frames