tests.py 15 KB
Newer Older
1
2
3
import unittest
import numpy as np
import os
Gregory Ashton's avatar
Gregory Ashton committed
4
5
import shutil
import pyfstat
6
import lalpulsar
7
import logging
Gregory Ashton's avatar
Gregory Ashton committed
8

9

Gregory Ashton's avatar
Gregory Ashton committed
10
class Test(unittest.TestCase):
Gregory Ashton's avatar
Gregory Ashton committed
11
12
    outdir = 'TestData'

Gregory Ashton's avatar
Gregory Ashton committed
13
    @classmethod
Gregory Ashton's avatar
Gregory Ashton committed
14
15
    def setUpClass(self):
        if os.path.isdir(self.outdir):
16
17
18
19
20
            try:
                shutil.rmtree(self.outdir)
            except OSError:
                logging.warning(
                    "{} not removed prior to tests".format(self.outdir))
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
        h0 = 1
        sqrtSX = 1
        F0 = 30
        F1 = -1e-10
        F2 = 0
        minStartTime = 700000000
        duration = 2 * 86400
        Alpha = 5e-3
        Delta = 1.2
        tref = minStartTime
        Writer = pyfstat.Writer(F0=F0, F1=F1, F2=F2, label='test',
                                h0=h0, sqrtSX=sqrtSX,
                                outdir=self.outdir, tstart=minStartTime,
                                Alpha=Alpha, Delta=Delta, tref=tref,
                                duration=duration,
                                Band=4)
        Writer.make_data()
        self.sftfilepath = Writer.sftfilepath
        self.minStartTime = minStartTime
        self.maxStartTime = minStartTime + duration
        self.duration = duration
42

Gregory Ashton's avatar
Gregory Ashton committed
43
    @classmethod
Gregory Ashton's avatar
Gregory Ashton committed
44
45
    def tearDownClass(self):
        if os.path.isdir(self.outdir):
46
47
48
49
50
            try:
                shutil.rmtree(self.outdir)
            except OSError:
                logging.warning(
                    "{} not removed prior to tests".format(self.outdir))
Gregory Ashton's avatar
Gregory Ashton committed
51
52


53
class Writer(Test):
Gregory Ashton's avatar
Gregory Ashton committed
54
    label = "TestWriter"
55
56

    def test_make_cff(self):
Gregory Ashton's avatar
Gregory Ashton committed
57
        Writer = pyfstat.Writer(self.label, outdir=self.outdir)
58
        Writer.make_cff()
59
60
        self.assertTrue(os.path.isfile(
            './{}/{}.cff'.format(self.outdir, self.label)))
61
62

    def test_run_makefakedata(self):
63
        Writer = pyfstat.Writer(self.label, outdir=self.outdir, duration=3600)
64
65
66
        Writer.make_cff()
        Writer.run_makefakedata()
        self.assertTrue(os.path.isfile(
67
            './{}/H-2_H1_1800SFT_TestWriter-700000000-3600.sft'
68
            .format(self.outdir)))
69
70

    def test_makefakedata_usecached(self):
71
        Writer = pyfstat.Writer(self.label, outdir=self.outdir, duration=3600)
72
73
        if os.path.isfile(Writer.sftfilepath):
            os.remove(Writer.sftfilepath)
Gregory Ashton's avatar
Gregory Ashton committed
74
        Writer.make_cff()
75
        Writer.run_makefakedata()
76
        time_first = os.path.getmtime(Writer.sftfilepath)
77
        Writer.run_makefakedata()
78
        time_second = os.path.getmtime(Writer.sftfilepath)
79
80
81
        self.assertTrue(time_first == time_second)
        os.system('touch {}'.format(Writer.config_file_name))
        Writer.run_makefakedata()
82
        time_third = os.path.getmtime(Writer.sftfilepath)
83
84
85
        self.assertFalse(time_first == time_third)


86
class Bunch(Test):
Gregory Ashton's avatar
Gregory Ashton committed
87
88
89
90
91
    def test_bunch(self):
        b = pyfstat.core.Bunch(dict(x=10))
        self.assertTrue(b.x == 10)


92
class par(Test):
Gregory Ashton's avatar
Gregory Ashton committed
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
    label = 'TestPar'

    def test(self):
        os.system('mkdir {}'.format(self.outdir))
        os.system(
            'echo "x=100\ny=10" > {}/{}.par'.format(self.outdir, self.label))

        par = pyfstat.core.read_par(
            '{}/{}.par'.format(self.outdir, self.label), return_type='Bunch')
        self.assertTrue(par.x == 100)
        self.assertTrue(par.y == 10)

        par = pyfstat.core.read_par(outdir=self.outdir, label=self.label,
                                    return_type='dict')
        self.assertTrue(par['x'] == 100)
        self.assertTrue(par['y'] == 10)
109
        os.system('rm -r {}'.format(self.outdir))
Gregory Ashton's avatar
Gregory Ashton committed
110
111


112
class BaseSearchClass(Test):
113
114
115
    def test_shift_matrix(self):
        BSC = pyfstat.BaseSearchClass()
        dT = 10
116
        a = BSC._shift_matrix(4, dT)
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
        b = np.array([[1, 2*np.pi*dT, 2*np.pi*dT**2/2.0, 2*np.pi*dT**3/6.0],
                      [0, 1, dT, dT**2/2.0],
                      [0, 0, 1, dT],
                      [0, 0, 0, 1]])
        self.assertTrue(np.array_equal(a, b))

    def test_shift_coefficients(self):
        BSC = pyfstat.BaseSearchClass()
        thetaA = np.array([10., 1e2, 10., 1e2])
        dT = 100

        # Calculate the 'long' way
        thetaB = np.zeros(len(thetaA))
        thetaB[3] = thetaA[3]
        thetaB[2] = thetaA[2] + thetaA[3]*dT
        thetaB[1] = thetaA[1] + thetaA[2]*dT + .5*thetaA[3]*dT**2
        thetaB[0] = thetaA[0] + 2*np.pi*(thetaA[1]*dT + .5*thetaA[2]*dT**2
                                         + thetaA[3]*dT**3 / 6.0)

        self.assertTrue(
            np.array_equal(
138
                thetaB, BSC._shift_coefficients(thetaA, dT)))
139
140
141
142
143

    def test_shift_coefficients_loop(self):
        BSC = pyfstat.BaseSearchClass()
        thetaA = np.array([10., 1e2, 10., 1e2])
        dT = 1e1
144
        thetaB = BSC._shift_coefficients(thetaA, dT)
145
146
        self.assertTrue(
            np.allclose(
147
                thetaA, BSC._shift_coefficients(thetaB, -dT),
148
149
150
                rtol=1e-9, atol=1e-9))


151
class ComputeFstat(Test):
Gregory Ashton's avatar
Gregory Ashton committed
152
    label = "TestComputeFstat"
153
154

    def test_run_computefstatistic_single_point(self):
Gregory Ashton's avatar
Gregory Ashton committed
155
        Writer = pyfstat.Writer(self.label, outdir=self.outdir, duration=86400,
156
                                h0=1, sqrtSX=1, detectors='H1,L1')
157
158
159
        Writer.make_data()
        predicted_FS = Writer.predict_fstat()

160
161
162
163
164
165
        search_H1L1 = pyfstat.ComputeFstat(
            tref=Writer.tref,
            sftfilepattern='{}/*{}*sft'.format(Writer.outdir, Writer.label))
        FS = search_H1L1.get_fullycoherent_twoF(
            Writer.tstart, Writer.tend, Writer.F0, Writer.F1, Writer.F2,
            Writer.Alpha, Writer.Delta)
Gregory Ashton's avatar
Gregory Ashton committed
166
        self.assertTrue(np.abs(predicted_FS-FS)/FS < 0.3)
167
168
169

        Writer.detectors = 'H1'
        predicted_FS = Writer.predict_fstat()
170
171
172
173
174
175
176
        search_H1 = pyfstat.ComputeFstat(
            tref=Writer.tref, detectors='H1',
            sftfilepattern='{}/*{}*sft'.format(Writer.outdir, Writer.label),
            SSBprec=lalpulsar.SSBPREC_RELATIVISTIC)
        FS = search_H1.get_fullycoherent_twoF(
            Writer.tstart, Writer.tend, Writer.F0, Writer.F1, Writer.F2,
            Writer.Alpha, Writer.Delta)
Gregory Ashton's avatar
Gregory Ashton committed
177
        self.assertTrue(np.abs(predicted_FS-FS)/FS < 0.3)
178

179
    def run_computefstatistic_single_point_no_noise(self):
180
181
182
        Writer = pyfstat.Writer(
            self.label, outdir=self.outdir, add_noise=False, duration=86400,
            h0=1, sqrtSX=1)
183
184
185
186
187
        Writer.make_data()
        predicted_FS = Writer.predict_fstat()

        search = pyfstat.ComputeFstat(
            tref=Writer.tref, assumeSqrtSX=1,
188
            sftfilepattern='{}/*{}*sft'.format(Writer.outdir, Writer.label))
189
190
191
        FS = search.get_fullycoherent_twoF(
            Writer.tstart, Writer.tend, Writer.F0, Writer.F1, Writer.F2,
            Writer.Alpha, Writer.Delta)
192
        self.assertTrue(np.abs(predicted_FS-FS)/FS < 0.3)
Gregory Ashton's avatar
Gregory Ashton committed
193

194
    def test_injectSources(self):
195
196
197
198
        # This seems to be writing with a signal...
        Writer = pyfstat.Writer(
            self.label, outdir=self.outdir, add_noise=False, duration=86400,
            h0=1, sqrtSX=1)
199
200
201
202
203
204
205
206
        Writer.make_cff()
        injectSources = Writer.config_file_name

        search = pyfstat.ComputeFstat(
            tref=Writer.tref, assumeSqrtSX=1, injectSources=injectSources,
            minCoverFreq=28, maxCoverFreq=32, minStartTime=Writer.tstart,
            maxStartTime=Writer.tstart+Writer.duration,
            detectors=Writer.detectors)
207
        FS_from_file = search.get_fullycoherent_twoF(
208
209
            Writer.tstart, Writer.tend, Writer.F0, Writer.F1, Writer.F2,
            Writer.Alpha, Writer.Delta)
210
211
        Writer.make_data()
        predicted_FS = Writer.predict_fstat()
Gregory Ashton's avatar
Gregory Ashton committed
212
        self.assertTrue(np.abs(predicted_FS-FS_from_file)/FS_from_file < 0.3)
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228

        injectSourcesdict = pyfstat.core.read_par(Writer.config_file_name)
        injectSourcesdict['F0'] = injectSourcesdict['Freq']
        injectSourcesdict['F1'] = injectSourcesdict['f1dot']
        injectSourcesdict['F2'] = injectSourcesdict['f2dot']
        search = pyfstat.ComputeFstat(
            tref=Writer.tref, assumeSqrtSX=1, injectSources=injectSourcesdict,
            minCoverFreq=28, maxCoverFreq=32, minStartTime=Writer.tstart,
            maxStartTime=Writer.tstart+Writer.duration,
            detectors=Writer.detectors)
        FS_from_dict = search.get_fullycoherent_twoF(
            Writer.tstart, Writer.tend, Writer.F0, Writer.F1, Writer.F2,
            Writer.Alpha, Writer.Delta)
        self.assertTrue(FS_from_dict == FS_from_file)


