Commit a1cd52bd authored by Gregory Ashton's avatar Gregory Ashton
Browse files

Adds parallelisation of the artifact SFT generation

Since each SFT is generated in a for loop, the process can be quite
slow. This allows the user with `pathos` multiprocessing installed to
leverage `-N` cores (set by command line arg) to speed the process up
Setting N=1 or if pathos is not installed, reverts to the usual
behaviour.
parent 79d89e06
......@@ -56,6 +56,8 @@ $ git clone git@gitlab.aei.uni-hannover.de:GregAshton/PyFstat.git
provides a useful progress bar and estimate of the remaining run-time.
* [bashplotlib](https://github.com/glamp/bashplotlib), if installed, presents
a histogram of the loaded SFT data
* [pathos](https://pypi.python.org/pypi/pathos), if installed, this provides
support for multiprocessing some functions.
For an introduction to installing modules see
[here](https://docs.python.org/3.5/installing/index.html). If you are using
......
......@@ -45,7 +45,10 @@ def set_up_command_line_arguments():
action="store_true")
parser.add_argument("-u", "--use-old-data", action="store_true")
parser.add_argument('-s', "--setup-only", action="store_true")
parser.add_argument('-n', "--no-template-counting", action="store_true")
parser.add_argument("--no-template-counting", action="store_true")
parser.add_argument(
'-N', type=int, default=3, metavar='N',
help="Number of threads to use when running in parallel")
parser.add_argument('unittest_args', nargs='*')
args, unknown = parser.parse_known_args()
sys.argv[1:] = args.unittest_args
......@@ -209,8 +212,7 @@ def run_commandline(cl, log_level=20, raise_error=True):
universal_newlines=True # properly display linebreaks in error/output printing
)
except subprocess.CalledProcessError as e:
logging.error('Execution failed:')
logging.error(e.output)
logging.log(log_level, 'Execution failed: {}'.format(e.output))
if raise_error:
raise
else:
......
......@@ -4,16 +4,21 @@ import numpy as np
import logging
import os
import glob
import pkgutil
import lal
import lalpulsar
from core import BaseSearchClass, tqdm
from core import BaseSearchClass, tqdm, args
import helper_functions
earth_ephem, sun_ephem = helper_functions.set_up_ephemeris_configuration()
class KeyboardInterruptError(Exception):
pass
class Writer(BaseSearchClass):
""" Instance object for generating SFTs """
@helper_functions.initializer
......@@ -443,20 +448,23 @@ class FrequencyModulatedArtifactWriter(Writer):
def get_h0(self, t):
return self.h0
def concatenate_sft_files(self, tmp_outdir):
def concatenate_sft_files(self):
SFTFilename = lalpulsar.OfficialSFTFilename(
self.IFO[0], self.IFO[1], self.nsfts, self.Tsft, self.tstart,
int(self.data_duration), self.label)
# If the file already exists, simply remove it for now (no caching
# implemented)
helper_functions.run_commandline(
'rm {}/{}'.format(self.outdir, SFTFilename), raise_error=False)
'rm {}/{}'.format(self.outdir, SFTFilename), raise_error=False,
log_level=10)
cl_splitSFTS = (
'lalapps_splitSFTs -fs {} -fb {} -fe {} -o {}/{} -i {}/{}_tmp/*sft'
'lalapps_splitSFTs -fs {} -fb {} -fe {} -o {}/{} -i {}/*sft'
.format(self.fmin, self.Band, self.fmin+self.Band, self.outdir,
SFTFilename, self.outdir, self.label))
SFTFilename, self.tmp_outdir))
helper_functions.run_commandline(cl_splitSFTS)
helper_functions.run_commandline('rm {} -r'.format(tmp_outdir))
helper_functions.run_commandline('rm {} -r'.format(self.tmp_outdir))
files = glob.glob('{}/{}*'.format(self.outdir, SFTFilename))
if len(files) == 1:
fn = files[0]
......@@ -468,31 +476,67 @@ class FrequencyModulatedArtifactWriter(Writer):
'Attempted to rename file, but multiple files found: {}'
.format(files))
def pre_compute_evolution(self):
logging.info('Precomputing evolution parameters')
self.lineFreqs = []
self.linePhis = []
self.lineh0s = []
self.mid_times = []
linePhi = 0
lineFreq_old = 0
for i in tqdm(range(self.nsfts)):
mid_time = self.tstart + (i+.5)*self.Tsft
lineFreq = self.get_frequency(mid_time)
self.mid_times.append(mid_time)
self.lineFreqs.append(lineFreq)
self.linePhis.append(linePhi + np.pi*self.Tsft*(lineFreq_old+lineFreq))
self.lineh0s.append(self.get_h0(mid_time))
lineFreq_old = lineFreq
def make_ith_sft(self, i):
try:
self.run_makefakedata_v4(self.mid_times[i], self.lineFreqs[i],
self.linePhis[i], self.lineh0s[i],
self.tmp_outdir)
except KeyboardInterrupt:
raise KeyboardInterruptError()
def make_data(self):
self.maxStartTime = None
self.duration = self.Tsft
linePhi = 0
lineFreq_old = 0
tmp_outdir = '{}/{}_tmp'.format(self.outdir, self.label)
if os.path.isdir(tmp_outdir) is True:
self.tmp_outdir = '{}/{}_tmp'.format(self.outdir, self.label)
if os.path.isdir(self.tmp_outdir) is True:
raise ValueError(
'Temporary directory {} already exists, please rename'.format(
tmp_outdir))
self.tmp_outdir))
else:
os.makedirs(tmp_outdir)
os.makedirs(self.tmp_outdir)
for i in tqdm(range(self.nsfts)):
self.minStartTime = self.tstart + i*self.Tsft
mid_time = self.minStartTime + self.Tsft / 2.0
lineFreq = self.get_frequency(mid_time)
linePhi += np.pi*self.Tsft*(lineFreq_old+lineFreq)
lineh0 = self.get_h0(mid_time)
self.run_makefakedata_v4(mid_time, lineFreq, linePhi, lineh0,
tmp_outdir)
lineFreq_old = lineFreq
self.pre_compute_evolution()
logging.info('Generating SFTs')
if args.N > 1 and pkgutil.find_loader('pathos') is not None:
import pathos.pools
logging.info('Using {} threads'.format(args.N))
try:
with pathos.pools.ProcessPool(args.N) as p:
list(tqdm(p.imap(self.make_ith_sft, range(self.nsfts)),
total=self.nsfts))
except KeyboardInterrupt:
p.terminate()
else:
logging.info(
"No multiprocessing requested or `pathos` not install, cont."
" without multiprocessing")
for i in tqdm(range(self.nsfts)):
self.make_ith_sft(i)
self.concatenate_sft_files(tmp_outdir)
self.concatenate_sft_files()
def run_makefakedata_v4(self, mid_time, lineFreq, linePhi, h0, tmp_outdir):
""" Generate the sft data using the --lineFeature option """
......@@ -502,7 +546,7 @@ class FrequencyModulatedArtifactWriter(Writer):
cl_mfd.append('--outSFTbname="{}"'.format(tmp_outdir))
cl_mfd.append('--IFO={}'.format(self.IFO))
cl_mfd.append('--noiseSqrtSh="{}"'.format(self.sqrtSX))
cl_mfd.append('--startTime={:0.0f}'.format(float(self.minStartTime)))
cl_mfd.append('--startTime={:0.0f}'.format(mid_time-self.Tsft/2.0))
cl_mfd.append('--refTime={:0.0f}'.format(mid_time))
cl_mfd.append('--duration={}'.format(int(self.duration)))
cl_mfd.append('--fmin={:.16g}'.format(self.fmin))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment