Source code for tristan.diagnostics.utils

"""General utilities for the diagnostic tools."""
from __future__ import annotations

import glob
import logging
from pathlib import Path
from typing import Literal, get_args

import h5py
import numpy as np

from ..data import (
    cue_id_key,
    cue_time_key,
    event_location_key,
    shutter_close,
    shutter_open,
)

# Define a logger
logger = logging.getLogger("TristanDiagnostics.Utils")

# Some constants
# Factor applied to the raw cue timestamps in find_shutter_times
# (presumably seconds per timestamp tick -- TODO confirm the unit).
TIME_RES = 1.5625e-9  # timing resolution fine
# Divisor used with divmod() in assign_files_to_modules to split an event
# location value into its two pixel coordinates.
DIV = np.uint32(0x2000)

# Tristan detector specs
# TConfig lists the supported detector configurations; tristan_config gives the
# number of modules along each direction for every one of them.
TConfig = Literal["1M", "2M", "10M"]
tristan_config = {"10M": (2, 5), "2M": (1, 2), "1M": (1, 1)}  # (H, V) -> (fast, slow)
mod_size = (515, 2069)  # slow, fast
gap_size = (117, 45)  # slow, fast
# Matches the 10M layout: 5 * 515 + 4 * 117 = 3043 and 2 * 2069 + 1 * 45 = 4183.
image_size = (3043, 4183)  # slow, fast


def get_full_file_list(filename_template: str | Path) -> list[Path]:
    """Given a template filename, including directory, list all the files matching it.

    A leading ``~`` in the template is expanded to the user's home directory
    before the lookup, because glob does not perform tilde expansion itself.

    Args:
        filename_template (str | Path): Template (glob pattern) to look up.

    Returns:
        list[Path]: Resolved paths of all the files found matching the template,
            sorted by matched path string. Empty if nothing matches.
    """
    # Path() accepts both str and Path, so no isinstance check is needed.
    template = Path(filename_template).expanduser()
    return [Path(match).resolve() for match in sorted(glob.glob(template.as_posix()))]
def define_modules(det_config: TConfig = "10M") -> dict[str, tuple]:
    """Compute the pixel interval covered by each module of the Tristan detector.

    Modules are numbered consecutively from 0: the index along the second
    entry of the configuration tuple (slow direction) varies fastest, the index
    along the first entry (fast direction) varies slowest.

    Args:
        det_config (TConfig, optional): How many physical modules make up the
            Tristan detector in use. Available configurations: 1M, 2M, 10M.
            Defaults to "10M".

    Raises:
        ValueError: If det_config is not one of the available configurations.

    Returns:
        dict[str, tuple]: For every module number (as a string), a tuple of two
            [start, end] pixel lists, the first built from the slow-axis module
            and gap sizes and the second from the fast-axis ones.
            For example a Tristan 1M returns {"0": ([0, 515], [0, 2069])}.
    """
    valid_configs = get_args(TConfig)
    if det_config not in valid_configs:
        logger.error(f"Detector configuration {det_config} unknown.")
        raise ValueError(
            f"Detector configuration unknown. Please pass one of {valid_configs}."
        )

    n_fast, n_slow = tristan_config[det_config]
    # Distance between the first pixels of two neighbouring modules.
    slow_pitch = mod_size[0] + gap_size[0]
    fast_pitch = mod_size[1] + gap_size[1]

    module_bounds = {}
    for fast_idx in range(n_fast):
        fast_start = fast_idx * fast_pitch
        for slow_idx in range(n_slow):
            slow_start = slow_idx * slow_pitch
            module_number = fast_idx * n_slow + slow_idx
            module_bounds[str(module_number)] = (
                [slow_start, slow_start + mod_size[0]],
                [fast_start, fast_start + mod_size[1]],
            )
    return module_bounds
def module_cooordinates(det_config: TConfig = "10M") -> dict[str, tuple]:
    """Build a lookup table from module number to its position in the module grid.

    The numbering is the same as in define_modules: the first element of each
    position runs over the second entry of the configuration tuple and varies
    fastest, the second element runs over the first entry.

    Args:
        det_config (TConfig, optional): How many physical modules make up the
            Tristan detector in use. Available configurations: 1M, 2M, 10M.
            Defaults to "10M".

    Raises:
        ValueError: If det_config is not one of the available configurations.

    Returns:
        dict[str, tuple]: Conversion table mapping the module number (as a
            string) to its location on the detector.
            For example a Tristan 1M returns {"0": (0, 0)}.
    """
    valid_configs = get_args(TConfig)
    if det_config not in valid_configs:
        logger.error(f"Detector configuration {det_config} unknown.")
        raise ValueError(
            f"Detector configuration unknown. Please pass one of {valid_configs}."
        )

    n_fast, n_slow = tristan_config[det_config]
    grid_positions = (
        (slow_idx, fast_idx)
        for fast_idx in range(n_fast)
        for slow_idx in range(n_slow)
    )
    return {str(number): position for number, position in enumerate(grid_positions)}
def assign_files_to_modules(filelist: list[Path | str], det_config: TConfig = "10M"):
    """Sort the input files by the detector module their events belong to.

    For every file, the element at index 1 of the dataset stored under
    event_location_key is split with divmod by DIV. The file is assigned to a
    module when the quotient lies within the module's second pixel interval and
    the remainder within its first one (both bounds inclusive), using the
    intervals returned by define_modules.

    Args:
        filelist (list[Path | str]): Files to be opened with h5py.
        det_config (TConfig, optional): Detector configuration passed on to
            define_modules. Defaults to "10M".

    Returns:
        tuple: A dictionary mapping each module number to the list of files
            assigned to it, and a list of the files for which reading the event
            location raised an IndexError. A file matching no module appears
            in neither.
    """
    module_bounds = define_modules(det_config)
    files_per_module = {module_id: [] for module_id in module_bounds}
    broken_files = []

    for filename in filelist:
        with h5py.File(filename) as fh:
            try:
                x, y = divmod(fh[event_location_key][1], DIV)
                for module_id, (first_range, second_range) in module_bounds.items():
                    first_lo, first_hi = first_range
                    second_lo, second_hi = second_range
                    if second_lo <= x <= second_hi and first_lo <= y <= first_hi:
                        files_per_module[module_id].append(filename)
            except IndexError:
                # The index-1 lookup failed (fewer than two events recorded).
                broken_files.append(filename)

    return files_per_module, broken_files
def find_shutter_times(filelist: list[Path | str]):
    """Find the shutter opening and closing times recorded in a set of files.

    Each file is opened with h5py and its cue ids are compared with the
    shutter_open and shutter_close values. A file only contributes a time when
    it holds exactly one matching cue. The first time found for each signal is
    kept, and the search stops as soon as both are known.

    Args:
        filelist (list[Path | str]): Files to look through, in order.

    Returns:
        tuple: The shutter open and shutter close times, each one being the
            cue timestamp multiplied by TIME_RES.

    Raises:
        IndexError: If no file holds a unique shutter open cue or no file holds
            a unique shutter close cue (including an empty file list).
    """
    open_time = None
    close_time = None

    for filename in filelist:
        with h5py.File(filename) as fh:
            cues = fh[cue_id_key][()]
            cues_time = fh[cue_time_key]
            if open_time is None:
                op_idx = np.where(cues == shutter_open)[0]
                if len(op_idx) == 1:
                    open_time = cues_time[op_idx[0]] * TIME_RES
            if close_time is None:
                cl_idx = np.where(cues == shutter_close)[0]
                if len(cl_idx) == 1:
                    close_time = cues_time[cl_idx[0]] * TIME_RES
        if open_time is not None and close_time is not None:
            # Only the first occurrence of each signal is returned, so there
            # is no point in reading the remaining files.
            break

    missing = [
        label
        for label, value in (("open", open_time), ("close", close_time))
        if value is None
    ]
    if missing:
        # IndexError is what indexing the empty result list used to raise;
        # keep the type so existing callers are unaffected.
        raise IndexError(
            f"No unique shutter {' / '.join(missing)} signal found in the files provided."
        )
    return open_time, close_time