229
class SemiCoherentSearch(Test):
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
    label = "TestSemiCoherentSearch"

    def test_get_semicoherent_twoF(self):
        duration = 10*86400
        Writer = pyfstat.Writer(
            self.label, outdir=self.outdir, duration=duration, h0=1, sqrtSX=1)
        Writer.make_data()

        search = pyfstat.SemiCoherentSearch(
            label=self.label, outdir=self.outdir, nsegs=2,
            sftfilepattern='{}/*{}*sft'.format(Writer.outdir, Writer.label),
            tref=Writer.tref, minStartTime=Writer.tstart,
            maxStartTime=Writer.tend)

        search.get_semicoherent_twoF(
            Writer.F0, Writer.F1, Writer.F2, Writer.Alpha, Writer.Delta,
            record_segments=True)

        # Compute the predicted semi-coherent Fstat
        minStartTime = Writer.tstart
        maxStartTime = Writer.tend

        Writer.maxStartTime = minStartTime + duration / 2.0
        FSA = Writer.predict_fstat()

        Writer.tstart = minStartTime + duration / 2.0
        Writer.tend = maxStartTime
        FSB = Writer.predict_fstat()

        FSs = np.array([FSA, FSB])
        diffs = (np.array(search.detStat_per_segment) - FSs) / FSs
        self.assertTrue(np.all(diffs < 0.3))

    def test_get_semicoherent_BSGL(self):
        duration = 10*86400
        Writer = pyfstat.Writer(
            self.label, outdir=self.outdir, duration=duration,
            detectors='H1,L1')
        Writer.make_data()

        search = pyfstat.SemiCoherentSearch(
            label=self.label, outdir=self.outdir, nsegs=2,
            sftfilepattern='{}/*{}*sft'.format(Writer.outdir, Writer.label),
            tref=Writer.tref, minStartTime=Writer.tstart,
            maxStartTime=Writer.tend, BSGL=True)

        BSGL = search.get_semicoherent_twoF(
            Writer.F0, Writer.F1, Writer.F2, Writer.Alpha, Writer.Delta,
            record_segments=True)
        self.assertTrue(BSGL > 0)
280
281


282
class SemiCoherentGlitchSearch(Test):
Gregory Ashton's avatar
Gregory Ashton committed
283
    label = "TestSemiCoherentGlitchSearch"
Gregory Ashton's avatar
Gregory Ashton committed
284

285
    def test_get_semicoherent_nglitch_twoF(self):
Gregory Ashton's avatar
Gregory Ashton committed
286
287
        duration = 10*86400
        dtglitch = .5*duration
288
        delta_F0 = 0
Gregory Ashton's avatar
Gregory Ashton committed
289
290
        h0 = 1
        sqrtSX = 1
291
        Writer = pyfstat.GlitchWriter(
Gregory Ashton's avatar
Gregory Ashton committed
292
            self.label, outdir=self.outdir, duration=duration, dtglitch=dtglitch,
Gregory Ashton's avatar
Gregory Ashton committed
293
            delta_F0=delta_F0, sqrtSX=sqrtSX, h0=h0)
294
295
296
297

        Writer.make_data()

        search = pyfstat.SemiCoherentGlitchSearch(
Gregory Ashton's avatar
Gregory Ashton committed
298
            label=self.label, outdir=self.outdir,
299
            sftfilepattern='{}/*{}*sft'.format(Writer.outdir, Writer.label),
300
301
            tref=Writer.tref, minStartTime=Writer.tstart,
            maxStartTime=Writer.tend, nglitch=1)
302

303
304
305
        FS = search.get_semicoherent_nglitch_twoF(
            Writer.F0, Writer.F1, Writer.F2, Writer.Alpha, Writer.Delta,
            Writer.delta_F0, Writer.delta_F1, search.minStartTime+dtglitch)
306
307

        # Compute the predicted semi-coherent glitch Fstat
308
309
        minStartTime = Writer.tstart
        maxStartTime = Writer.tend
310

311
        Writer.maxStartTime = minStartTime + dtglitch
312
313
        FSA = Writer.predict_fstat()

314
315
        Writer.tstart = minStartTime + dtglitch
        Writer.tend = maxStartTime
316
317
        FSB = Writer.predict_fstat()

318
        print FSA, FSB
319
320
321
        predicted_FS = (FSA + FSB)

        print(predicted_FS, FS)
Gregory Ashton's avatar
Gregory Ashton committed
322
        self.assertTrue(np.abs((FS - predicted_FS))/predicted_FS < 0.3)
323
324


325
class MCMCSearch(Test):
Gregory Ashton's avatar
Gregory Ashton committed
326
    label = "TestMCMCSearch"
327
328

    def test_fully_coherent(self):
Gregory Ashton's avatar
Gregory Ashton committed
329
330
        h0 = 1
        sqrtSX = 1
331
332
333
        F0 = 30
        F1 = -1e-10
        F2 = 0
334
        minStartTime = 700000000
Gregory Ashton's avatar
Gregory Ashton committed
335
        duration = 1 * 86400
