Source code for tristan.diagnostics.utils

"""General utilities for the diagnostic tools."""

from __future__ import annotations

# Some platforms seem to raise OSError: [Errno -101] NetCDF: HDF error:
# '/path/to/HDF5/file.h5'
# Importing netCDF4 before h5py here seems to fix it.  ¯\_(ツ)_/¯
import netCDF4  # noqa F401

import glob
import logging
from enum import Enum
from pathlib import Path

import h5py
import numpy as np

from ..data import (
    cue_id_key,
    cue_time_key,
    event_id_dtype,
    event_location_key,
    shutter_close,
    shutter_open,
)

# Define a logger
logger = logging.getLogger("TristanDiagnostics.Utils")

# Some constants
TIME_RES = 1.5625e-9  # timing resolution fine
DIV = event_id_dtype(0x2000)


class FileChecker(str, Enum):
    CUES = "cues"
    EVENTS = "events"


class TristanConfig(str, Enum):
    T1M = "1M"
    T2M = "2M"
    T10M = "10M"

    @staticmethod
    def config_opts():
        return list(map(lambda d: d.value, TristanConfig))


TRISTAN_CONFIG = {"10M": (2, 5), "2M": (1, 2), "1M": (1, 1)}  # (H, V) -.> (fast, slow)

# Tristan 10M specs
MODULE_SIZE = (515, 2069)  # slow, fast
GAP_SIZE = (117, 45)  # slow, fast
IMAGE_SIZE_10M = (3043, 4183)  # slow, fast


[docs] def get_full_file_list(filename_template: str | Path) -> list[Path]: """Given a template filename, including directory, get a list of all the files\ using that template. Args: filename_template(str | Path): Template to look up in the directory. Returns: file_list(list[Path]): A list of all the files found matching the template. """ if not isinstance(filename_template, Path): filename_template = Path(filename_template) file_list = [ Path(f).expanduser().resolve() for f in sorted(glob.glob(filename_template.as_posix())) ] return file_list
[docs] def define_modules(det_config: TristanConfig = "10M") -> dict[str, tuple]: """Define the start and end pixel of each module in the Tristan detector. Args: det_config (TristanConfig, optional): Specify how many physical modules make up the Tristan\ detector currently in use. Available configurations: 1M, 2M, 10M.\ Defaults to "10M". Returns: dict[str, tuple]: Start and end pixel value of each module - which are defined\ by a (x,y) tuple. For example a Tristan 1M will return \ {"0": ([0, 515], [0, 2069])} """ if det_config not in TristanConfig.config_opts(): logger.error(f"Detector configuration {det_config} unknown.") raise ValueError( f"Detector configuration unknown. Please pass one of {TristanConfig.config_opts()}." ) modules = TRISTAN_CONFIG[det_config] mod = {} n = 0 for _y in range(modules[0]): for _x in range(modules[1]): int_x = [ _x * (MODULE_SIZE[0] + GAP_SIZE[0]), _x * (MODULE_SIZE[0] + GAP_SIZE[0]) + MODULE_SIZE[0], ] int_y = [ _y * (MODULE_SIZE[1] + GAP_SIZE[1]), _y * (MODULE_SIZE[1] + GAP_SIZE[1]) + MODULE_SIZE[1], ] mod[str(n)] = (int_x, int_y) # mod[(_x, _y)] = (int_x, int_y) n += 1 return mod
[docs] def module_cooordinates(det_config: TristanConfig = "10M") -> dict[str, tuple]: """ Create a conversion table between module number and its location on the detector. Args: det_config(TristanConfig, optional): Specify how many physical modules make up the Tristan\ detector currently in use. Available configurations: 1M, 2M, 10M.\ Defaults to "10M". Returns: dict[str, tuple]: effectively a conversion table mapping the module number to its\ location on the detector. For example a Trisstan 1M will return \ {"0": (0, 0)} """ if det_config not in TristanConfig.config_opts(): logger.error(f"Detector configuration {det_config} unknown.") raise ValueError( f"Detector configuration unknown. Please pass one of {TristanConfig.config_opts()}." ) modules = TRISTAN_CONFIG[det_config] table = {} n = 0 for _y in range(modules[0]): for _x in range(modules[1]): table[str(n)] = (_x, _y) n += 1 return table
def _check_for_cues(filename: Path) -> bool: try: with h5py.File(filename) as fh: _ = fh[cue_id_key][1] _ = fh[cue_time_key][1] return True except IndexError: return False
[docs] def assign_files_to_modules( filelist: list[Path], det_config: TristanConfig = "10M", check_for: FileChecker = "events", ): """ Assign each file to the correct module after having checked that it has valid events/cues in it. While the files should be in order i.e. for module 0 we'll have file numbers 000001-000010, for module 1 \ file numbers 000011-000020 and so on, when checking for events an additional check will be done on the \ event id when assigning to each module. Args: filelist (list[Path]): List of input tristan files. det_config (TristanConfig, optional): Specify how many physical modules make up the Tristan \ detector currently in use. Available configurations: 1M, 2M, 10M.\ Defaults to "10M". check_for (FileChecker, optional): Specify whether to check for valid events or cues. \ Defaults to "cues". Returns: files_per_module(dict[str, list(Path)]): List of files assigned to each module. """ MOD = define_modules(det_config) files_per_module = {k: [] for k in MOD.keys()} broken_files = [] match check_for: case FileChecker.EVENTS: for filename in filelist: try: with h5py.File(filename) as fh: x, y = divmod(fh[event_location_key][1], DIV) # Assign file to correct module depending on coordinates. # Should be irrelevant as they go in order but still good to check for now for k, v in MOD.items(): if v[1][0] <= x <= v[1][1]: if v[0][0] <= y <= v[0][1]: files_per_module[k].append(filename) except IndexError: broken_files.append(filename) case FileChecker.CUES: # NOTE This is to check for triggers when dealing with dark field collections which # sometimes have datasets with no events. for filename in filelist: has_cues = _check_for_cues(filename) if has_cues: # Assign to module based on filename filenum = int(filename.stem.split("_")[-1]) - 1 mod_num = str(filenum // int(det_config.strip("M"))) files_per_module[mod_num].append(filename) else: broken_files.append(filename) return files_per_module, broken_files
[docs] def find_shutter_times(filelist): """Find shutter open and close timestamps.""" sh_open = [] sh_close = [] for filename in filelist: with h5py.File(filename) as fh: cues = fh[cue_id_key][()] cues_time = fh[cue_time_key] op_idx = np.where(cues == shutter_open)[0] cl_idx = np.where(cues == shutter_close)[0] if len(op_idx) == 1: sh_open.append(cues_time[op_idx[0]] * TIME_RES) if len(cl_idx) == 1: sh_close.append(cues_time[cl_idx[0]] * TIME_RES) return sh_open[0], sh_close[0]