helper_functions.py 6.17 KB
Newer Older
Gregory Ashton's avatar
Gregory Ashton committed
1
2
3
4
5
6
7
8
9
"""
Provides helpful functions to facilitate ease-of-use of pyfstat
"""

import os
import sys
import argparse
import logging
import inspect
10
import peakutils
Gregory Ashton's avatar
Gregory Ashton committed
11
from functools import wraps
12
from scipy.stats.distributions import ncx2
Gregory Ashton's avatar
Gregory Ashton committed
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

import matplotlib.pyplot as plt
import numpy as np


def set_up_optional_tqdm():
    """ Returns tqdm when it can be imported, else a pass-through stand-in

    The stand-in accepts the same positional/keyword call pattern but simply
    hands back its first argument unchanged, so wrapped loops still run (just
    without a progress bar).
    """
    try:
        from tqdm import tqdm
        return tqdm
    except ImportError:
        # tqdm is optional: fall through to the no-op replacement below
        pass

    def tqdm(x, *args, **kwargs):
        return x
    return tqdm


def set_up_matplotlib_defaults():
    """ Switches pyplot to the Agg backend and applies the rcParams overrides

    Sets ``text.usetex`` to True and ``axes.formatter.useoffset`` to False.
    """
    plt.switch_backend('Agg')
    overrides = (
        ('text.usetex', True),
        ('axes.formatter.useoffset', False),
    )
    for key, value in overrides:
        plt.rcParams[key] = value


