Commit ea69739b authored by Yifan Wang

transplant samples generation to python3

parent 97346f8e
Showing with 2116 additions and 0 deletions
{
"random_seed": 42,
"background_data_directory": null,
"dq_bits": [0, 1, 2, 3],
"inj_bits": [0, 1, 2, 4],
"waveform_params_file_name": "waveform_params.ini",
"max_runtime": 60,
"n_injection_samples": 32,
"n_noise_samples": 16,
"n_processes": 4,
"output_file_name": "default.hdf"
}
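For orientation, here is a minimal sketch of how the main script below resolves and loads this JSON file; the config_files/ directory is where the script expects all configuration files to live:

import json
import os

# The main script resolves config file names relative to ./config_files/
json_config_path = os.path.join('.', 'config_files', 'default.json')
with open(json_config_path, 'r') as json_file:
    config = json.load(json_file)
print(config['n_injection_samples'])   # 32 for the file above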
; -----------------------------------------------------------------------------
; DECLARE ARGUMENTS
; -----------------------------------------------------------------------------
[variable_args]
; Waveform parameters that will vary in MCMC
mass1 =
mass2 =
spin1z =
spin2z =
ra =
dec =
coa_phase =
inclination =
polarization =
injection_snr =
[static_args]
; Waveform parameters that will not change in MCMC
approximant = SEOBNRv4
domain = time
f_lower = 18
distance = 100
waveform_length = 128
; Width of the background noise interval (in seconds) around the event_time
; that is used to make the injection. It should be larger than:
;   sample_length = seconds_before_event + seconds_after_event
; (see below), because the edges corrupted by the whitening are cropped off.
noise_interval_width = 16
; original_sampling_rate = Sampling rate of raw HDF files (usually 4096 Hz)
; target_sampling_rate = Desired sampling rate for sample generation output
original_sampling_rate = 4096
target_sampling_rate = 2048
; Define parameters for the whitening procedure. See documentation of the
; pycbc.types.TimeSeries.whiten() method for an explanation of what these
; values exactly mean.
whitening_segment_duration = 4
whitening_max_filter_duration = 4
; Define the lower and upper bound for the bandpass filter (in Hertz)
bandpass_lower = 20
bandpass_upper = 2048
; Define how to align the sample around the event time. By convention, the
; event time is the H1 time!
; The sum of these values will be the sample_length!
seconds_before_event = 5.5
seconds_after_event = 2.5
; alpha for the Tukey window that is used to "fade on" the waveforms
; It represents the fraction of the window inside the cosine tapered region.
; To turn off the "fade on", simply choose tukey_alpha = 0.
tukey_alpha = 0.25
; -----------------------------------------------------------------------------
; DEFINE DISTRIBUTIONS FOR PARAMETERS
; -----------------------------------------------------------------------------
[prior-mass1]
; Prior for mass1
name = uniform
min-mass1 = 10.
max-mass1 = 80.
[prior-mass2]
; Prior for mass2
name = uniform
min-mass2 = 10.
max-mass2 = 80.
[prior-spin1z]
; Prior for spin1z
name = uniform
min-spin1z = 0
max-spin1z = 0.998
[prior-spin2z]
; Prior for spin2z
name = uniform
min-spin2z = 0
max-spin2z = 0.998
[prior-injection_snr]
; Prior for the injection SNR
name = uniform
min-injection_snr = 5
max-injection_snr = 20
[prior-coa_phase]
; Coalescence phase prior
name = uniform_angle
[prior-inclination]
; Inclination prior
name = sin_angle
[prior-ra+dec]
; Sky position prior
name = uniform_sky
[prior-polarization]
; Polarization prior
name = uniform_angle
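For reference, a minimal sketch of how PyCBC parses the INI file above into variable and static arguments, mirroring utils/configfiles.py further below (note that the static arguments come back as strings and are only typecast later):

from pycbc.workflow import WorkflowConfigParser
from pycbc.distributions import read_params_from_config

# Parse the [variable_args], [static_args], and [prior-*] sections
config_parser = WorkflowConfigParser(configFiles=['waveform_params.ini'])
variable_arguments, static_arguments = \
    read_params_from_config(config_parser)
print(variable_arguments)            # e.g., ['mass1', 'mass2', ...]
print(static_arguments['f_lower'])   # '18' (still a string here)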
"""
The "main script" of this repository: Read in a configuration file and
generate synthetic GW data according to the provided specifications.
"""
# -----------------------------------------------------------------------------
# IMPORTS
# -----------------------------------------------------------------------------
import argparse
import numpy as np
import os
import time
from itertools import count
from tqdm import tqdm
from utils.configfiles import read_ini_config, read_json_config
from utils.hdffiles import NoiseTimeline
from utils.samplefiles import SampleFile
from utils.samplegeneration import generate_sample
from utils.waveforms import WaveformParameterGenerator
# -----------------------------------------------------------------------------
# MAIN CODE
# -----------------------------------------------------------------------------
if __name__ == '__main__':
# -------------------------------------------------------------------------
# Preliminaries
# -------------------------------------------------------------------------
    # Note: the Python 2 trick of re-opening stdout unbuffered does not
    # work on Python 3; pass flush=True to print() where needed instead
# Start the stopwatch
script_start = time.time()
print('')
print('GENERATE A GW DATA SAMPLE FILE')
print('')
# -------------------------------------------------------------------------
# Parse the command line arguments
# -------------------------------------------------------------------------
# Set up the parser and add arguments
parser = argparse.ArgumentParser(description='Generate a GW data sample.')
parser.add_argument('--config-file',
help='Name of the JSON configuration file which '
'controls the sample generation process.',
default='default.json')
# Parse the arguments that were passed when calling this script
print('Parsing command line arguments...', end=' ')
command_line_arguments = vars(parser.parse_args())
print('Done!')
# -------------------------------------------------------------------------
# Read in JSON config file specifying the sample generation process
# -------------------------------------------------------------------------
# Build the full path to the config file
json_config_name = command_line_arguments['config_file']
json_config_path = os.path.join('.', 'config_files', json_config_name)
# Read the JSON configuration into a dict
    print('Reading and validating JSON configuration file...', end=' ')
config = read_json_config(json_config_path)
print('Done!')
# -------------------------------------------------------------------------
# Read in INI config file specifying the static_args and variable_args
# -------------------------------------------------------------------------
# Build the full path to the waveform params file
ini_config_name = config['waveform_params_file_name']
ini_config_path = os.path.join('.', 'config_files', ini_config_name)
# Read in the variable_arguments and static_arguments
    print('Reading and validating INI configuration file...', end=' ')
variable_arguments, static_arguments = read_ini_config(ini_config_path)
print('Done!\n')
# -------------------------------------------------------------------------
# Shortcuts and random seed
# -------------------------------------------------------------------------
# Set the random seed for this script
np.random.seed(config['random_seed'])
# Define some useful shortcuts
random_seed = config['random_seed']
max_runtime = config['max_runtime']
bkg_data_dir = config['background_data_directory']
# -------------------------------------------------------------------------
# Construct a generator for sampling waveform parameters
# -------------------------------------------------------------------------
# Initialize a waveform parameter generator that can sample injection
# parameters from the distributions specified in the config file
waveform_parameter_generator = \
WaveformParameterGenerator(config_file=ini_config_path,
random_seed=random_seed)
    # Wrap it in a generator expression so that we can easily sample from
    # it by calling next(waveform_parameters). Note that iter(int, 1) is
    # simply an infinite iterator: int() always returns 0, which never
    # equals the sentinel value 1.
waveform_parameters = \
(waveform_parameter_generator.draw() for _ in iter(int, 1))
# -------------------------------------------------------------------------
# Construct a generator for sampling valid noise times
# -------------------------------------------------------------------------
# If the 'background_data_directory' is None, we will use synthetic noise
if config['background_data_directory'] is None:
print('Using synthetic noise! (background_data_directory = None)\n')
        # Create an iterator that returns a fake "event time", which we will
# use as a seed for the RNG to ensure the reproducibility of the
# generated synthetic noise.
# For the HDF file path that contains that time, we always yield
# None, so that we know that we need to generate synthetic noise.
noise_times = ((1000000000 + _, None) for _ in count())
    # Otherwise, we set up a timeline object for the background noise, that
    # is, we read in all HDF files in the background_data_directory and
    # figure out which parts of them are usable (i.e., have the right data
    # quality and injection bits set, as specified in the config file).
else:
print('Using real noise from LIGO recordings! '
'(background_data_directory = {})'.format(bkg_data_dir))
print('Reading in raw data. This may take several minutes...', end=' ')
# Create a timeline object by running over all HDF files once
noise_timeline = NoiseTimeline(background_data_directory=bkg_data_dir,
random_seed=random_seed)
        # Create a noise time generator so that we can sample valid noise
        # times simply by calling next(noise_times)
delta_t = int(static_arguments['noise_interval_width'] / 2)
noise_times = (noise_timeline.sample(delta_t=delta_t,
dq_bits=config['dq_bits'],
inj_bits=config['inj_bits'],
return_paths=True)
for _ in iter(int, 1))
print('Done!\n')
# -------------------------------------------------------------------------
# Define a convenience function to generate arguments for the simulation
# -------------------------------------------------------------------------
def generate_arguments(injection=True):
# Only sample waveform parameters if we are making an injection
waveform_params = next(waveform_parameters) if injection else None
# Return all necessary arguments as a dictionary
return dict(static_arguments=static_arguments,
event_tuple=next(noise_times),
waveform_params=waveform_params)
# -------------------------------------------------------------------------
# Finally: Create our samples!
# -------------------------------------------------------------------------
# Keep track of all the samples (and parameters) we have generated
samples = dict(injection_samples=[], noise_samples=[])
injection_parameters = dict(injection_samples=[], noise_samples=[])
    print('Generating samples containing an injection...')
    n_samples = config['n_injection_samples']
    arguments_generator = \
        (generate_arguments(injection=True) for _ in iter(int, 1))
    print('Number of samples:', n_samples)
    sample_type = 'injection_samples'
    # Collect the results for all samples in a single list. Note that the
    # list must be initialized *before* the loop: re-creating it on every
    # iteration would keep only the very last sample.
    results_list = []
    for i in tqdm(range(n_samples)):
        arguments = next(arguments_generator)
        result = generate_sample(**arguments)
        results_list.append(result)
# ---------------------------------------------------------------------
# Process results in the results_list
# ---------------------------------------------------------------------
# Separate the samples and the injection parameters
samples[sample_type], injection_parameters[sample_type] = \
zip(*results_list)
    # Sort all results by the event_time
    idx = np.argsort([_['event_time'] for _ in samples[sample_type]])
    samples[sample_type] = [samples[sample_type][i] for i in idx]
    injection_parameters[sample_type] = \
        [injection_parameters[sample_type][i] for i in idx]
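    # -------------------------------------------------------------------------
    # Sketch of the analogous loop for noise-only samples: the loop above
    # only covers injections, but 'n_noise_samples' from the JSON config
    # and the references to samples['noise_samples'] further below assume
    # that noise-only samples are generated as well
    # -------------------------------------------------------------------------
    print('Generating samples containing pure noise...')
    n_samples = config['n_noise_samples']
    arguments_generator = \
        (generate_arguments(injection=False) for _ in iter(int, 1))
    print('Number of samples:', n_samples)
    sample_type = 'noise_samples'
    results_list = []
    for i in tqdm(range(n_samples)):
        arguments = next(arguments_generator)
        result = generate_sample(**arguments)
        results_list.append(result)
    # For noise-only samples, the injection parameters are always None
    samples[sample_type], injection_parameters[sample_type] = \
        zip(*results_list)
    # Sort all results by the event_time
    idx = np.argsort([_['event_time'] for _ in samples[sample_type]])
    samples[sample_type] = [samples[sample_type][i] for i in idx]
    injection_parameters[sample_type] = \
        [injection_parameters[sample_type][i] for i in idx]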
print('Sample generation completed!\n')
# -------------------------------------------------------------------------
# Compute the normalization parameters for this file
# -------------------------------------------------------------------------
print('Computing normalization parameters for sample...', end=' ')
# Gather all samples (with and without injection) in one list
all_samples = list(samples['injection_samples'] + samples['noise_samples'])
# Group all samples by detector
h1_samples = [_['h1_strain'] for _ in all_samples]
l1_samples = [_['l1_strain'] for _ in all_samples]
# Stack recordings along first axis
h1_samples = np.vstack(h1_samples)
l1_samples = np.vstack(l1_samples)
# Compute the mean and standard deviation for both detectors as the median
# of the means / standard deviations for each sample. This is more robust
# towards outliers than computing "global" parameters by concatenating all
# samples and treating them as a single, long time series.
normalization_parameters = \
dict(h1_mean=np.median(np.mean(h1_samples, axis=1), axis=0),
l1_mean=np.median(np.mean(l1_samples, axis=1), axis=0),
h1_std=np.median(np.std(h1_samples, axis=1), axis=0),
l1_std=np.median(np.std(l1_samples, axis=1), axis=0))
print('Done!\n')
# -------------------------------------------------------------------------
# Create a SampleFile dict from list of samples and save it as an HDF file
# -------------------------------------------------------------------------
print('Saving the results to HDF file ...', end=' ')
# Initialize the dictionary that we use to create a SampleFile object
sample_file_dict = dict(command_line_arguments=command_line_arguments,
injection_parameters=dict(),
injection_samples=dict(),
noise_samples=dict(),
normalization_parameters=normalization_parameters,
static_arguments=static_arguments)
# Collect and add samples (with and without injection)
for sample_type in ('injection_samples', 'noise_samples'):
for key in ('event_time', 'h1_strain', 'l1_strain'):
if samples[sample_type]:
value = np.array([_[key] for _ in list(samples[sample_type])])
else:
value = None
sample_file_dict[sample_type][key] = value
# Collect and add injection_parameters (ignore noise samples here, because
# for those, the injection_parameters are always None)
    other_keys = ['h1_signal', 'h1_snr', 'l1_signal', 'l1_snr',
                  'scale_factor']
for key in list(variable_arguments + other_keys):
if injection_parameters['injection_samples']:
value = np.array([_[key] for _ in
injection_parameters['injection_samples']])
else:
value = None
sample_file_dict['injection_parameters'][key] = value
# Construct the path for the output HDF file
output_dir = os.path.join('.', 'output')
if not os.path.exists(output_dir):
os.mkdir(output_dir)
sample_file_path = os.path.join(output_dir, config['output_file_name'])
# Create the SampleFile object and save it to the specified output file
sample_file = SampleFile(data=sample_file_dict)
sample_file.to_hdf(file_path=sample_file_path)
print('Done!')
# Get file size in MB and print the result
sample_file_size = os.path.getsize(sample_file_path) / 1024**2
print('Size of resulting HDF file: {:.2f}MB'.format(sample_file_size))
print('')
# -------------------------------------------------------------------------
# Postliminaries
# -------------------------------------------------------------------------
    # PyCBC always creates a copy of the waveform parameters file, which
    # we can delete at the end of the sample generation process
duplicate_path = os.path.join('.', config['waveform_params_file_name'])
if os.path.exists(duplicate_path):
os.remove(duplicate_path)
# Print the total run time
print('Total runtime: {:.1f} seconds!'.format(time.time() - script_start))
print('')
"""
Provide functions for reading and parsing configuration files.
"""
# -----------------------------------------------------------------------------
# IMPORTS
# -----------------------------------------------------------------------------
import json
import os
from pycbc.workflow import WorkflowConfigParser
from pycbc.distributions import read_params_from_config
from .staticargs import amend_static_args, typecast_static_args
# -----------------------------------------------------------------------------
# FUNCTION DEFINITIONS
# -----------------------------------------------------------------------------
def read_ini_config(file_path):
"""
Read in a `*.ini` config file, which is used mostly to specify the
waveform simulation (for example, the waveform model, the parameter
space for the binary black holes, etc.) and return its contents.
Args:
file_path (str): Path to the `*.ini` config file to be read in.
Returns:
A tuple `(variable_arguments, static_arguments)` where
* `variable_arguments` should simply be a list of all the
parameters which get randomly sampled from the specified
distributions, usually using an instance of
:class:`utils.waveforms.WaveformParameterGenerator`.
* `static_arguments` should be a dictionary containing the keys
and values of the parameters that are the same for each
example that is generated (i.e., the non-physical parameters
such as the waveform model and the sampling rate).
"""
# Make sure the config file actually exists
if not os.path.exists(file_path):
raise IOError('Specified configuration file does not exist: '
'{}'.format(file_path))
# Set up a parser for the PyCBC config file
workflow_config_parser = WorkflowConfigParser(configFiles=[file_path])
# Read the variable_arguments and static_arguments using the parser
variable_arguments, static_arguments = \
read_params_from_config(workflow_config_parser)
# Typecast and amend the static arguments
static_arguments = typecast_static_args(static_arguments)
static_arguments = amend_static_args(static_arguments)
return variable_arguments, static_arguments
def read_json_config(file_path):
"""
Read in a `*.json` config file, which is used to specify the
sample generation process itself (for example, the number of
samples to generate, the number of concurrent processes to use,
etc.) and return its contents.
Args:
file_path (str): Path to the `*.json` config file to be read in.
Returns:
A `dict` containing the contents of the given JSON file.
"""
# Make sure the config file actually exists
if not os.path.exists(file_path):
raise IOError('Specified configuration file does not exist: '
'{}'.format(file_path))
    # Open the config file and load the JSON contents as a dict
with open(file_path, 'r') as json_file:
config = json.load(json_file)
# Define the required keys for the config file in a set
required_keys = {'background_data_directory', 'dq_bits', 'inj_bits',
'waveform_params_file_name', 'max_runtime',
'n_injection_samples', 'n_noise_samples', 'n_processes',
'random_seed', 'output_file_name'}
# Make sure no required keys are missing
missing_keys = required_keys.difference(set(config.keys()))
if missing_keys:
raise KeyError('Missing required key(s) in JSON configuration file: '
'{}'.format(', '.join(list(missing_keys))))
return config
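A hedged usage sketch of the two readers above; the file names follow the config_files/ layout used by the main script:

import os
from utils.configfiles import read_ini_config, read_json_config

# Read the JSON file that controls the sample generation process
config = read_json_config(os.path.join('config_files', 'default.json'))
# Read the INI file that defines the priors and static arguments
ini_path = os.path.join('config_files', config['waveform_params_file_name'])
variable_arguments, static_arguments = read_ini_config(ini_path)
print(static_arguments['approximant'])   # 'SEOBNRv4' for the file above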
"""
Provide a custom ProgressBar class, which provides a wrapper around
an iterable that automatically produces a progressbar when iterating
over it.
"""
# -----------------------------------------------------------------------------
# IMPORTS
# -----------------------------------------------------------------------------
import sys
import time
from threading import Event, Thread
# -----------------------------------------------------------------------------
# CLASS DEFINITIONS
# -----------------------------------------------------------------------------
class RepeatedTimer:
"""
Wrapper class to repeat the given `func` every `interval` seconds
(asynchronously in the background).
Source: https://stackoverflow.com/a/33054922/4100721.
"""
def __init__(self, interval, func, *args, **kwargs):
self.interval = interval
self.func = func
self.args = args
self.kwargs = kwargs
self.start = time.time()
self.event = Event()
self.thread = Thread(target=self._target)
self.thread.start()
def _target(self):
while not self.event.wait(self._time):
self.func(*self.args, **self.kwargs)
@property
def _time(self):
return self.interval - ((time.time() - self.start) % self.interval)
def stop(self):
self.event.set()
self.thread.join()
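A minimal usage sketch of RepeatedTimer; the printed message is purely illustrative:

import time
# Print a heartbeat every 2 seconds in the background...
timer = RepeatedTimer(2, print, 'still running...')
time.sleep(7)
# ...and then stop the background thread again
timer.stop()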
# -----------------------------------------------------------------------------
class ProgressBar:
"""
:class:`ProgressBar` objects are a custom way to "decorate"
a given iterable to produce a progress bar when looping over it.
    This class also makes it possible to produce additional output
    alongside the progress bar, such as information about the element
    of the iterable that is currently being processed.
Args:
iterable (iterable): The iterable to be "decorated" with
a progressbar.
bar_length (int): Length of the bar itself (in characters).
auto_update (bool): Whether or not to automatically write
the updated progressbar to the command line.
"""
def __init__(self,
iterable,
bar_length=50,
auto_update=False):
self.iterable = iterable
self.max_value = len(iterable)
self.bar_length = bar_length
self.auto_update = auto_update
self.start_time = None
self.last_timediff = None
self.progressbar = self.get_progressbar(-1)
self.extras_ = []
self.scheduler = None
# -------------------------------------------------------------------------
def __iter__(self):
# Start the stop watch as soon as we start iterating
self.start_time = time.time()
# Initialize index to 0 to ensure it is always defined
index = 0
# Start the scheduler that will update the elapsed time every second
def update():
self.progressbar = self.get_progressbar(index)
self.write(extras=self.extras_)
self.scheduler = RepeatedTimer(1, update)
# Actually loop over the iterable
for index, value in enumerate(self.iterable):
# Update the last_timediff, which is used to estimate when we
# will be done
self.last_timediff = self.get_timediff()
# Update the progressbar string
self.progressbar = self.get_progressbar(index)
            # If we are doing auto-updates (i.e., there are no extras), we
            # can already write the progress bar to stdout
if self.auto_update:
self.write()
# Finally, actually yield the current value of the iterable
yield value
# Update our progress bar string one last time to indicate we have
# made it to 100%
self.progressbar = self.get_progressbar(self.max_value)
# Stop our background scheduler
self.scheduler.stop()
# -------------------------------------------------------------------------
def get_timediff(self):
"""
Returns: Time elapsed since progress bar was instantiated.
"""
if self.start_time is not None:
return time.time() - self.start_time
else:
return None
# -------------------------------------------------------------------------
def get_eta(self,
percent):
"""
Get the estimated time of arrival (ETA) by linear interpolation.
Args:
percent (float): Current progress in percent.
Returns:
Estimated time of arrival in seconds.
"""
if self.last_timediff is not None and percent != 0:
return max(0, self.last_timediff / percent - self.get_timediff())
else:
return None
# -------------------------------------------------------------------------
def get_progressbar(self,
index):
"""
Construct the progressbar itself (bar, ETA, etc.).
Args:
index (int): Current index of the iterable; used to compute
the current progress percentage.
Returns:
A string containing the basic progress bar.
"""
# Construct the actual progress bar
percent = float(index) / self.max_value
bar = '=' * int(round(percent * self.bar_length))
spaces = '-' * (self.bar_length - len(bar))
# Get the elapsed time as a proper string
elapsed_time = self.get_timediff()
if elapsed_time is None:
elapsed_time = 0
elapsed_time = '{:.2f}'.format(elapsed_time)
# Get the expected time of arrival (ETA) as a proper string
eta = self.get_eta(percent)
if eta is None:
eta = '?'
else:
eta = '{:.2f}'.format(eta)
# Construct the actual progress bar string
out = "[{0}] {1:>3}% ({2:>{3}}/{4:>{3}}) | Elapsed: {5} | ETA: {6}"
progressbar = out.format(bar + spaces, round(percent * 100),
index, len(str(self.max_value)),
self.max_value, elapsed_time, eta)
return progressbar
# -------------------------------------------------------------------------
def write(self,
clear_line=False,
extras=()):
"""
Construct the progress bar and write it to the command line.
Args:
clear_line (bool): Whether or not to clear the last line.
extras (list): List of additional outputs (e.g., the file
that is currently being downloaded).
"""
self.extras_ = extras
if extras:
for _ in range(len(extras)):
sys.stdout.write('\r\033[K\033[F')
if clear_line:
sys.stdout.write('\r\033[K\033[F')
# Actually write the finished progress bar to the command line
sys.stdout.write('\r\033[K\033[F')
sys.stdout.write('\r\033[K\033[K')
sys.stdout.write(self.progressbar)
if extras:
sys.stdout.write('\n' + '\n'.join(extras))
if not clear_line:
sys.stdout.write('\n')
sys.stdout.flush()
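A minimal usage sketch of the ProgressBar class; the sleep merely stands in for real per-item work:

import time
# Wrap an iterable; with auto_update=True, the bar is redrawn on
# every iteration without further calls to write()
for value in ProgressBar(range(100), bar_length=40, auto_update=True):
    time.sleep(0.05)   # placeholder for the actual work on each item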
"""
Provide tools for writing and reading the sample HDF files produced by
the sample generation.
"""
# -----------------------------------------------------------------------------
# IMPORTS
# -----------------------------------------------------------------------------
import numpy as np
import pandas as pd
import h5py
from six import iteritems
from pprint import pformat
from warnings import warn
# -----------------------------------------------------------------------------
# CLASS DEFINITIONS
# -----------------------------------------------------------------------------
class SampleFile:
"""
:class:`SampleFile` objects serve as an abstraction for the result
files of the sample generation.
Args:
data (dict): A dictionary containing the following keys:
.. code-block:: python
{'command_line_arguments', 'static_arguments',
'injection_samples', 'noise_samples',
'injection_parameters', 'normalization_parameters'}
The value for every key must again be a dictionary relating
the names of sample parameters (e.g., 'h1_snr') to a numpy
array containing the values for that parameter.
"""
def __init__(self,
data=None):
# Perform sanity checks on data
self.__check_data(data)
# If we have received data, store it; else initialize an empty dict
if data is not None:
self.data = data
else:
self.data = dict(command_line_arguments=dict(),
static_arguments=dict(),
injection_samples=dict(),
noise_samples=dict(),
injection_parameters=dict(),
normalization_parameters=dict())
# -------------------------------------------------------------------------
@staticmethod
def __check_data(data):
"""
Run some sanity checks on `data`. Raises an assertion error if
the data fail any of these sanity checks.
Args:
data (dict): A dictionary as specified in the ``__init__``
of this class, that is, a dictionary containing the
following keys:
.. code-block:: python
{'command_line_arguments', 'static_arguments',
'injection_samples', 'noise_samples',
'injection_parameters', 'normalization_parameters'}
"""
assert isinstance(data, dict) or data is None, \
'data must be either dict or None!'
if data is not None:
assert 'command_line_arguments' in data.keys(), \
'data must provide key "command_line_arguments"!'
assert 'static_arguments' in data.keys(), \
'data must provide key "static_arguments"!'
assert 'injection_samples' in data.keys(), \
'data must provide key "injection_samples"!'
assert 'noise_samples' in data.keys(), \
'data must provide key "noise_samples"!'
assert 'injection_parameters' in data.keys(), \
'data must provide key "injection_parameters"!'
assert 'normalization_parameters' in data.keys(), \
'data must provide key "normalization_parameters"!'
# -------------------------------------------------------------------------
def __repr__(self):
return pformat(self.data, indent=4)
# -------------------------------------------------------------------------
def __str__(self):
return pformat(self.data, indent=4)
# -------------------------------------------------------------------------
def __getitem__(self, item):
return self.data[item]
# -------------------------------------------------------------------------
def __setitem__(self, key, value):
self.data[key] = value
# -------------------------------------------------------------------------
def read_hdf(self, file_path):
"""
Read in an existing HDF sample file (e.g., to use an instance
of :class:`SampleFile` as a convenience wrapper for accessing
the contents of an HDF samples file).
Args:
file_path (str): The path to the HDF file to be read into
the :class:`SampleFile` object.
"""
# Clear the existing data
self.data = {}
with h5py.File(file_path, 'r') as hdf_file:
            # Read in dict with command_line_arguments (depending on the
            # h5py version, attributes come back as bytes or as str)
            self.data['command_line_arguments'] = \
                {key: value.decode('ascii') if isinstance(value, bytes)
                 else value for key, value in
                 iteritems(dict(hdf_file['command_line_arguments'].attrs))}
            # Read in dict with static_arguments
            self.data['static_arguments'] = \
                {key: value.decode('ascii') if isinstance(value, bytes)
                 else value for key, value in
                 iteritems(dict(hdf_file['static_arguments'].attrs))}
# Read in group containing injection samples
self.data['injection_samples'] = dict()
for key in ('event_time', 'h1_strain', 'l1_strain'):
try:
self.data['injection_samples'][key] = \
np.array(hdf_file['injection_samples'][key])
except TypeError:
self.data['injection_samples'][key] = np.array(None)
# Read in group containing noise samples
self.data['noise_samples'] = dict()
for key in ('event_time', 'h1_strain', 'l1_strain'):
try:
self.data['noise_samples'][key] = \
np.array(hdf_file['noise_samples'][key])
except TypeError:
self.data['noise_samples'][key] = np.array(None)
# Read in injection parameters
self.data['injection_parameters'] = dict()
for key in hdf_file['/injection_parameters'].keys():
try:
self.data['injection_parameters'][key] = \
np.array(hdf_file['injection_parameters'][key])
except TypeError:
self.data['injection_parameters'][key] = np.array(None)
# Read in dict with normalization parameters
self.data['normalization_parameters'] = \
dict(hdf_file['normalization_parameters'].attrs)
self.data['normalization_parameters'] = \
{key: float(value) for key, value in
iteritems(self.data['normalization_parameters'])}
# -------------------------------------------------------------------------
def to_hdf(self, file_path):
with h5py.File(file_path, 'w') as hdf_file:
# Create group for command_line_arguments and save the values of
# the dict as attributes of the group
group = hdf_file.create_group('command_line_arguments')
for key, value in iteritems(self.data['command_line_arguments']):
group.attrs[key] = str(value)
# Create group for static_arguments and save the values of
# the dict as attributes of the group
group = hdf_file.create_group('static_arguments')
for key, value in iteritems(self.data['static_arguments']):
group.attrs[key] = str(value)
# Create group for injection_samples and save every item of the
# dict as a new dataset
group = hdf_file.create_group('injection_samples')
for key, value in iteritems(self.data['injection_samples']):
dtype = 'float64' if key == 'event_time' else 'float32'
if value is not None:
group.create_dataset(name=key,
shape=value.shape,
dtype=dtype,
data=value)
else:
group.create_dataset(name=key,
shape=None,
dtype=dtype)
# Create group for noise_samples and save every item of the
# dict as a new dataset
group = hdf_file.create_group('noise_samples')
for key, value in iteritems(self.data['noise_samples']):
dtype = 'float64' if key == 'event_time' else 'float32'
if value is not None:
group.create_dataset(name=key,
shape=value.shape,
dtype=dtype,
data=value)
else:
group.create_dataset(name=key,
shape=None,
dtype=dtype)
# Create group for injection_parameters and save every item of the
# dict as a new dataset
group = hdf_file.create_group('injection_parameters')
for key, value in iteritems(self.data['injection_parameters']):
if value is not None:
group.create_dataset(name=key,
shape=value.shape,
dtype='float64',
data=value)
else:
group.create_dataset(name=key,
shape=None,
dtype='float64')
# Create group for normalization_parameters and save every item
# of the dict as a new attribute
group = hdf_file.create_group('normalization_parameters')
for key, value in iteritems(self.data['normalization_parameters']):
group.attrs[key] = float(value)
# -------------------------------------------------------------------------
def as_dataframe(self,
injection_parameters=False,
static_arguments=False,
command_line_arguments=False,
split_injections_noise=False):
"""
Return the contents of the :class:`SampleFile` as a ``pandas``
data frame.
Args:
injection_parameters (bool): Whether or not to return
the `injection parameters` for every sample.
static_arguments (bool): Whether or not to return
the `static_arguments` for every sample.
command_line_arguments (bool): Whether or not to return
the `command_line_arguments` for every sample.
split_injections_noise (bool): If this is set to True, a
separate data frame will be returned for both the
samples with and without an injection.
Returns:
One (or two, if `split_injections_noise` is set to `True`)
pandas data frame containing the sample stored in the
:class:`SampleFile` object.
"""
# Create a data frame for the samples containing an injection
injection_samples = []
if self.data['injection_samples']['event_time'].shape != ():
for i in range(len(self.data['injection_samples']['event_time'])):
_ = {k: v[i] for k, v in
iteritems(self.data['injection_samples'])}
injection_samples.append(_)
df_injection_samples = pd.DataFrame().append(injection_samples,
ignore_index=True,
sort=True)
else:
df_injection_samples = pd.DataFrame()
# Create a data frame for the samples not containing an injection
noise_samples = []
if self.data['noise_samples']['event_time'].shape != ():
for i in range(len(self.data['noise_samples']['event_time'])):
_ = {k: v[i] for k, v in
iteritems(self.data['noise_samples'])}
noise_samples.append(_)
df_noise_samples = pd.DataFrame().append(noise_samples,
ignore_index=True,
sort=True)
else:
df_noise_samples = pd.DataFrame()
# If requested, create a data frame for the injection parameters and
# merge it with the data frame containing the injection samples
if injection_parameters:
injection_params = []
# Check if we even have any injection parameters
if self.data['injection_parameters']['mass1'].shape != ():
for i in range(len(df_injection_samples)):
_ = {k: v[i] for k, v in
iteritems(self.data['injection_parameters'])}
injection_params.append(_)
df_injection_params = pd.DataFrame().append(injection_params,
ignore_index=True,
sort=True)
else:
df_injection_params = pd.DataFrame()
df = pd.concat([df_injection_samples, df_injection_params],
axis=1, sort=True)
else:
df = df_injection_samples
# If requested, add the static_arguments to the data frame
# containing the injections, and a smaller subset of the
# static_arguments also to the data frame containing the noise
# samples (only those arguments that make sense there)
if static_arguments:
for key, value in iteritems(self.data['static_arguments']):
df[key] = value
if key in ('random_seed', 'target_sampling_rate',
'bandpass_lower', 'bandpass_upper',
'seconds_before_event', 'seconds_after_event',
'sample_length'):
df_noise_samples[key] = value
# Merge the data frames for the samples with and without injections
df = df.append(df_noise_samples, ignore_index=True, sort=True)
# If requested, add the command line arguments that were used in the
# creation of the sample file to the combined data frame
if command_line_arguments:
for key, value in iteritems(self.data['command_line_arguments']):
df[key] = value
# Ensure the `event_time` variable is an integer
try:
df['event_time'] = df['event_time'].astype(int)
except KeyError:
warn('\nNo key "event_time": Data frame is probably empty!')
# Either split into two data frames for injection and noise samples
if split_injections_noise:
df_injections = df[df.h1_signal.notnull()]
df_noise = df[~df.h1_signal.notnull()]
return df_injections, df_noise
# Or just return a single data frame containing both types of samples
else:
return df
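A hedged round-trip sketch for the SampleFile class; the file name matches the default output of the main script, and its existence is an assumption:

from utils.samplefiles import SampleFile

# Read a previously generated sample file back in...
sample_file = SampleFile()
sample_file.read_hdf('output/default.hdf')
# ...and turn it into pandas data frames, split by sample type
df_injections, df_noise = \
    sample_file.as_dataframe(injection_parameters=True,
                             split_injections_noise=True)
print(len(df_injections), len(df_noise))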
"""
Provide the :func:`generate_sample()` method, which is at the heart of
the sample generation process.
"""
# -----------------------------------------------------------------------------
# IMPORTS
# -----------------------------------------------------------------------------
import numpy as np
from lal import LIGOTimeGPS
from pycbc.psd import interpolate
from pycbc.psd.analytical import aLIGOZeroDetHighPower
from pycbc.noise import noise_from_psd
from pycbc.filter import sigma
from .hdffiles import get_strain_from_hdf_file
from .waveforms import get_detector_signals, get_waveform
# -----------------------------------------------------------------------------
# FUNCTION DEFINITIONS
# -----------------------------------------------------------------------------
def generate_sample(static_arguments,
event_tuple,
waveform_params=None):
"""
Generate a single sample (or example) by taking a piece of LIGO
background noise (real or synthetic, depending on `event_tuple`),
optionally injecting a simulated waveform (depending on
`waveform_params`) and post-processing the result (whitening,
band-passing).
Args:
static_arguments (dict): A dictionary containing global
technical parameters for the sample generation, for example
the target_sampling_rate of the output.
event_tuple (tuple): A tuple `(event_time, file_path)`, which
specifies the GPS time at which to make an injection and
the path of the HDF file which contains said GPS time.
If `file_path` is `None`, synthetic noise will be used
instead and the `event_time` only serves as a seed for
the corresponding (random) noise generator.
waveform_params (dict): A dictionary containing the randomly
sampled parameters that are passed as inputs to the
waveform model (e.g., the masses, spins, position, ...).
Returns:
A tuple `(sample, injection_parameters)`, which contains the
generated `sample` itself (a dict with keys `{'event_time',
'h1_strain', 'l1_strain'}`), and the `injection_parameters`,
which are either `None` (in case no injection was made), or a
dict containing the `waveform_params` and some additional
parameters (e.g., single detector SNRs).
"""
# -------------------------------------------------------------------------
# Define shortcuts for some elements of self.static_arguments
# -------------------------------------------------------------------------
# Read out frequency-related arguments
original_sampling_rate = static_arguments['original_sampling_rate']
target_sampling_rate = static_arguments['target_sampling_rate']
f_lower = static_arguments['f_lower']
delta_f = static_arguments['delta_f']
fd_length = static_arguments['fd_length']
# Get the width of the noise sample that we either select from the raw
# HDF files, or generate synthetically
noise_interval_width = static_arguments['noise_interval_width']
# Get how many seconds before and after the event time to use
seconds_before_event = static_arguments['seconds_before_event']
seconds_after_event = static_arguments['seconds_after_event']
# Get the event time and the dict containing the HDF file path
event_time, hdf_file_paths = event_tuple
# -------------------------------------------------------------------------
# Get the background noise (either from data or synthetically)
# -------------------------------------------------------------------------
# If the event_time is None, we generate synthetic noise
if hdf_file_paths is None:
# Create an artificial PSD for the noise
# TODO: Is this the best choice for this task?
psd = aLIGOZeroDetHighPower(length=fd_length,
delta_f=delta_f,
low_freq_cutoff=f_lower)
# Actually generate the noise using the PSD and LALSimulation
noise = dict()
for i, det in enumerate(('H1', 'L1')):
# Compute the length of the noise sample in time steps
noise_length = noise_interval_width * target_sampling_rate
# Generate the noise for this detector
noise[det] = noise_from_psd(length=noise_length,
delta_t=(1.0 / target_sampling_rate),
psd=psd,
seed=(2 * event_time + i))
            # Manually fix the noise start time to match the fake event
            # time (the public start_time setter does not seem to work
            # here, so we set the protected _epoch attribute directly)
start_time = event_time - noise_interval_width / 2
# noinspection PyProtectedMember
noise[det]._epoch = LIGOTimeGPS(start_time)
# Otherwise we select the noise from the corresponding HDF file
else:
kwargs = dict(hdf_file_paths=hdf_file_paths,
gps_time=event_time,
interval_width=noise_interval_width,
original_sampling_rate=original_sampling_rate,
target_sampling_rate=target_sampling_rate,
as_pycbc_timeseries=True)
noise = get_strain_from_hdf_file(**kwargs)
# -------------------------------------------------------------------------
# If applicable, make an injection
# -------------------------------------------------------------------------
# If no waveform parameters are given, we are not making an injection.
# In this case, there are no detector signals and no injection
# parameters, and the strain is simply equal to the noise
if waveform_params is None:
detector_signals = None
injection_parameters = None
strain = noise
# Otherwise, we need to simulate a waveform for the given waveform_params
# and add it into the noise to create the strain
else:
# ---------------------------------------------------------------------
# Simulate the waveform with the given injection parameters
# ---------------------------------------------------------------------
# Actually simulate the waveform with these parameters
waveform = get_waveform(static_arguments=static_arguments,
waveform_params=waveform_params)
# Get the detector signals by projecting on the antenna patterns
detector_signals = \
get_detector_signals(static_arguments=static_arguments,
waveform_params=waveform_params,
event_time=event_time,
waveform=waveform)
# ---------------------------------------------------------------------
# Add the waveform into the noise as is to calculate the NOMF-SNR
# ---------------------------------------------------------------------
# Store the dummy strain, the PSDs and the SNRs for the two detectors
strain_ = {}
psds = {}
snrs = {}
# Calculate these quantities for both detectors
for det in ('H1', 'L1'):
# Add the simulated waveform into the noise to get the dummy strain
strain_[det] = noise[det].add_into(detector_signals[det])
# Estimate the Power Spectral Density from the dummy strain
psds[det] = strain_[det].psd(4)
psds[det] = interpolate(psds[det], delta_f=delta_f)
# Use the PSD estimate to calculate the optimal matched
# filtering SNR for this injection and this detector
snrs[det] = sigma(htilde=detector_signals[det],
psd=psds[det],
low_frequency_cutoff=f_lower)
# Calculate the network optimal matched filtering SNR for this
# injection (which we need for scaling to the chosen injection SNR)
nomf_snr = np.sqrt(snrs['H1']**2 + snrs['L1']**2)
# ---------------------------------------------------------------------
# Add the waveform into the noise with the chosen injection SNR
# ---------------------------------------------------------------------
# Compute the rescaling factor
injection_snr = waveform_params['injection_snr']
        scale_factor = injection_snr / nomf_snr
strain = {}
for det in ('H1', 'L1'):
# Add the simulated waveform into the noise, using a scaling
# factor to ensure that the resulting NOMF-SNR equals the chosen
# injection SNR
strain[det] = noise[det].add_into(scale_factor *
detector_signals[det])
# ---------------------------------------------------------------------
# Store some information about the injection we just made
# ---------------------------------------------------------------------
# Store the information we have computed ourselves
injection_parameters = {'scale_factor': scale_factor,
'h1_snr': snrs['H1'],
'l1_snr': snrs['L1']}
# Also add the waveform parameters we have sampled
for key, value in waveform_params.items():
injection_parameters[key] = value
# -------------------------------------------------------------------------
# Whiten and bandpass the strain (also for noise-only samples)
# -------------------------------------------------------------------------
for det in ('H1', 'L1'):
# Get the whitening parameters
segment_duration = static_arguments['whitening_segment_duration']
max_filter_duration = static_arguments['whitening_max_filter_duration']
# Whiten the strain (using the built-in whitening of PyCBC)
# We don't need to remove the corrupted samples here, because we
# crop the strain later on
strain[det] = \
strain[det].whiten(segment_duration=segment_duration,
max_filter_duration=max_filter_duration,
remove_corrupted=False)
# Get the limits for the bandpass
bandpass_lower = static_arguments['bandpass_lower']
bandpass_upper = static_arguments['bandpass_upper']
# Apply a high-pass to remove everything below `bandpass_lower`;
# If bandpass_lower = 0, do not apply any high-pass filter.
if bandpass_lower != 0:
strain[det] = strain[det].highpass_fir(frequency=bandpass_lower,
remove_corrupted=False,
order=512)
# Apply a low-pass filter to remove everything above `bandpass_upper`.
# If bandpass_upper = sampling rate, do not apply any low-pass filter.
if bandpass_upper != target_sampling_rate:
strain[det] = strain[det].lowpass_fir(frequency=bandpass_upper,
remove_corrupted=False,
order=512)
# -------------------------------------------------------------------------
# Cut strain (and signal) time series to the pre-specified length
# -------------------------------------------------------------------------
for det in ('H1', 'L1'):
# Define some shortcuts for slicing
a = event_time - seconds_before_event
b = event_time + seconds_after_event
# Cut the strain to the desired length
strain[det] = strain[det].time_slice(a, b)
# If we've made an injection, also cut the simulated signal
if waveform_params is not None:
# Cut the detector signals to the specified length
detector_signals[det] = detector_signals[det].time_slice(a, b)
# Also add the detector signals to the injection parameters
injection_parameters['h1_signal'] = \
np.array(detector_signals['H1'])
injection_parameters['l1_signal'] = \
np.array(detector_signals['L1'])
# -------------------------------------------------------------------------
# Collect all available information about this sample and return results
# -------------------------------------------------------------------------
    # The whitened strain is numerically on the order of O(1), so we can
    # save it as a 32-bit float (unlike the original signal, which is on
    # the order of O(10^-30) and thus requires 64-bit floats).
sample = {'event_time': event_time,
'h1_strain': np.array(strain['H1']).astype(np.float32),
'l1_strain': np.array(strain['L1']).astype(np.float32)}
return sample, injection_parameters
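A hedged usage sketch of generate_sample(); the static arguments are assumed to come from the INI reader shown earlier, and the GPS time below only seeds the synthetic-noise generator (a file path of None selects synthetic noise):

from utils.configfiles import read_ini_config
from utils.samplegeneration import generate_sample

_, static_arguments = read_ini_config('config_files/waveform_params.ini')
# Generate a noise-only sample on synthetic noise
sample, injection_parameters = \
    generate_sample(static_arguments=static_arguments,
                    event_tuple=(1234567890, None),
                    waveform_params=None)
print(len(sample['h1_strain']))   # sample_length * target_sampling_rate
assert injection_parameters is None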
"""
Provide tools that are needed for amending and typecasting the static
arguments from an `*.ini` configuration file, which controls the
waveform simulation process.
"""
# -----------------------------------------------------------------------------
# IMPORTS
# -----------------------------------------------------------------------------
import copy
# -----------------------------------------------------------------------------
# FUNCTION DEFINITIONS
# -----------------------------------------------------------------------------
def amend_static_args(static_args):
"""
Amend the static_args from the `*.ini` configuration file by adding
the parameters that can be computed directly from others (more
intuitive ones). Note that the static_args should have been
properly typecast first; see :func:`typecast_static_args()`.
Args:
static_args (dict): The static_args dict after it has been
typecast by :func:`typecast_static_args()`.
Returns:
The amended `static_args`, where implicitly defined variables
have been added.
"""
# Create a copy of the original static_args
args = copy.deepcopy(static_args)
# If necessary, compute the sample length
if 'sample_length' not in args.keys():
args['sample_length'] = \
args['seconds_before_event'] + args['seconds_after_event']
# If necessary, add delta_t = 1 / target_sampling_rate
if 'delta_t' not in args.keys():
args['delta_t'] = 1.0 / args['target_sampling_rate']
# If necessary, add delta_f = 1 / waveform_length
if 'delta_f' not in args.keys():
args['delta_f'] = 1.0 / args['waveform_length']
# If necessary, add td_length = waveform_length * target_sampling_rate
if 'td_length' not in args.keys():
args['td_length'] = \
int(args['waveform_length'] * args['target_sampling_rate'])
# If necessary, add fd_length = td_length / 2 + 1
if 'fd_length' not in args.keys():
args['fd_length'] = int(args['td_length'] / 2.0 + 1)
return args
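An illustrative check of the derived quantities, using the (already typecast) values from the INI file above:

args = amend_static_args({'seconds_before_event': 5.5,
                          'seconds_after_event': 2.5,
                          'target_sampling_rate': 2048,
                          'waveform_length': 128})
assert args['sample_length'] == 8.0
assert args['delta_t'] == 1.0 / 2048
assert args['delta_f'] == 1.0 / 128
assert args['td_length'] == 262144      # 128 * 2048
assert args['fd_length'] == 131073      # 262144 / 2 + 1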
def typecast_static_args(static_args):
"""
Take the `static_args` dictionary as it is read in from the PyCBC
configuration file (i.e., all values are strings) and cast the
values to the correct types (`float` or `int`).
Args:
static_args (dict): The raw `static_args` dictionary as it is
read from the `*.ini` configuration file.
Returns:
The `static_args` dictionary with proper types for all values.
"""
args = copy.deepcopy(static_args)
    # Cast variables that need to be integers
    args['bandpass_lower'] = int(args['bandpass_lower'])
    args['bandpass_upper'] = int(args['bandpass_upper'])
    args['waveform_length'] = int(args['waveform_length'])
    args['noise_interval_width'] = int(args['noise_interval_width'])
    args['original_sampling_rate'] = int(args['original_sampling_rate'])
    args['target_sampling_rate'] = int(args['target_sampling_rate'])
    args['whitening_max_filter_duration'] = \
        int(args['whitening_max_filter_duration'])
    # Cast variables that need to be floats
    args['whitening_segment_duration'] = \
        float(args['whitening_segment_duration'])
    args['distance'] = float(args['distance'])
    args['f_lower'] = float(args['f_lower'])
    args['seconds_before_event'] = float(args['seconds_before_event'])
    args['seconds_after_event'] = float(args['seconds_after_event'])
return args