Yifan Wang / Summer-MachineLearning · Commits

Commit a4651ce2, authored Jul 16, 2021 by Simran Dave: "Upload New File" (parent cd4af194, changes: 1)

generate_sample.py (new file, mode 100644)
"""
The "main script" of this repository: Read in a configuration file and
generate synthetic GW data according to the provided specifications.
"""

# -----------------------------------------------------------------------------
# IMPORTS
# -----------------------------------------------------------------------------

from __future__ import print_function

import argparse
import numpy as np
import os
import sys
import time
import io

from itertools import count
from multiprocessing import Process, Queue
from tqdm import tqdm

from utils.configfiles import read_ini_config, read_json_config
from utils.hdffiles import NoiseTimeline
from utils.samplefiles import SampleFile
from utils.samplegeneration import generate_sample
from utils.waveforms import WaveformParameterGenerator


# -----------------------------------------------------------------------------
# FUNCTION DEFINITIONS
# -----------------------------------------------------------------------------

"""
def queue_worker(arguments, results_queue):
    Helper function to generate a single sample in a dedicated process.

    Args:
        arguments (dict): Dictionary containing the arguments that are
            passed to generate_sample().
        results_queue (Queue): The queue to which the results of this
            worker / process are passed.

    # Try to generate a sample using the given arguments and store the result
    # in the given results_queue (which is shared across all worker processes).
    try:
        result = generate_sample(**arguments)
        results_queue.put(result)
        sys.exit(0)

    # For some arguments, LALSuite crashes during the sample generation.
    # In this case, terminate with a non-zero exit code to make sure a new
    # set of arguments is added to the main arguments_queue.
    except RuntimeError:
        sys.exit('Runtime Error')
"""

# -----------------------------------------------------------------------------
# MAIN CODE
# -----------------------------------------------------------------------------

if __name__ == '__main__':

    # -------------------------------------------------------------------------
    # Preliminaries
    # -------------------------------------------------------------------------

    # Disable output buffering ('flush' option is not available for Python 2)
    sys.stdout = os.fdopen(sys.stdout.fileno(), 'wb', 0)
    # only works with 'wb', not 'w'

    # Start the stopwatch
    script_start = time.time()

    # print('')
    print('GENERATE A GW DATA SAMPLE FILE', flush=True)
    # print('')

    # -------------------------------------------------------------------------
    # Parse the command line arguments
    # -------------------------------------------------------------------------

    # Set up the parser and add arguments
    parser = argparse.ArgumentParser(description='Generate a GW data sample.')
    parser.add_argument('--config-file',
                        help='Name of the JSON configuration file which '
                             'controls the sample generation process.',
                        default='default.json')

    # Parse the arguments that were passed when calling this script
    print('Parsing command line arguments...', end=' ', flush=True)
    command_line_arguments = vars(parser.parse_args())
    print('Done!', flush=True)

    # -------------------------------------------------------------------------
    # Read in JSON config file specifying the sample generation process
    # -------------------------------------------------------------------------

    # Build the full path to the config file
    json_config_name = command_line_arguments['config_file']
    json_config_path = os.path.join('.', 'config_files', json_config_name)

    # Read the JSON configuration into a dict
    # print('Reading and validating in JSON configuration file...', end=' ')
    config = read_json_config(json_config_path)
    print('Done!', flush=True)

    # -------------------------------------------------------------------------
    # Read in INI config file specifying the static_args and variable_args
    # -------------------------------------------------------------------------

    # Build the full path to the waveform params file
    ini_config_name = config['waveform_params_file_name']
    ini_config_path = os.path.join('.', 'config_files', ini_config_name)

    # Read in the variable_arguments and static_arguments
    print('Reading and validating INI configuration file...',
          end=' ', flush=True)
    variable_arguments, static_arguments = read_ini_config(ini_config_path)
    print('Done!\n', flush=True)

    # -------------------------------------------------------------------------
    # Shortcuts and random seed
    # -------------------------------------------------------------------------

    # Set the random seed for this script
    np.random.seed(config['random_seed'])

    # Define some useful shortcuts
    random_seed = config['random_seed']
    max_runtime = config['max_runtime']
    bkg_data_dir = config['background_data_directory']

    # -------------------------------------------------------------------------
    # Construct a generator for sampling waveform parameters
    # -------------------------------------------------------------------------

    # Initialize a waveform parameter generator that can sample injection
    # parameters from the distributions specified in the config file
    waveform_parameter_generator = \
        WaveformParameterGenerator(config_file=ini_config_path,
                                   random_seed=random_seed)

    # Wrap it in a generator expression so that we can easily sample from it
    # by calling next(waveform_parameters)
    waveform_parameters = \
        (waveform_parameter_generator.draw() for _ in iter(int, 1))
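
    # NOTE (illustrative aside, not from the original script): iter(int, 1) is
    # an infinite iterator, because int() always returns 0, which never equals
    # the sentinel value 1. The generator expression above is thus a compact
    # equivalent of a sketch like
    #
    #   def endless_draws(parameter_generator):
    #       while True:
    #           yield parameter_generator.draw()
    #
    # so that next(waveform_parameters) always yields a fresh parameter draw.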
# -------------------------------------------------------------------------
# Construct a generator for sampling valid noise times
# -------------------------------------------------------------------------
# If the 'background_data_directory' is None, we will use synthetic noise
if
config
[
'background_data_directory'
]
is
None
:
print
(
'Using synthetic noise! (background_data_directory = None)
\n
'
,
flush
=
True
)
# Create a iterator that returns a fake "event time", which we will
# use as a seed for the RNG to ensure the reproducibility of the
# generated synthetic noise.
# For the HDF file path that contains that time, we always yield
# None, so that we know that we need to generate synthetic noise.
noise_times
=
((
1000000000
+
_
,
None
)
for
_
in
count
())
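
        # Illustrative aside (not from the original script): the first values
        # drawn from this generator are (1000000000, None), (1000000001, None),
        # (1000000002, None), ..., i.e. monotonically increasing fake GPS
        # times with no HDF file path attached.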

    # Otherwise, we set up a timeline object for the background noise, that
    # is, we read in all HDF files in the raw_data_directory and figure out
    # which parts of it are usable (i.e., have the right data quality and
    # injection bits set as specified in the config file).
    else:

        print('Using real noise from LIGO recordings! '
              '(background_data_directory = {})'.format(bkg_data_dir),
              flush=True)
        print('Reading in raw data. This may take several minutes...',
              end=' ', flush=True)

        # Create a timeline object by running over all HDF files once
        noise_timeline = NoiseTimeline(background_data_directory=bkg_data_dir,
                                       random_seed=random_seed)

        # Create a noise time generator so that we can sample valid noise
        # times simply by calling next(noise_times)
        delta_t = int(static_arguments['noise_interval_width'] / 2)
        noise_times = (noise_timeline.sample(delta_t=delta_t,
                                             dq_bits=config['dq_bits'],
                                             inj_bits=config['inj_bits'],
                                             return_paths=True)
                       for _ in iter(int, 1))

        print('Done!\n')
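
    # NOTE (illustrative aside, not from the original script): both branches
    # above expose the same interface. Each next(noise_times) call yields a
    # 2-tuple whose first element is a GPS event time and whose second element
    # is either None (synthetic noise) or, presumably, the HDF file path(s)
    # covering that time (real noise with return_paths=True). This tuple is
    # passed on as event_tuple by generate_arguments() below.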

    # -------------------------------------------------------------------------
    # Define a convenience function to generate arguments for the simulation
    # -------------------------------------------------------------------------

    def generate_arguments(injection=True):

        # Only sample waveform parameters if we are making an injection
        waveform_params = next(waveform_parameters) if injection else None

        # Return all necessary arguments as a dictionary
        return dict(static_arguments=static_arguments,
                    event_tuple=next(noise_times),
                    waveform_params=waveform_params)
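
    # Illustrative usage (an aside; the pair unpacking mirrors the
    # commented-out result handling further below and is an assumption here):
    #
    #   arguments = generate_arguments(injection=True)
    #   sample, injection_params = generate_sample(**arguments)
    #
    # would generate a single injection sample in the current process.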

    # -------------------------------------------------------------------------
    # Finally: Create our samples!
    # -------------------------------------------------------------------------

    # Keep track of all the samples (and parameters) we have generated
    samples = dict(injection_samples=[], noise_samples=[])
    injection_parameters = dict(injection_samples=[], noise_samples=[])
    print('samples=', samples)

    # The procedure for generating samples with and without injections is
    # mostly the same; the only real difference is which arguments_generator
    # we have to use:
    for sample_type in ('injection_samples', 'noise_samples'):

        # ---------------------------------------------------------------------
        # Define some sample_type-specific shortcuts
        # ---------------------------------------------------------------------

        if sample_type == 'injection_samples':
            print('Generating samples containing an injection...', flush=True)
            n_samples = config['n_injection_samples']
            arguments_generator = \
                (generate_arguments(injection=True) for _ in iter(int, 1))

        else:
            print('Generating samples *not* containing an injection...',
                  flush=True)
            n_samples = config['n_noise_samples']
            arguments_generator = \
                (generate_arguments(injection=False) for _ in iter(int, 1))

        # ---------------------------------------------------------------------
        # If we do not need to generate any samples, skip ahead:
        # ---------------------------------------------------------------------

        if n_samples == 0:
            print('Done! (n_samples=0)\n', flush=True)
            continue

        # ---------------------------------------------------------------------
        # Process results in the results_list (IS THIS BIT NEEDED?)
        # ---------------------------------------------------------------------

        '''
        # Separate the samples and the injection parameters
        samples[sample_type], injection_parameters[sample_type] = \
            zip(*results_list)

        # Sort all results by the event_time
        idx = np.argsort([_['event_time'] for _ in list(samples[sample_type])])
        samples[sample_type] = \
            list([samples[sample_type][i] for i in idx])
        injection_parameters[sample_type] = \
            list([injection_parameters[sample_type][i] for i in idx])

        print('Sample generation completed!\n')
        '''
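
        # NOTE (illustrative sketch, not part of this commit): results_list,
        # which the commented-out block above refers to, is never built in
        # this version of the script. A minimal sequential way to fill it from
        # the arguments_generator defined above might look like
        #
        #   results_list = []
        #   for _ in tqdm(range(n_samples)):
        #       results_list.append(generate_sample(**next(arguments_generator)))
        #
        # whereas the commented-out queue_worker suggests it was originally
        # filled from dedicated worker processes via a shared results_queue
        # (using the Process and Queue classes imported above).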

    # -------------------------------------------------------------------------
    # Compute the normalization parameters for this file
    # -------------------------------------------------------------------------

    print('Computing normalization parameters for sample...',
          end=' ', flush=True)

    # Gather all samples (with and without injection) in one list
    all_samples = list(samples['injection_samples'] + samples['noise_samples'])
    print('all samples=', all_samples)

    # Group all samples by detector
    h1_samples = [_['h1_strain'] for _ in all_samples]
    l1_samples = [_['l1_strain'] for _ in all_samples]

    # Stack recordings along first axis
    h1_samples = np.vstack(h1_samples)
    l1_samples = np.vstack(l1_samples)

    # Compute the mean and standard deviation for both detectors as the median
    # of the means / standard deviations for each sample. This is more robust
    # towards outliers than computing "global" parameters by concatenating all
    # samples and treating them as a single, long time series.
    normalization_parameters = \
        dict(h1_mean=np.median(np.mean(h1_samples, axis=1), axis=0),
             l1_mean=np.median(np.mean(l1_samples, axis=1), axis=0),
             h1_std=np.median(np.std(h1_samples, axis=1), axis=0),
             l1_std=np.median(np.std(l1_samples, axis=1), axis=0))

    print('Done!\n')
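
    # Illustrative aside (hypothetical numbers, not from the original script):
    # if three equally long samples had per-sample means of 1.0, 2.0 and 100.0,
    # the median-of-means above would be 2.0, while the mean of the
    # concatenated series would be about 34.3, dragged up by the outlier.
    # That is the robustness the comment above refers to.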

    # -------------------------------------------------------------------------
    # Create a SampleFile dict from list of samples and save it as an HDF file
    # -------------------------------------------------------------------------

    print('Saving the results to HDF file ...', end=' ', flush=True)

    # Initialize the dictionary that we use to create a SampleFile object
    sample_file_dict = dict(command_line_arguments=command_line_arguments,
                            injection_parameters=dict(),
                            injection_samples=dict(),
                            noise_samples=dict(),
                            normalization_parameters=normalization_parameters,
                            static_arguments=static_arguments)

    # Collect and add samples (with and without injection)
    for sample_type in ('injection_samples', 'noise_samples'):
        for key in ('event_time', 'h1_strain', 'l1_strain'):
            if samples[sample_type]:
                value = np.array([_[key] for _ in list(samples[sample_type])])
            else:
                value = None
            sample_file_dict[sample_type][key] = value

    # Collect and add injection_parameters (ignore noise samples here, because
    # for those, the injection_parameters are always None)
    other_keys = ['h1_signal', 'h1_output_signal', 'h1_snr',
                  'l1_signal', 'l1_output_signal', 'l1_snr',
                  'scale_factor']
    for key in list(variable_arguments + other_keys):
        if injection_parameters['injection_samples']:
            value = np.array([_[key] for _ in
                              injection_parameters['injection_samples']])
        else:
            value = None
        sample_file_dict['injection_parameters'][key] = value

    # Construct the path for the output HDF file
    output_dir = os.path.join('.', 'output')
    if not os.path.exists(output_dir):
        os.mkdir(output_dir)
    sample_file_path = os.path.join(output_dir, config['output_file_name'])

    # Create the SampleFile object and save it to the specified output file
    sample_file = SampleFile(data=sample_file_dict)
    sample_file.to_hdf(file_path=sample_file_path)

    print('Done!', flush=True)

    # Get file size in MB and print the result
    sample_file_size = os.path.getsize(sample_file_path) / 1024**2
    print('Size of resulting HDF file: {:.2f}MB'.format(sample_file_size),
          flush=True)
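
    # Illustrative aside (assumptions: h5py is installed, and the group layout
    # depends on whatever SampleFile.to_hdf writes, which is not specified
    # here): the resulting file could be inspected along the lines of
    #
    #   import h5py
    #   with h5py.File(sample_file_path, 'r') as hdf_file:
    #       print(list(hdf_file.keys()))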

    # print('')

    # -------------------------------------------------------------------------
    # Postliminaries
    # -------------------------------------------------------------------------

    # PyCBC always creates a copy of the waveform parameters file, which we
    # can delete at the end of the sample generation process
    duplicate_path = os.path.join('.', config['waveform_params_file_name'])
    if os.path.exists(duplicate_path):
        os.remove(duplicate_path)

    # Print the total run time
    print('Total runtime: {:.1f} seconds!'.format(time.time() - script_start))
    # print('')