336
        maxStartTime = minStartTime + duration
337
338
        Alpha = 5e-3
        Delta = 1.2
339
        tref = minStartTime
340
341
        Writer = pyfstat.Writer(F0=F0, F1=F1, F2=F2, label=self.label,
                                h0=h0, sqrtSX=sqrtSX,
Gregory Ashton's avatar
Gregory Ashton committed
342
                                outdir=self.outdir, tstart=minStartTime,
343
                                Alpha=Alpha, Delta=Delta, tref=tref,
344
                                duration=duration,
345
                                Band=4)
346
347
348
349

        Writer.make_data()
        predicted_FS = Writer.predict_fstat()

Gregory Ashton's avatar
Gregory Ashton committed
350
351
        theta = {'F0': {'type': 'norm', 'loc': F0, 'scale': np.abs(1e-10*F0)},
                 'F1': {'type': 'norm', 'loc': F1, 'scale': np.abs(1e-10*F1)},
352
353
                 'F2': F2, 'Alpha': Alpha, 'Delta': Delta}

Gregory Ashton's avatar
Gregory Ashton committed
354
        search = pyfstat.MCMCSearch(
Gregory Ashton's avatar
Gregory Ashton committed
355
            label=self.label, outdir=self.outdir, theta_prior=theta, tref=tref,
356
            sftfilepattern='{}/*{}*sft'.format(Writer.outdir, Writer.label),
357
            minStartTime=minStartTime, maxStartTime=maxStartTime,
358
            nsteps=[100, 100], nwalkers=100, ntemps=2, log10beta_min=-1)
359
        search.run(create_plots=False)
360
361
362
363
        _, FS = search.get_max_twoF()

        print('Predicted twoF is {} while recovered is {}'.format(
                predicted_FS, FS))
Gregory Ashton's avatar
Gregory Ashton committed
364
365
        self.assertTrue(
            FS > predicted_FS or np.abs((FS-predicted_FS))/predicted_FS < 0.3)
366
367


368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
class GridSearch(Test):
    F0s = [29, 31, 0.1]
    F1s = [-1e-10, 0, 1e-11]
    tref = 700000000

    def test_grid_search(self):
        search = pyfstat.GridSearch(
            'grid_search', self.outdir, self.sftfilepath, F0s=self.F0s,
            F1s=[0], F2s=[0], Alphas=[0], Deltas=[0], tref=self.tref)
        search.run()
        self.assertTrue(os.path.isfile(search.out_file))

    def test_semicoherent_grid_search(self):
        search = pyfstat.GridSearch(
            'sc_grid_search', self.outdir, self.sftfilepath, F0s=self.F0s,
            F1s=[0], F2s=[0], Alphas=[0], Deltas=[0], tref=self.tref, nsegs=2)
        search.run()
        self.assertTrue(os.path.isfile(search.out_file))

    def test_slice_grid_search(self):
        search = pyfstat.SliceGridSearch(
            'slice_grid_search', self.outdir, self.sftfilepath, F0s=self.F0s,
            F1s=self.F1s, F2s=[0], Alphas=[0], Deltas=[0], tref=self.tref,
            Lambda0=[30, 0, 0, 0])
        search.run()
        self.assertTrue(os.path.isfile('{}/{}_slice_projection.png'
                                       .format(search.outdir, search.label)))

    def test_glitch_grid_search(self):
        search = pyfstat.GridGlitchSearch(
            'grid_grid_search', self.outdir, self.sftfilepath, F0s=self.F0s,
            F1s=self.F1s, F2s=[0], Alphas=[0], Deltas=[0], tref=self.tref,
            tglitchs=[self.tref])
        search.run()
        self.assertTrue(os.path.isfile(search.out_file))

    def test_sliding_window(self):
        search = pyfstat.FrequencySlidingWindow(
            'grid_grid_search', self.outdir, self.sftfilepath, F0s=self.F0s,
            F1=0, F2=0, Alpha=0, Delta=0, tref=self.tref,
            minStartTime=self.minStartTime, maxStartTime=self.maxStartTime)
        search.run()
        self.assertTrue(os.path.isfile(search.out_file))

412
413
if __name__ == '__main__':
    unittest.main()