""" Provide classes and functions for reading and writing HDF files. """ # ----------------------------------------------------------------------------- # IMPORTS # ----------------------------------------------------------------------------- from __future__ import print_function import numpy as np import h5py import os import sys from pycbc.catalog import Catalog from pycbc.types.timeseries import TimeSeries from lal import LIGOTimeGPS # ----------------------------------------------------------------------------- # FUNCTION DEFINITIONS # ----------------------------------------------------------------------------- def get_file_paths(directory, extensions=None): """ Take a directory and return the paths to all files in this directory and its subdirectories. Optionally filter out only files specific extensions. Args: directory (str): Path to a directory. extensions (list): List of allowed file extensions, for example: `['hdf', 'h5']`. Returns: List of paths of all files matching the above descriptions. """ file_paths = [] # Walk over the directory and find all files for path, dirs, files in os.walk(directory): for f in files: file_paths.append(os.path.join(path, f)) # If a list of extensions is provided, only keep the corresponding files if extensions is not None: file_paths = [_ for _ in file_paths if any([_.endswith(ext) for ext in extensions])] return file_paths # ----------------------------------------------------------------------------- def get_strain_from_hdf_file(hdf_file_paths, gps_time, interval_width, original_sampling_rate=4096, target_sampling_rate=4096, as_pycbc_timeseries=False): """ For a given `gps_time`, select the interval of length `interval_width` (centered around `gps_time`) from the HDF files specified in `hdf_file_paths`, and resample them to the given `target_sampling_rate`. Args: hdf_file_paths (dict): A dictionary with keys `{'H1', 'L1'}`, which holds the paths to the HDF files containing the interval around `gps_time`. gps_time (int): A (valid) background noise time (GPS timestamp). interval_width (int): The length of the strain sample (in seconds) to be selected from the HDF files. original_sampling_rate (int): The original sampling rate (in Hertz) of the HDF files sample. Default is 4096. target_sampling_rate (int): The sampling rate (in Hertz) to which the strain should be down-sampled (if desired). Must be a divisor of the `original_sampling_rate`. as_pycbc_timeseries (bool): Whether to return the strain as a dict of numpy arrays or as a dict of objects of type `pycbc.types.timeseries.TimeSeries`. Returns: A dictionary with keys `{'H1', 'L1'}`. For each key, the dictionary contains a strain sample (as a numpy array) of the given length, centered around `gps_time`, (down)-sampled to the desired `target_sampling_rate`. """ # ------------------------------------------------------------------------- # Perform some basic sanity checks on the arguments # ------------------------------------------------------------------------- assert isinstance(gps_time, int), \ 'time is not an integer!' assert isinstance(interval_width, int), \ 'interval_width is not an integer' assert isinstance(original_sampling_rate, int), \ 'original_sampling_rate is not an integer' assert isinstance(target_sampling_rate, int), \ 'target_sampling_rate is not an integer' assert original_sampling_rate % target_sampling_rate == 0, \ 'Invalid target_sampling_rate: Not a divisor of ' \ 'original_sampling_rate!' 
    # -------------------------------------------------------------------------
    # Read out the strain from the HDF files
    # -------------------------------------------------------------------------

    # Compute the offset = half the interval width (intervals are centered
    # around the given gps_time)
    offset = int(interval_width / 2)

    # Compute the resampling factor
    sampling_factor = int(original_sampling_rate / target_sampling_rate)

    # Store the sample we have selected from the HDF files
    sample = dict()

    # Loop over both detectors
    for detector in ('H1', 'L1'):

        # Extract the path to the HDF file
        file_path = hdf_file_paths[detector]

        # Read in the HDF file and select the noise sample
        with h5py.File(file_path, 'r') as hdf_file:

            # Get the start_time and compute array indices
            start_time = int(hdf_file['meta']['GPSstart'][()])
            start_idx = \
                (gps_time - start_time - offset) * original_sampling_rate
            end_idx = \
                (gps_time - start_time + offset) * original_sampling_rate

            # Select the sample from the strain
            strain = np.array(hdf_file['strain']['Strain'])
            sample[detector] = strain[start_idx:end_idx]

            # Down-sample the selected sample to the target_sampling_rate
            # (note: this is plain decimation by taking every n-th sample,
            # without an additional anti-aliasing filter)
            sample[detector] = sample[detector][::sampling_factor]

    # -------------------------------------------------------------------------
    # Convert to PyCBC time series, if necessary
    # -------------------------------------------------------------------------

    # If we just want a plain numpy array, we can return it right away
    if not as_pycbc_timeseries:
        return sample

    # Otherwise we need to convert the numpy arrays to time series first
    else:

        # Initialize an empty dict for the time series results
        timeseries = dict()

        # Convert the strain of both detectors to a TimeSeries object
        for detector in ('H1', 'L1'):
            timeseries[detector] = \
                TimeSeries(initial_array=sample[detector],
                           delta_t=1.0 / target_sampling_rate,
                           epoch=LIGOTimeGPS(gps_time - offset))

        return timeseries


# -----------------------------------------------------------------------------
# CLASS DEFINITIONS
# -----------------------------------------------------------------------------

class NoiseTimeline:
    """
    A ``NoiseTimeline`` object stores information about the data
    quality and hardware injection flags of the files in the given
    `background_data_directory`. This information is read in only once
    at the beginning of the sample generation and can then be utilized
    to quickly sample "valid" noise times, that is, GPS times where
    the files in `background_data_directory` provide data which pass
    certain desired quality criteria.

    Args:
        background_data_directory (str): Path to the directory which
            contains the raw data (HDF files). These files may also be
            distributed over several subdirectories.
        random_seed (int): Seed for the random number generator which
            is used for sampling valid noise times.
        verbose (bool): Whether or not this instance should print
            logging information to the command line.
    """

    def __init__(self,
                 background_data_directory,
                 random_seed=42,
                 verbose=False):

        # Store the directory of the raw HDF files
        self.background_data_directory = background_data_directory

        # Print debug messages or not?
        self.verbose = verbose

        # Create a new random number generator with the given seed to
        # decouple this from the global numpy RNG (for reproducibility)
        self.rng = np.random.RandomState(seed=random_seed)

        # Get the list of all HDF files in the specified directory
        self.vprint('Getting HDF file paths...', end=' ')
        self.hdf_file_paths = get_file_paths(self.background_data_directory,
                                             extensions=['hdf', 'h5'])
        self.vprint('Done!')

        # Read in the meta information and masks from the HDF files
        self.vprint('Reading information from HDF files...', end=' ')
        self.hdf_files = self._get_hdf_files()
        self.vprint('Done!')

        # Build the timeline for these HDF files
        self.vprint('Building timeline object...', end=' ')
        self.timeline = self._build_timeline()
        self.vprint('Done!')

    # -------------------------------------------------------------------------

    def vprint(self, string, *args, **kwargs):
        """
        Verbose printing: Wrapper around `print()` that only prints
        if `self.verbose` is set to `True`.

        Args:
            string (str): String to be printed if `self.verbose`
                is `True`.
            *args: Arguments passed to `print()`.
            **kwargs: Keyword arguments passed to `print()`.
        """

        if self.verbose:
            print(string, *args, **kwargs)
            sys.stdout.flush()

    # -------------------------------------------------------------------------

    def _get_hdf_files(self):

        # Keep track of all the files whose information we need to store
        hdf_files = []

        # Open every HDF file once to read in the meta information as well
        # as the injection and data quality (DQ) masks
        n_files = len(self.hdf_file_paths)
        for i, hdf_file_path in enumerate(self.hdf_file_paths):
            with h5py.File(hdf_file_path, 'r') as f:

                self.vprint('({:>4}/{:>4})...'.format(i, n_files), end=' ')

                # Select the necessary information from the HDF file
                start_time = f['meta']['GPSstart'][()]
                detector = f['meta']['Detector'][()].decode('utf-8')
                duration = f['meta']['Duration'][()]
                inj_mask = np.array(f['quality']['injections']['Injmask'],
                                    dtype=np.int32)
                dq_mask = np.array(f['quality']['simple']['DQmask'],
                                   dtype=np.int32)

                # Perform some basic sanity checks
                assert detector in ['H1', 'L1'], \
                    'Invalid detector {}!'.format(detector)
                assert duration == len(inj_mask) == len(dq_mask), \
                    'Length of Injmask or DQmask does not match the duration!'
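                # Both masks are sampled at 1 Hz (one entry per second),
                # which is why their lengths must match the duration of
                # the file in seconds.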
                # Collect this information in a dict
                hdf_files.append(dict(file_path=hdf_file_path,
                                      start_time=start_time,
                                      detector=detector,
                                      duration=duration,
                                      inj_mask=inj_mask,
                                      dq_mask=dq_mask))

                # Erase the progress indicator again (ANSI escape codes:
                # move the cursor back 15 columns and clear the line)
                self.vprint('\033[15D\033[K', end='')

        # Sort the HDF files we have read in by their start time and
        # return them
        self.vprint('({:>4}/{:>4})...'.format(n_files, n_files), end=' ')
        return sorted(hdf_files, key=lambda _: _['start_time'])

    # -------------------------------------------------------------------------

    def _build_timeline(self):

        # Get the size of the arrays that we need to initialize
        n_entries = self.gps_end_time - self.gps_start_time

        # Initialize the empty timeline
        timeline = dict(h1_inj_mask=np.zeros(n_entries, dtype=np.int32),
                        l1_inj_mask=np.zeros(n_entries, dtype=np.int32),
                        h1_dq_mask=np.zeros(n_entries, dtype=np.int32),
                        l1_dq_mask=np.zeros(n_entries, dtype=np.int32))

        # Add the information from the HDF files to the timeline
        for hdf_file in self.hdf_files:

            # Define some shortcuts
            detector = hdf_file['detector']
            dq_mask = hdf_file['dq_mask']
            inj_mask = hdf_file['inj_mask']

            # Map the start/end from GPS time to array indices
            idx_start = hdf_file['start_time'] - self.gps_start_time
            idx_end = idx_start + hdf_file['duration']

            # Add the mask information to the correct detector
            if detector == 'H1':
                timeline['h1_inj_mask'][idx_start:idx_end] = inj_mask
                timeline['h1_dq_mask'][idx_start:idx_end] = dq_mask
            else:
                timeline['l1_inj_mask'][idx_start:idx_end] = inj_mask
                timeline['l1_dq_mask'][idx_start:idx_end] = dq_mask

        # Return the completed timeline
        return timeline

    # -------------------------------------------------------------------------

    def is_valid(self,
                 gps_time,
                 delta_t=16,
                 dq_bits=(0, 1, 2, 3),
                 inj_bits=(0, 1, 2, 4)):
        """
        For a given `gps_time`, check if it is a valid time to sample
        noise from by checking if all data points in the interval
        `[gps_time - delta_t, gps_time + delta_t]` have the specified
        `dq_bits` and `inj_bits` set.

        .. seealso:: For more information about the `dq_bits` and
            `inj_bits`, check out the website of the GW Open Science
            Center, which explains these for the case of O1:
            https://www.gw-openscience.org/archive/dataset/O1

        Args:
            gps_time (int): The GPS time whose validity we are checking.
            delta_t (int): The number of seconds around `gps_time`
                which we also want to be valid (because the sample will
                be an interval).
            dq_bits (tuple): The Data Quality Bits which one would like
                to require (see note above).
                *For example:* `dq_bits=(0, 1, 2, 3)` means that the
                data quality needs to pass all tests up to `CAT3`.
            inj_bits (tuple): The Injection Bits which one would like
                to require (see note above).
                *For example:* `inj_bits=(0, 1, 2, 4)` means that only
                continuous wave (CW) injections are permitted; all
                recordings containing any other type of injection will
                be invalid for sampling.

        Returns:
            `True` if `gps_time` is valid, otherwise `False`.
        """

        # ---------------------------------------------------------------------
        # Perform some basic sanity checks
        # ---------------------------------------------------------------------

        assert isinstance(gps_time, int), \
            'Received a GPS time that is not an integer!'
        assert delta_t >= 0, \
            'Received an invalid value for delta_t!'
        assert set(dq_bits).issubset(set(range(7))), \
            'Invalid Data Quality bit specification passed to is_valid()!'
        assert set(inj_bits).issubset(set(range(5))), \
            'Invalid Injection bit specification passed to is_valid()!'
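        # The actual validity check proceeds in four stages: proximity to
        # confirmed real events, distance to the edges of the containing
        # HDF file, data quality bits, and injection bits. Failing any one
        # of these checks invalidates the time.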
        # ---------------------------------------------------------------------
        # Check if the given time is too close to a real event
        # ---------------------------------------------------------------------

        # Get the GPS times of all confirmed mergers from the PyCBC catalog
        catalog = Catalog()
        real_event_times = [catalog.mergers[_].time for _ in catalog.names]

        # Check if gps_time is too close to any of these times
        if any(abs(gps_time - _) <= delta_t for _ in real_event_times):
            return False

        # ---------------------------------------------------------------------
        # Check if the given time is too close to the edge within its HDF file
        # ---------------------------------------------------------------------

        # Loop over all HDF files to find the one that contains the given
        # gps_time. Here, we do not distinguish between H1 and L1, because
        # we assume that the files for the detectors are aligned on a grid.
        for hdf_file in self.hdf_files:

            # Get the start and end time for the current HDF file
            start_time = hdf_file['start_time']
            end_time = start_time + hdf_file['duration']

            # Find the file that contains the given gps_time
            if start_time < gps_time < end_time:

                # Check if it is far enough away from the edges: If not, it
                # is not a valid time; otherwise we can stop searching
                if not start_time + delta_t < gps_time < end_time - delta_t:
                    return False
                else:
                    break

        # ---------------------------------------------------------------------
        # Select the environment around the specified time
        # ---------------------------------------------------------------------

        # Map the time to indices
        idx_start = self.gps2idx(gps_time) - delta_t
        idx_end = self.gps2idx(gps_time) + delta_t

        # Select the mask intervals
        environment = \
            dict(h1_inj_mask=self.timeline['h1_inj_mask'][idx_start:idx_end],
                 l1_inj_mask=self.timeline['l1_inj_mask'][idx_start:idx_end],
                 h1_dq_mask=self.timeline['h1_dq_mask'][idx_start:idx_end],
                 l1_dq_mask=self.timeline['l1_dq_mask'][idx_start:idx_end])

        # ---------------------------------------------------------------------
        # Data Quality Check
        # ---------------------------------------------------------------------

        # Compute the bitmask which has exactly the requested dq_bits set
        min_dq = sum([2**i for i in dq_bits])

        # Perform the DQ check for H1: Every second in the environment must
        # have all requested bits set, which we verify with a bitwise AND.
        # (A plain threshold comparison against min_dq could accept masks
        # where an unrequested higher bit is set but a requested one is not.)
        environment['h1_dq_mask'] = \
            np.bitwise_and(environment['h1_dq_mask'], min_dq) == min_dq
        if not np.all(environment['h1_dq_mask']):
            return False

        # Perform the DQ check for L1
        environment['l1_dq_mask'] = \
            np.bitwise_and(environment['l1_dq_mask'], min_dq) == min_dq
        if not np.all(environment['l1_dq_mask']):
            return False

        # ---------------------------------------------------------------------
        # Injection Check
        # ---------------------------------------------------------------------

        # Define an array of ones that matches the length of the environment.
        # This is needed because for a given number N, we can check if the
        # K-th bit is set by evaluating the expression N & (1 << K); for
        # example, 5 & (1 << 2) = 0b101 & 0b100 = 4 != 0, so bit 2 is set.
        ones = np.ones(2 * delta_t, dtype=np.int32)

        # For each requested injection bit, check if it is set for the whole
        # environment (for both H1 and L1)
        for i in inj_bits:

            # Perform the injection check for H1
            if not np.all(np.bitwise_and(environment['h1_inj_mask'],
                                         np.left_shift(ones, i))):
                return False

            # Perform the injection check for L1
            if not np.all(np.bitwise_and(environment['l1_inj_mask'],
                                         np.left_shift(ones, i))):
                return False

        # If we have not returned False yet, the time must be valid!
        return True

    # -------------------------------------------------------------------------

    def sample(self,
               delta_t=16,
               dq_bits=(0, 1, 2, 3),
               inj_bits=(0, 1, 2, 4),
               return_paths=False):
        """
        Randomly sample a time from `[gps_start_time, gps_end_time]`
        which passes the :func:`NoiseTimeline.is_valid()` test.

        Args:
            delta_t (int): For an explanation, see
                :func:`NoiseTimeline.is_valid()`.
            dq_bits (tuple): For an explanation, see
                :func:`NoiseTimeline.is_valid()`.
            inj_bits (tuple): For an explanation, see
                :func:`NoiseTimeline.is_valid()`.
            return_paths (bool): Whether or not to return the paths to
                the HDF files containing the `gps_time`.

        Returns:
            A valid GPS time and optionally a `dict` with the file
            paths to the HDF files containing that GPS time (keys will
            correspond to the different detectors).
        """

        # Keep sampling random times until we find a valid one...
        while True:

            # Randomly choose a GPS time between the start and end (cast
            # to a plain Python int, because a numpy integer would fail
            # the isinstance check in is_valid())
            gps_time = int(self.rng.randint(self.gps_start_time + delta_t,
                                            self.gps_end_time - delta_t))

            # If it is a valid time, return it
            if self.is_valid(gps_time=gps_time, delta_t=delta_t,
                             dq_bits=dq_bits, inj_bits=inj_bits):
                if return_paths:
                    return gps_time, self.get_file_paths_for_time(gps_time)
                else:
                    return gps_time

    # -------------------------------------------------------------------------

    def get_file_paths_for_time(self, gps_time):
        """
        For a given (valid) GPS time, find the two HDF files (one for
        each of the two detectors, H1 and L1) which contain the
        corresponding strain.

        Args:
            gps_time (int): A valid GPS time stamp.

        Returns:
            A dictionary with keys `{'H1', 'L1'}` containing the paths
            to the HDF files, or `None` if no such files could be found.
        """

        # Keep track of the results, i.e., the paths to the HDF files
        result = dict()

        # Loop over all HDF files to find the ones containing the given time
        for hdf_file in self.hdf_files:

            # Get the start and end time for the current HDF file
            start_time = hdf_file['start_time']
            end_time = start_time + hdf_file['duration']

            # Check if the given GPS time falls into the interval of the
            # current HDF file, and if so, store the file path for it
            if start_time < gps_time < end_time:
                result[hdf_file['detector']] = hdf_file['file_path']

            # If both files were found, we are done!
            if 'H1' in result and 'L1' in result:
                return result

        # If we did not find both files, return None
        return None

    # -------------------------------------------------------------------------

    def idx2gps(self, idx):
        """
        Map an index to a GPS time by correcting for the start time of
        the observation run, as determined from the HDF files.

        Args:
            idx (int): An index of a time series array (covering an
                observation run).

        Returns:
            The corresponding GPS time.
        """

        return idx + self.gps_start_time

    # -------------------------------------------------------------------------

    def gps2idx(self, gps):
        """
        Map a GPS time to an index by correcting for the start time of
        the observation run, as determined from the HDF files.

        Args:
            gps (int): A GPS time belonging to a point in time between
                the start and end of an observation run.

        Returns:
            The corresponding time series index.
        """

        return gps - self.gps_start_time

    # -------------------------------------------------------------------------

    @property
    def gps_start_time(self):
        """
        The GPS start time of the observation run.
        """
        return self.hdf_files[0]['start_time']

    # -------------------------------------------------------------------------

    @property
    def gps_end_time(self):
        """
        The GPS end time of the observation run.
""" return self.hdf_files[-1]['start_time'] + \ self.hdf_files[-1]['duration']