Source code for artiq.coredevice.edge_counter

"""Driver for RTIO-enabled TTL edge counter.

Like for the TTL input PHYs, sensitivity can be configured over RTIO
(``gate_rising()``, etc.). In contrast to the former, however, the count is
accumulated in gateware, and only a single input event is generated at the end
of each gate period::

    with parallel:
        doppler_cool()
        self.pmt_counter.gate_rising(1 * ms)

    with parallel:
        readout()
        self.pmt_counter.gate_rising(100 * us)

    print("Doppler cooling counts:", self.pmt_counter.fetch_count())
    print("Readout counts:", self.pmt_counter.fetch_count())

For applications where the timestamps of the individual input events are not
required, this has two advantages over ``TTLInOut.count()`` beyond raw
throughput. First, it is easy to count events during multiple separate periods
without blocking to read back counts in between, as illustrated in the above
example. Secondly, as each count total only takes up a single input event, it
is much easier to acquire counts on several channels in parallel without
risking input FIFO overflows::

    # Using the TTLInOut driver, pmt_1 input events are only processed
    # after pmt_0 is done counting. To avoid RTIOOverflows, a round-robin
    # scheme would have to be implemented manually.

    with parallel:
        self.pmt_0.gate_rising(10 * ms)
        self.pmt_1.gate_rising(10 * ms)

    counts_0 = self.pmt_0.count(now_mu()) # blocks
    counts_1 = self.pmt_1.count(now_mu())

    #

    # Using gateware counters, only a single input event each is
    # generated, greatly reducing the load on the input FIFOs:

    with parallel:
        self.pmt_0_counter.gate_rising(10 * ms)
        self.pmt_1_counter.gate_rising(10 * ms)

    counts_0 = self.pmt_0_counter.fetch_count() # blocks
    counts_1 = self.pmt_1_counter.fetch_count()

See :mod:`artiq.gateware.rtio.phy.edge_counter` and
:meth:`artiq.gateware.eem.DIO.add_std` for the gateware components.
"""

from artiq.language.core import *
from artiq.language.types import *
from artiq.coredevice.rtio import (rtio_output, rtio_input_data,
                                   rtio_input_timestamped_data)
from numpy import int32, int64

CONFIG_COUNT_RISING = 0b0001
CONFIG_COUNT_FALLING = 0b0010
CONFIG_SEND_COUNT_EVENT = 0b0100
CONFIG_RESET_TO_ZERO = 0b1000


[docs]class CounterOverflow(Exception): """Raised when an edge counter value is read which indicates that the counter might have overflowed.""" pass
[docs]class EdgeCounter: """RTIO TTL edge counter driver driver. Like for regular TTL inputs, timeline periods where the counter is sensitive to a chosen set of input transitions can be specified. Unlike the former, however, the specified edges do not create individual input events; rather, the total count can be requested as a single input event from the core (typically at the end of the gate window). :param channel: The RTIO channel of the gateware phy. :param gateware_width: The width of the gateware counter register, in bits. This is only used for overflow handling; to change the size, the gateware needs to be rebuilt. """ kernel_invariants = {"core", "channel", "counter_max"} def __init__(self, dmgr, channel, gateware_width=31, core_device="core"): self.core = dmgr.get(core_device) self.channel = channel self.counter_max = (1 << (gateware_width - 1)) - 1
[docs] @kernel def gate_rising(self, duration): """Count rising edges for the given duration and request the total at the end. The counter is reset at the beginning of the gate period. Use :meth:`set_config` directly for more detailed control. :param duration: The duration for which the gate is to stay open. :return: The timestamp at the end of the gate period, in machine units. """ return self.gate_rising_mu(self.core.seconds_to_mu(duration))
[docs] @kernel def gate_falling(self, duration): """Count falling edges for the given duration and request the total at the end. The counter is reset at the beginning of the gate period. Use :meth:`set_config` directly for more detailed control. :param duration: The duration for which the gate is to stay open. :return: The timestamp at the end of the gate period, in machine units. """ return self.gate_falling_mu(self.core.seconds_to_mu(duration))
[docs] @kernel def gate_both(self, duration): """Count both rising and falling edges for the given duration, and request the total at the end. The counter is reset at the beginning of the gate period. Use :meth:`set_config` directly for more detailed control. :param duration: The duration for which the gate is to stay open. :return: The timestamp at the end of the gate period, in machine units. """ return self.gate_both_mu(self.core.seconds_to_mu(duration))
[docs] @kernel def gate_rising_mu(self, duration_mu): """See :meth:`gate_rising`.""" return self._gate_mu( duration_mu, count_rising=True, count_falling=False)
[docs] @kernel def gate_falling_mu(self, duration_mu): """See :meth:`gate_falling`.""" return self._gate_mu( duration_mu, count_rising=False, count_falling=True)
[docs] @kernel def gate_both_mu(self, duration_mu): """See :meth:`gate_both_mu`.""" return self._gate_mu( duration_mu, count_rising=True, count_falling=True)
@kernel def _gate_mu(self, duration_mu, count_rising, count_falling): self.set_config( count_rising=count_rising, count_falling=count_falling, send_count_event=False, reset_to_zero=True) delay_mu(duration_mu) self.set_config( count_rising=False, count_falling=False, send_count_event=True, reset_to_zero=False) return now_mu()
[docs] @kernel def set_config(self, count_rising: TBool, count_falling: TBool, send_count_event: TBool, reset_to_zero: TBool): """Emit an RTIO event at the current timeline position to set the gateware configuration. For most use cases, the `gate_*` wrappers will be more convenient. :param count_rising: Whether to count rising signal edges. :param count_falling: Whether to count falling signal edges. :param send_count_event: If `True`, an input event with the current counter value is generated on the next clock cycle (once). :param reset_to_zero: If `True`, the counter value is reset to zero on the next clock cycle (once). """ config = int32(0) if count_rising: config |= CONFIG_COUNT_RISING if count_falling: config |= CONFIG_COUNT_FALLING if send_count_event: config |= CONFIG_SEND_COUNT_EVENT if reset_to_zero: config |= CONFIG_RESET_TO_ZERO rtio_output(self.channel << 8, config)
[docs] @kernel def fetch_count(self) -> TInt32: """Wait for and return count total from previously requested input event. It is valid to trigger multiple gate periods without immediately reading back the count total; the results will be returned in order on subsequent fetch calls. This function blocks until a result becomes available. """ count = rtio_input_data(self.channel) if count == self.counter_max: raise CounterOverflow( "Input edge counter overflow on RTIO channel {0}", int64(self.channel)) return count
[docs] @kernel def fetch_timestamped_count( self, timeout_mu=int64(-1)) -> TTuple([TInt64, TInt32]): """Wait for and return the timestamp and count total of a previously requested input event. It is valid to trigger multiple gate periods without immediately reading back the count total; the results will be returned in order on subsequent fetch calls. This function blocks until a result becomes available or the given timeout elapses. :return: A tuple of timestamp (-1 if timeout elapsed) and counter value. (The timestamp is that of the requested input event – typically the gate closing time – and not that of any input edges.) """ timestamp, count = rtio_input_timestamped_data(timeout_mu, self.channel) if count == self.counter_max: raise CounterOverflow( "Input edge counter overflow on RTIO channel {0}", int64(self.channel)) return timestamp, count