def set_up_command_line_arguments():
    """ Parses the command line and configures logging and the progress bar

    Recognised options are consumed; the remaining positional arguments
    (collected as ``unittest_args``) are written back to ``sys.argv[1:]``.
    A new ``StreamHandler`` is attached to the root logger on every call.

    Returns
    -------
    args: argparse.Namespace
        The parsed options. Note the quiet flag is spelled ``--quite`` and is
        available as ``args.quite``.
    tqdm: callable
        The result of ``set_up_optional_tqdm()``, or a pass-through function
        returning its first argument when ``--quite`` or ``--no-interactive``
        was given.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-q", "--quite", help="Decrease output verbosity",
                        action="store_true")
    parser.add_argument("-v", "--verbose", help="Increase output verbosity",
                        action="store_true")
    parser.add_argument("--no-interactive", help="Don't use interactive",
                        action="store_true")
    parser.add_argument("-c", "--clean", help="Don't use cached data",
                        action="store_true")
    parser.add_argument("-u", "--use-old-data", action="store_true")
    parser.add_argument('-s', "--setup-only", action="store_true")
    parser.add_argument('-n', "--no-template-counting", action="store_true")
    parser.add_argument('unittest_args', nargs='*')
    # Unrecognised options are tolerated and discarded
    args, _ = parser.parse_known_args()
    sys.argv[1:] = args.unittest_args

    if args.quite or args.no_interactive:
        def tqdm(x, *a, **kw):
            return x
    else:
        tqdm = set_up_optional_tqdm()

    # --quite takes precedence over --verbose
    if args.quite:
        handler_level = logging.WARNING
    elif args.verbose:
        handler_level = logging.DEBUG
    else:
        handler_level = logging.INFO

    logger = logging.getLogger()
    # A logger drops records below its own level before any handler sees
    # them, so it must be at least as permissive as the handler; with a fixed
    # INFO level --verbose would never show DEBUG messages.
    logger.setLevel(min(handler_level, logging.INFO))

    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(handler_level)
    stream_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(levelname)-8s: %(message)s', datefmt='%H:%M'))
    logger.addHandler(stream_handler)
    return args, tqdm
Gregory Ashton's avatar
Gregory Ashton committed
67
68
69


def set_up_ephemeris_configuration():
    """ Returns the earth_ephem and sun_ephem read from ~/.pyfstat.conf

    The file is read as ``key = value`` lines. Blank lines, lines starting
    with ``#`` and lines without an ``=`` are ignored; only the first ``=``
    separates key from value. Spaces are removed from keys; spaces, quote
    characters and newlines are removed from values.

    Returns
    -------
    earth_ephem, sun_ephem: str or None
        The configured values, or ``(None, None)`` (after logging a warning)
        when the configuration file does not exist.

    Raises
    ------
    KeyError
        If the file exists but does not define both ``earth_ephem`` and
        ``sun_ephem``.
    """
    config_file = os.path.join(os.path.expanduser('~'), '.pyfstat.conf')
    if not os.path.isfile(config_file):
        logging.warning('No ~/.pyfstat.conf file found please provide the '
                        'paths when initialising searches')
        return None, None

    settings = {}
    with open(config_file, 'r') as f:
        for line in f:
            stripped = line.strip()
            # Skip anything that is not an assignment rather than failing to
            # unpack it (e.g. a trailing empty line or a comment)
            if (not stripped or stripped.startswith('#')
                    or '=' not in stripped):
                continue
            # partition splits on the first '=' only, so values may contain '='
            key, _, value = line.partition('=')
            key = key.replace(' ', '')
            for item in (' ', "'", '"', '\n'):
                value = value.replace(item, '')
            settings[key] = value
    return settings['earth_ephem'], settings['sun_ephem']


def round_to_n(x, n):
    """ Rounds x to n significant figures; any falsy x gives 0 """
    if x:
        # Decimal exponent of the leading digit of x
        exponent = int(np.floor(np.log10(abs(x))))
        # Scale so that the n-th significant digit sits in the units place
        scale = 10 ** ((n - 1) - exponent)
        return round(x * scale) / scale
    return 0


def texify_float(x, d=2):
    """ Formats a number for use in LaTeX labels

    Parameters
    ----------
    x: number or str
        Value to format. Strings (including str subclasses such as
        ``numpy.str_``) are returned unchanged.
    d: int
        Number of significant figures passed to ``round_to_n``, and number of
        decimals kept in the stem of the scientific notation.

    Returns
    -------
    The integer ``0`` when ``x == 0`` (kept for backward compatibility), the
    input itself when it is a string, ``str(x)`` of the rounded value when
    ``0.01 < abs(x) < 100``, and otherwise a string of the form
    ``$stem{\\times}10^{power}$``.
    """
    # isinstance (rather than an exact type comparison) also lets str
    # subclasses through instead of sending them into the numeric code below
    if isinstance(x, str):
        return x
    if x == 0:
        return 0
    x = round_to_n(x, d)
    if 0.01 < abs(x) < 100:
        return str(x)
    power = int(np.floor(np.log10(abs(x))))
    stem = np.round(x / 10**power, d)
    if d == 1:
        # With one significant figure, drop the trailing ".0" from the stem
        stem = int(stem)
    return r'${}{{\times}}10^{{{}}}$'.format(stem, power)


def initializer(func):
    """ Decorator function to automatically assign the parameters to self

    Before calling the wrapped method, every positional and keyword argument
    is stored on ``self`` under its parameter name. Parameters with a default
    value which are not yet attributes of ``self`` are then set to that
    default.

    Parameters
    ----------
    func: function
        The method to wrap (typically ``__init__``); its first parameter is
        taken to be ``self``.

    Returns
    -------
    wrapper: function
        The wrapped method; it returns whatever ``func`` returns.
    """
    # inspect.getargspec was removed in Python 3.11; fall back to it only
    # where getfullargspec does not exist
    get_spec = getattr(inspect, 'getfullargspec', None)
    if get_spec is None:
        get_spec = inspect.getargspec
    spec = get_spec(func)
    names = spec.args
    # defaults is None (not an empty tuple) when func defines no defaults
    defaults = spec.defaults or ()

    @wraps(func)
    def wrapper(self, *args, **kargs):
        # names[0] is self, so positional values pair up with names[1:]
        for name, arg in list(zip(names[1:], args)) + list(kargs.items()):
            setattr(self, name, arg)

        # Defaults belong to the trailing parameters, hence the reversed zip
        for name, default in zip(reversed(names), reversed(defaults)):
            if not hasattr(self, name):
                setattr(self, name, default)

        return func(self, *args, **kargs)

    return wrapper

132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154

def get_peak_values(frequencies, twoF, threshold_2F, F0=None, F0range=None):
    """ Finds the peaks of twoF and returns their frequencies and values

    Parameters
    ----------
    frequencies, twoF: array
        Frequency grid and the values at each grid point; both are indexed
        with the same boolean mask and peak indices.
    threshold_2F: float
        Divided by ``max(twoF)`` and passed to ``peakutils.indexes`` as
        ``thres``.
    F0, F0range: float, optional
        If F0 is given, only samples with ``|frequencies - F0| < F0range``
        are searched.

    Returns
    -------
    F0maxs, twoFmaxs, freq_err: array
        Frequencies and twoF values at the peaks, and an array of the same
        length filled with the spacing of the first two grid points.

    Raises
    ------
    ValueError
        If F0 is given without F0range.
    """
    # Compare against None explicitly so that F0 = 0 is not mistaken for
    # "no cut requested"
    if F0 is not None:
        if F0range is None:
            raise ValueError('F0range must be provided together with F0')
        cut_idxs = np.abs(frequencies - F0) < F0range
        frequencies = frequencies[cut_idxs]
        twoF = twoF[cut_idxs]
    # NOTE(review): peakutils appears to interpret `thres` relative to the
    # (max - min) range of the data, so this equals an absolute cut at
    # threshold_2F only if min(twoF) is 0 - confirm against the installed
    # peakutils version
    idxs = peakutils.indexes(twoF, thres=1.*threshold_2F/np.max(twoF))
    F0maxs = frequencies[idxs]
    twoFmaxs = twoF[idxs]
    freq_err = frequencies[1] - frequencies[0]
    return F0maxs, twoFmaxs, freq_err*np.ones(len(idxs))


def get_comb_values(F0, frequencies, twoF, period, N=4):
    """ Samples twoF on a comb of 2N+1 teeth spaced by 1/period around F0

    Parameters
    ----------
    F0: float
        Centre of the comb.
    frequencies, twoF: array
        Frequency grid and the values at each grid point.
    period: float or str
        Spacing period; 'sidereal' and 'terrestrial' select built-in values.
    N: int
        Number of teeth on each side of F0.

    Returns
    -------
    comb_frequencies: list
        Offsets n/period for n = -N..N (relative to F0).
    twoF_comb: array
        twoF at the grid point closest to each F0 + offset.
    freq_err: array
        Spacing of the first two grid points, repeated once per tooth.
    """
    named_periods = (
        ('sidereal', 23*60*60 + 56*60 + 4.0616),
        ('terrestrial', 86400),
    )
    for label, seconds in named_periods:
        if period == label:
            period = seconds
            break

    offsets = [k / period for k in range(-N, N + 1)]
    nearest = []
    for offset in offsets:
        # Index of the grid point closest to F0 + offset
        nearest.append(np.argmin(np.abs(frequencies - F0 - offset)))
    spacing = frequencies[1] - frequencies[0]
    return offsets, twoF[nearest], spacing * np.ones(len(nearest))

155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185

def compute_P_twoFstarcheck(twoFstarcheck, twoFcheck, M0, plot=False):
    """ Returns the unnormalised pdf of twoFstarcheck given twoFcheck

    The product of two non-central chi-squared pdfs (4*M0 degrees of freedom
    evaluated at twoFstarcheck, and 4 degrees of freedom evaluated at
    twoFcheck) is integrated over their shared non-centrality parameter with
    the trapezoidal rule on a fixed 500-point grid.

    Parameters
    ----------
    twoFstarcheck, twoFcheck: float
        Points at which the two pdfs are evaluated.
    M0: int
        Sets the degrees of freedom (4*M0) of the first pdf.
    plot: bool
        If True, save a plot of the integrand to the file 'test'.

    Returns
    -------
    float
        The value of the integral.
    """
    # np.trapz is a deprecated alias of np.trapezoid in NumPy 2.x; older
    # NumPy versions only provide np.trapz
    integrate = getattr(np, 'trapezoid', None) or np.trapz
    upper = 4+twoFstarcheck + 0.5*(2*(4*M0+2*twoFcheck))
    rho2starcheck = np.linspace(1e-1, upper, 500)
    integrand = (ncx2.pdf(twoFstarcheck, 4*M0, rho2starcheck)
                 * ncx2.pdf(twoFcheck, 4, rho2starcheck))
    if plot:
        fig, ax = plt.subplots()
        ax.plot(rho2starcheck, integrand)
        fig.savefig('test')
        # Release the figure so that repeated calls do not accumulate them
        plt.close(fig)
    return integrate(integrand, rho2starcheck)


def compute_pstar(twoFcheck_obs, twoFstarcheck_obs, m0, plot=False):
    """ Returns 2*min(p, 1-p) for the observed twoFstarcheck

    ``compute_P_twoFstarcheck`` is evaluated on a fixed 500-point grid of
    twoFstarcheck values (with M0 = 2*m0 + 1) and normalised by its integral
    over that grid. p is the integral of the normalised curve up to the grid
    point closest to twoFstarcheck_obs.

    Parameters
    ----------
    twoFcheck_obs, twoFstarcheck_obs: float
        The observed values.
    m0: int
        Used to form M0 = 2*m0 + 1.
    plot: bool
        If True, save a plot of the curve, with the integrated region shaded,
        to the file 'test'.

    Returns
    -------
    float
        Twice the smaller of p and 1 - p.
    """
    # np.trapz is a deprecated alias of np.trapezoid in NumPy 2.x; older
    # NumPy versions only provide np.trapz
    integrate = getattr(np, 'trapezoid', None) or np.trapz
    M0 = 2*m0 + 1
    upper = 4+twoFcheck_obs + (2*(4*M0+2*twoFcheck_obs))
    twoFstarcheck_vals = np.linspace(1e-1, upper, 500)
    P_twoFstarcheck = np.array(
        [compute_P_twoFstarcheck(twoFstarcheck, twoFcheck_obs, M0)
         for twoFstarcheck in twoFstarcheck_vals])
    # Normalisation constant of the curve over the grid
    C = integrate(P_twoFstarcheck, twoFstarcheck_vals)
    # Grid point closest to the observed value
    idx = np.argmin(np.abs(twoFstarcheck_vals - twoFstarcheck_obs))
    if plot:
        fig, ax = plt.subplots()
        ax.plot(twoFstarcheck_vals, P_twoFstarcheck)
        ax.fill_between(twoFstarcheck_vals[:idx+1], 0, P_twoFstarcheck[:idx+1])
        ax.axvline(twoFstarcheck_vals[idx])
        fig.savefig('test')
        # Release the figure so that repeated calls do not accumulate them
        plt.close(fig)
    pstar_l = integrate(P_twoFstarcheck[:idx+1]/C, twoFstarcheck_vals[:idx+1])
    return 2*np.min([pstar_l, 1-pstar_l])