# Source code for tristan

"""
Utilities for processing data from the Large Area Time-Resolved Detector

This module provides tools to interpret NeXus-like data in HDF5 format from the
experimental Timepix-based event-mode detector, codenamed Tristan, at Diamond Light
Source.
"""

from __future__ import annotations

# Package metadata.
__author__ = "Diamond Light Source — Data Analysis Group"
__email__ = "dataanalysis@diamond.ac.uk"
__version__ = "0.4.2"
# Integer components of __version__, e.g. (0, 4, 2).  int() raises ValueError if
# any dot-separated component of the version string is not purely numeric.
__version_tuple__ = tuple(int(x) for x in __version__.split("."))

from contextlib import ContextDecorator
from logging import ERROR

import dask
import pint
from dask.distributed import Client, progress, wait

# Module-level pint unit registry, used to build the quantity below.
ureg = pint.UnitRegistry()

# 6.4e8 Hz (640 MHz), converted by pint's to_compact() to a prefixed unit.
# NOTE(review): presumably the detector's timestamp clock rate — confirm.
clock_frequency = ureg.Quantity(6.4e8, "Hz").to_compact()


class WithLocalDistributedCluster(ContextDecorator):
    """
    A decorator to run a function in a distributed.Client context.

    The client is created on entry with ``processes=False``,
    ``silence_logs=ERROR`` and ``dashboard_address=None``, and its context is
    exited when the decorated function (or ``with`` block) finishes.

    Example:
        Using this decorator like so

        >>> @WithLocalDistributedCluster()
        ... def foo(*args):
        ...     ...

        is equivalent to

        >>> def foo(*args):
        ...     with Client(processes=False):
        ...         ...

        It may also be used directly as a context manager, in which case the
        value bound by ``as`` is whatever ``Client.__enter__`` returns.
    """

    def __init__(self, *args, **kwargs):
        # No client exists until the context is entered.
        self._client = None
        super().__init__(*args, **kwargs)

    def __enter__(self, *args, **kwargs):
        """Create the client, enter its context and return the entered value."""
        self._client = Client(
            processes=False, silence_logs=ERROR, dashboard_address=None
        )
        # Hand back the result of the client's own __enter__ so that
        # ``with WithLocalDistributedCluster() as client:`` is usable.
        return self._client.__enter__(*args, **kwargs)

    def __exit__(self, *args, **kwargs):
        """Exit the client's context, if one was entered; never suppress errors."""
        # Drop our reference first so a finished client is not kept alive and
        # is never exited a second time through this instance.
        client, self._client = self._client, None
        # Compare against None explicitly rather than relying on the
        # truthiness of the client object.
        if client is not None:
            client.__exit__(*args, **kwargs)
def compute_with_progress(*collection, gather=False):
    """
    Compute Dask collections, showing a progress bar, assuming a distributed client.

    The collections are persisted, a progress display is requested for them and
    the call blocks until they have finished.

    Args:
        collection:  A Dask object or built-in collection of objects.
        gather:  If true, return the computed result.

    Returns:
        The value of ``dask.compute`` applied to the persisted collections when
        ``gather`` is true, otherwise ``None``.
    """
    persisted = dask.persist(*collection)

    # Print whatever progress() hands back, or an empty line if it is falsy.
    bar = progress(persisted)
    print(bar or "")

    # Block until the persisted work has completed.
    wait(persisted)

    if not gather:
        return None
    return dask.compute(*persisted, sync=True)