Select Git revision
optimal_setup_functions.py

Gregory Ashton authored
- Provide a function in the BaseSearchClass to read in the ephemeris files - Some clean up of docstrings
optimal_setup_functions.py 7.54 KiB
"""
Provides functions to aid in calculating the optimal setup for zoom follow up
"""
from __future__ import division, absolute_import, print_function
import logging
import numpy as np
import scipy.optimize
import lal
import lalpulsar
import pyfstat.helper_functions as helper_functions
def get_optimal_setup(
NstarMax, Nsegs0, tref, minStartTime, maxStartTime, prior,
detector_names):
""" Using an optimisation step, calculate the optimal setup ladder
Parameters
----------
NstarMax : float
Nsegs0 : int
The number of segments for the initial step of the ladder
minStartTime, maxStartTime : int
GPS times of the start and end time of the search
prior : dict
Prior dictionary, each item must either be a fixed scalar value, or
a uniform prior.
detector_names : list of str
Returns
-------
nsegs, Nstar : list
Ladder of segment numbers and the corresponding Nstar
"""
logging.info('Calculating optimal setup for NstarMax={}, Nsegs0={}'.format(
NstarMax, Nsegs0))
Nstar_0 = get_Nstar_estimate(
Nsegs0, tref, minStartTime, maxStartTime, prior,
detector_names)
logging.info(
'Stage {}, nsegs={}, Nstar={}'.format(0, Nsegs0, int(Nstar_0)))
nsegs_vals = [Nsegs0]
Nstar_vals = [Nstar_0]
i = 0
nsegs_i = Nsegs0
while nsegs_i > 1:
nsegs_i, Nstar_i = _get_nsegs_ip1(
nsegs_i, NstarMax, tref, minStartTime, maxStartTime, prior,
detector_names)
nsegs_vals.append(nsegs_i)
Nstar_vals.append(Nstar_i)
i += 1
logging.info(
'Stage {}, nsegs={}, Nstar={}'.format(i, nsegs_i, int(Nstar_i)))
return nsegs_vals, Nstar_vals
def _get_nsegs_ip1(nsegs_i, NstarMax, tref, minStartTime, maxStartTime, prior,
detector_names):
""" Calculate Nsegs_{i+1} given Nsegs_{i} """
log10NstarMax = np.log10(NstarMax)
log10Nstari = np.log10(get_Nstar_estimate(
nsegs_i, tref, minStartTime, maxStartTime, prior,
detector_names))
def f(nsegs_ip1):
if nsegs_ip1[0] > nsegs_i:
return 1e6
if nsegs_ip1[0] < 0:
return 1e6
nsegs_ip1 = int(nsegs_ip1[0])
if nsegs_ip1 == 0:
nsegs_ip1 = 1
Nstarip1 = get_Nstar_estimate(
nsegs_ip1, tref, minStartTime, maxStartTime, prior, detector_names)
if Nstarip1 is None:
return 1e6
else:
log10Nstarip1 = np.log10(Nstarip1)
return np.abs(log10Nstari + log10NstarMax - log10Nstarip1)
res = scipy.optimize.minimize(f, .4*nsegs_i, method='Powell', tol=1,
options={'maxiter': 10})
logging.info('{} with {} evaluations'.format(res['message'], res['nfev']))
nsegs_ip1 = int(res.x)
if nsegs_ip1 == 0:
nsegs_ip1 = 1
if res.success:
return nsegs_ip1, get_Nstar_estimate(
nsegs_ip1, tref, minStartTime, maxStartTime, prior,
detector_names)
else:
raise ValueError('Optimisation unsuccesful')
def _extract_data_from_prior(prior):
""" Calculate the input data from the prior
Parameters
----------
prior: dict
Returns
-------
p : ndarray
Matrix with columns being the edges of the uniform bounding box
spindowns : int
The number of spindowns
sky : bool
If true, search includes the sky position
fiducial_freq : float
Fidicual frequency
"""
keys = ['Alpha', 'Delta', 'F0', 'F1', 'F2']
spindown_keys = keys[3:]
sky_keys = keys[:2]
lims = []
lims_keys = []
lims_idxs = []
for i, key in enumerate(keys):
if type(prior[key]) == dict:
if prior[key]['type'] == 'unif':
lims.append([prior[key]['lower'], prior[key]['upper']])
lims_keys.append(key)
lims_idxs.append(i)
else:
raise ValueError(
"Prior type {} not yet supported".format(
prior[key]['type']))
elif key not in spindown_keys:
lims.append([prior[key], 0])
lims = np.array(lims)
lims_keys = np.array(lims_keys)
base = lims[:, 0]
p = [base]
for i in lims_idxs:
basex = base.copy()
basex[i] = lims[i, 1]
p.append(basex)
spindowns = np.sum([np.sum(lims_keys == k) for k in spindown_keys])
sky = any([key in lims_keys for key in sky_keys])
if type(prior['F0']) == dict:
fiducial_freq = prior['F0']['upper']
else:
fiducial_freq = prior['F0']
return np.array(p).T, spindowns, sky, fiducial_freq
def get_Nstar_estimate(
nsegs, tref, minStartTime, maxStartTime, prior, detector_names):
""" Returns N* estimated from the super-sky metric
Parameters
----------
nsegs : int
Number of semi-coherent segments
tref : int
Reference time in GPS seconds
minStartTime, maxStartTime : int
Minimum and maximum SFT timestamps
prior : dict
The prior dictionary
detector_names : array
Array of detectors to average over
Returns
-------
Nstar: int
The estimated approximate number of templates to cover the prior
parameter space at a mismatch of unity, assuming the normalised
thickness is unity.
"""
earth_ephem, sun_ephem = helper_functions.get_ephemeris_files()
in_phys, spindowns, sky, fiducial_freq = _extract_data_from_prior(prior)
out_rssky = np.zeros(in_phys.shape)
in_phys = helper_functions.convert_array_to_gsl_matrix(in_phys)
out_rssky = helper_functions.convert_array_to_gsl_matrix(out_rssky)
tboundaries = np.linspace(minStartTime, maxStartTime, nsegs+1)
ref_time = lal.LIGOTimeGPS(tref)
segments = lal.SegListCreate()
for j in range(len(tboundaries)-1):
seg = lal.SegCreate(lal.LIGOTimeGPS(tboundaries[j]),
lal.LIGOTimeGPS(tboundaries[j+1]),
j)
lal.SegListAppend(segments, seg)
detNames = lal.CreateStringVector(*detector_names)
detectors = lalpulsar.MultiLALDetector()
lalpulsar.ParseMultiLALDetector(detectors, detNames)
detector_weights = None
detector_motion = (lalpulsar.DETMOTION_SPIN
+ lalpulsar.DETMOTION_ORBIT)
ephemeris = lalpulsar.InitBarycenter(earth_ephem, sun_ephem)
try:
SSkyMetric = lalpulsar.ComputeSuperskyMetrics(
lalpulsar.SUPERSKY_METRIC_TYPE, spindowns, ref_time, segments,
fiducial_freq, detectors, detector_weights, detector_motion,
ephemeris)
except RuntimeError as e:
logging.warning('Encountered run-time error {}'.format(e))
raise RuntimeError("Calculation of the SSkyMetric failed")
if sky:
i = 0
else:
i = 2
lalpulsar.ConvertPhysicalToSuperskyPoints(
out_rssky, in_phys, SSkyMetric.semi_rssky_transf)
d = out_rssky.data
g = SSkyMetric.semi_rssky_metric.data
d[2:] = d[2:][::-1] # Convert to Alpha, Delta, F0, F1.. ordering
g[2:] = g[2:][::-1] # Convert to Alpha, Delta, F0, F1.. ordering
g = g[i:, i:] # Remove sky if required
parallelepiped = (out_rssky.data[i:, 1:].T - out_rssky.data[i:, 0]).T
Nstars = []
for j in range(1, len(g)+1):
dV = np.abs(np.linalg.det(parallelepiped[:j, :j]))
sqrtdetG = np.sqrt(np.abs(np.linalg.det(g[:j, :j])))
Nstars.append(sqrtdetG * dV)
logging.debug('Nstar for each dimension = {}'.format(
', '.join(["{:1.1e}".format(n) for n in Nstars])))
return np.max(Nstars)