tests.py 16.8 KB
Newer Older
1
2
3
import unittest
import numpy as np
import os
Gregory Ashton's avatar
Gregory Ashton committed
4
5
import shutil
import pyfstat
6
import lalpulsar
7
import logging
8
import time
9

10

Gregory Ashton's avatar
Gregory Ashton committed
11
class Test(unittest.TestCase):
12
    outdir = "TestData"
Gregory Ashton's avatar
Gregory Ashton committed
13

Gregory Ashton's avatar
Gregory Ashton committed
14
    @classmethod
Gregory Ashton's avatar
Gregory Ashton committed
15
16
    def setUpClass(self):
        if os.path.isdir(self.outdir):
17
18
19
            try:
                shutil.rmtree(self.outdir)
            except OSError:
20
                logging.warning("{} not removed prior to tests".format(self.outdir))
21
22
23
24
25
26
27
28
29
30
        h0 = 1
        sqrtSX = 1
        F0 = 30
        F1 = -1e-10
        F2 = 0
        minStartTime = 700000000
        duration = 2 * 86400
        Alpha = 5e-3
        Delta = 1.2
        tref = minStartTime
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
        Writer = pyfstat.Writer(
            F0=F0,
            F1=F1,
            F2=F2,
            label="test",
            h0=h0,
            sqrtSX=sqrtSX,
            outdir=self.outdir,
            tstart=minStartTime,
            Alpha=Alpha,
            Delta=Delta,
            tref=tref,
            duration=duration,
            Band=4,
        )
46
47
48
49
50
        Writer.make_data()
        self.sftfilepath = Writer.sftfilepath
        self.minStartTime = minStartTime
        self.maxStartTime = minStartTime + duration
        self.duration = duration
51

Gregory Ashton's avatar
Gregory Ashton committed
52
    @classmethod
Gregory Ashton's avatar
Gregory Ashton committed
53
54
    def tearDownClass(self):
        if os.path.isdir(self.outdir):
55
56
57
            try:
                shutil.rmtree(self.outdir)
            except OSError:
58
                logging.warning("{} not removed prior to tests".format(self.outdir))
Gregory Ashton's avatar
Gregory Ashton committed
59
60


61
class Writer(Test):
Gregory Ashton's avatar
Gregory Ashton committed
62
    label = "TestWriter"
63
64

    def test_make_cff(self):
Gregory Ashton's avatar
Gregory Ashton committed
65
        Writer = pyfstat.Writer(self.label, outdir=self.outdir)
66
        Writer.make_cff()
67
        self.assertTrue(os.path.isfile("./{}/{}.cff".format(self.outdir, self.label)))
68
69

    def test_run_makefakedata(self):
70
        Writer = pyfstat.Writer(self.label, outdir=self.outdir, duration=3600)
71
72
        Writer.make_cff()
        Writer.run_makefakedata()
73
74
75
76
77
        self.assertTrue(
            os.path.isfile(
                "./{}/H-2_H1_1800SFT_TestWriter-700000000-3600.sft".format(self.outdir)
            )
        )
78
79

    def test_makefakedata_usecached(self):
80
        Writer = pyfstat.Writer(self.label, outdir=self.outdir, duration=3600)
81
82
        if os.path.isfile(Writer.sftfilepath):
            os.remove(Writer.sftfilepath)
Gregory Ashton's avatar
Gregory Ashton committed
83
        Writer.make_cff()
84
        Writer.run_makefakedata()
85
        time_first = os.path.getmtime(Writer.sftfilepath)
86
        Writer.run_makefakedata()
87
        time_second = os.path.getmtime(Writer.sftfilepath)
88
        self.assertTrue(time_first == time_second)
89
        time.sleep(1)  # make sure timestamp is actually different!
90
        os.system("touch {}".format(Writer.config_file_name))
91
        Writer.run_makefakedata()
92
        time_third = os.path.getmtime(Writer.sftfilepath)
93
94
95
        self.assertFalse(time_first == time_third)


96
class Bunch(Test):
Gregory Ashton's avatar
Gregory Ashton committed
97
98
99
100
101
    def test_bunch(self):
        b = pyfstat.core.Bunch(dict(x=10))
        self.assertTrue(b.x == 10)


102
class par(Test):
103
    label = "TestPar"
Gregory Ashton's avatar
Gregory Ashton committed
104
105

    def test(self):
106
        os.system('echo "x=100\ny=10" > {}/{}.par'.format(self.outdir, self.label))
Gregory Ashton's avatar
Gregory Ashton committed
107
108

        par = pyfstat.core.read_par(
109
110
            "{}/{}.par".format(self.outdir, self.label), return_type="Bunch"
        )
Gregory Ashton's avatar
Gregory Ashton committed
111
112
113
        self.assertTrue(par.x == 100)
        self.assertTrue(par.y == 10)

114
115
116
117
118
119
        par = pyfstat.core.read_par(
            outdir=self.outdir, label=self.label, return_type="dict"
        )
        self.assertTrue(par["x"] == 100)
        self.assertTrue(par["y"] == 10)
        os.system("rm -r {}".format(self.outdir))
Gregory Ashton's avatar
Gregory Ashton committed
120
121


122
class BaseSearchClass(Test):
123
124
125
    def test_shift_matrix(self):
        BSC = pyfstat.BaseSearchClass()
        dT = 10
126
        a = BSC._shift_matrix(4, dT)
127
128
129
130
131
132
133
134
135
136
137
138
139
        b = np.array(
            [
                [
                    1,
                    2 * np.pi * dT,
                    2 * np.pi * dT ** 2 / 2.0,
                    2 * np.pi * dT ** 3 / 6.0,
                ],
                [0, 1, dT, dT ** 2 / 2.0],
                [0, 0, 1, dT],
                [0, 0, 0, 1],
            ]
        )
140
141
142
143
        self.assertTrue(np.array_equal(a, b))

    def test_shift_coefficients(self):
        BSC = pyfstat.BaseSearchClass()
144
        thetaA = np.array([10.0, 1e2, 10.0, 1e2])
145
146
147
148
149
        dT = 100

        # Calculate the 'long' way
        thetaB = np.zeros(len(thetaA))
        thetaB[3] = thetaA[3]
150
151
152
153
154
        thetaB[2] = thetaA[2] + thetaA[3] * dT
        thetaB[1] = thetaA[1] + thetaA[2] * dT + 0.5 * thetaA[3] * dT ** 2
        thetaB[0] = thetaA[0] + 2 * np.pi * (
            thetaA[1] * dT + 0.5 * thetaA[2] * dT ** 2 + thetaA[3] * dT ** 3 / 6.0
        )
155

156
        self.assertTrue(np.array_equal(thetaB, BSC._shift_coefficients(thetaA, dT)))
157
158
159

    def test_shift_coefficients_loop(self):
        BSC = pyfstat.BaseSearchClass()
160
        thetaA = np.array([10.0, 1e2, 10.0, 1e2])
161
        dT = 1e1
162
        thetaB = BSC._shift_coefficients(thetaA, dT)
163
164
        self.assertTrue(
            np.allclose(
165
166
167
                thetaA, BSC._shift_coefficients(thetaB, -dT), rtol=1e-9, atol=1e-9
            )
        )
168
169


170
class ComputeFstat(Test):
Gregory Ashton's avatar
Gregory Ashton committed
171
    label = "TestComputeFstat"
172
173

    def test_run_computefstatistic_single_point(self):
174
175
176
177
178
179
180
181
        Writer = pyfstat.Writer(
            self.label,
            outdir=self.outdir,
            duration=86400,
            h0=1,
            sqrtSX=1,
            detectors="H1",
        )
182
183
184
        Writer.make_data()
        predicted_FS = Writer.predict_fstat()

185
186
        search_H1L1 = pyfstat.ComputeFstat(
            tref=Writer.tref,
187
188
            sftfilepattern="{}/*{}*sft".format(Writer.outdir, Writer.label),
        )
189
        FS = search_H1L1.get_fullycoherent_twoF(
190
191
192
193
194
195
196
197
198
199
200
            Writer.tstart,
            Writer.tend,
            Writer.F0,
            Writer.F1,
            Writer.F2,
            Writer.Alpha,
            Writer.Delta,
        )
        self.assertTrue(np.abs(predicted_FS - FS) / FS < 0.3)

        Writer.detectors = "H1"
201
        predicted_FS = Writer.predict_fstat()
202
        search_H1 = pyfstat.ComputeFstat(
203
204
205
206
207
            tref=Writer.tref,
            detectors="H1",
            sftfilepattern="{}/*{}*sft".format(Writer.outdir, Writer.label),
            SSBprec=lalpulsar.SSBPREC_RELATIVISTIC,
        )
208
        FS = search_H1.get_fullycoherent_twoF(
209
210
211
212
213
214
215
216
217
            Writer.tstart,
            Writer.tend,
            Writer.F0,
            Writer.F1,
            Writer.F2,
            Writer.Alpha,
            Writer.Delta,
        )
        self.assertTrue(np.abs(predicted_FS - FS) / FS < 0.3)
218

219
    def run_computefstatistic_single_point_no_noise(self):
220
        Writer = pyfstat.Writer(
221
222
223
224
225
226
227
            self.label,
            outdir=self.outdir,
            add_noise=False,
            duration=86400,
            h0=1,
            sqrtSX=1,
        )
228
229
230
231
        Writer.make_data()
        predicted_FS = Writer.predict_fstat()

        search = pyfstat.ComputeFstat(
232
233
234
235
            tref=Writer.tref,
            assumeSqrtSX=1,
            sftfilepattern="{}/*{}*sft".format(Writer.outdir, Writer.label),
        )
236
        FS = search.get_fullycoherent_twoF(
237
238
239
240
241
242
243
244
245
            Writer.tstart,
            Writer.tend,
            Writer.F0,
            Writer.F1,
            Writer.F2,
            Writer.Alpha,
            Writer.Delta,
        )
        self.assertTrue(np.abs(predicted_FS - FS) / FS < 0.3)
Gregory Ashton's avatar
Gregory Ashton committed
246

247
    def test_injectSources(self):
248
249
        # This seems to be writing with a signal...
        Writer = pyfstat.Writer(
250
251
252
253
254
255
256
            self.label,
            outdir=self.outdir,
            add_noise=False,
            duration=86400,
            h0=1,
            sqrtSX=1,
        )
257
258
259
260
        Writer.make_cff()
        injectSources = Writer.config_file_name

        search = pyfstat.ComputeFstat(
261
262
263
264
265
266
267
268
269
            tref=Writer.tref,
            assumeSqrtSX=1,
            injectSources=injectSources,
            minCoverFreq=28,
            maxCoverFreq=32,
            minStartTime=Writer.tstart,
            maxStartTime=Writer.tstart + Writer.duration,
            detectors=Writer.detectors,
        )
270
        FS_from_file = search.get_fullycoherent_twoF(
271
272
273
274
275
276
277
278
            Writer.tstart,
            Writer.tend,
            Writer.F0,
            Writer.F1,
            Writer.F2,
            Writer.Alpha,
            Writer.Delta,
        )
279
280
        Writer.make_data()
        predicted_FS = Writer.predict_fstat()
281
        self.assertTrue(np.abs(predicted_FS - FS_from_file) / FS_from_file < 0.3)
282
283

        injectSourcesdict = pyfstat.core.read_par(Writer.config_file_name)
284
285
286
        injectSourcesdict["F0"] = injectSourcesdict["Freq"]
        injectSourcesdict["F1"] = injectSourcesdict["f1dot"]
        injectSourcesdict["F2"] = injectSourcesdict["f2dot"]
287
        search = pyfstat.ComputeFstat(
288
289
290
291
292
293
294
295
296
            tref=Writer.tref,
            assumeSqrtSX=1,
            injectSources=injectSourcesdict,
            minCoverFreq=28,
            maxCoverFreq=32,
            minStartTime=Writer.tstart,
            maxStartTime=Writer.tstart + Writer.duration,
            detectors=Writer.detectors,
        )
297
        FS_from_dict = search.get_fullycoherent_twoF(
298
299
300
301
302
303
304
305
            Writer.tstart,
            Writer.tend,
            Writer.F0,
            Writer.F1,
            Writer.F2,
            Writer.Alpha,
            Writer.Delta,
        )
306
307
308
        self.assertTrue(FS_from_dict == FS_from_file)


309
class SemiCoherentSearch(Test):
310
311
312
    label = "TestSemiCoherentSearch"

    def test_get_semicoherent_twoF(self):
313
        duration = 10 * 86400
314
        Writer = pyfstat.Writer(
315
316
            self.label, outdir=self.outdir, duration=duration, h0=1, sqrtSX=1
        )
317
318
319
        Writer.make_data()

        search = pyfstat.SemiCoherentSearch(
320
321
322
323
324
325
326
327
            label=self.label,
            outdir=self.outdir,
            nsegs=2,
            sftfilepattern="{}/*{}*sft".format(Writer.outdir, Writer.label),
            tref=Writer.tref,
            minStartTime=Writer.tstart,
            maxStartTime=Writer.tend,
        )
328
329

        search.get_semicoherent_twoF(
330
331
332
333
334
335
336
            Writer.F0,
            Writer.F1,
            Writer.F2,
            Writer.Alpha,
            Writer.Delta,
            record_segments=True,
        )
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353

        # Compute the predicted semi-coherent Fstat
        minStartTime = Writer.tstart
        maxStartTime = Writer.tend

        Writer.maxStartTime = minStartTime + duration / 2.0
        FSA = Writer.predict_fstat()

        Writer.tstart = minStartTime + duration / 2.0
        Writer.tend = maxStartTime
        FSB = Writer.predict_fstat()

        FSs = np.array([FSA, FSB])
        diffs = (np.array(search.detStat_per_segment) - FSs) / FSs
        self.assertTrue(np.all(diffs < 0.3))

    def test_get_semicoherent_BSGL(self):
354
        duration = 10 * 86400
355
        Writer = pyfstat.Writer(
356
357
            self.label, outdir=self.outdir, duration=duration, detectors="H1,L1"
        )
358
359
360
        Writer.make_data()

        search = pyfstat.SemiCoherentSearch(
361
362
363
364
365
366
367
368
369
            label=self.label,
            outdir=self.outdir,
            nsegs=2,
            sftfilepattern="{}/*{}*sft".format(Writer.outdir, Writer.label),
            tref=Writer.tref,
            minStartTime=Writer.tstart,
            maxStartTime=Writer.tend,
            BSGL=True,
        )
370
371

        BSGL = search.get_semicoherent_twoF(
372
373
374
375
376
377
378
            Writer.F0,
            Writer.F1,
            Writer.F2,
            Writer.Alpha,
            Writer.Delta,
            record_segments=True,
        )
379
        self.assertTrue(BSGL > 0)
380
381


382
class SemiCoherentGlitchSearch(Test):
Gregory Ashton's avatar
Gregory Ashton committed
383
    label = "TestSemiCoherentGlitchSearch"
Gregory Ashton's avatar
Gregory Ashton committed
384

385
    def test_get_semicoherent_nglitch_twoF(self):
386
387
        duration = 10 * 86400
        dtglitch = 0.5 * duration
388
        delta_F0 = 0
Gregory Ashton's avatar
Gregory Ashton committed
389
390
        h0 = 1
        sqrtSX = 1
391
        Writer = pyfstat.GlitchWriter(
392
393
394
395
396
397
398
399
            self.label,
            outdir=self.outdir,
            duration=duration,
            dtglitch=dtglitch,
            delta_F0=delta_F0,
            sqrtSX=sqrtSX,
            h0=h0,
        )
400
401
402
403

        Writer.make_data()

        search = pyfstat.SemiCoherentGlitchSearch(
404
405
406
407
408
409
410
411
            label=self.label,
            outdir=self.outdir,
            sftfilepattern="{}/*{}*sft".format(Writer.outdir, Writer.label),
            tref=Writer.tref,
            minStartTime=Writer.tstart,
            maxStartTime=Writer.tend,
            nglitch=1,
        )
412

413
        FS = search.get_semicoherent_nglitch_twoF(
414
415
416
417
418
419
420
421
422
            Writer.F0,
            Writer.F1,
            Writer.F2,
            Writer.Alpha,
            Writer.Delta,
            Writer.delta_F0,
            Writer.delta_F1,
            search.minStartTime + dtglitch,
        )
423
424

        # Compute the predicted semi-coherent glitch Fstat
425
426
        minStartTime = Writer.tstart
        maxStartTime = Writer.tend
427

428
        Writer.maxStartTime = minStartTime + dtglitch
429
430
        FSA = Writer.predict_fstat()

431
432
        Writer.tstart = minStartTime + dtglitch
        Writer.tend = maxStartTime
433
434
        FSB = Writer.predict_fstat()

435
        print(FSA, FSB)
436
        predicted_FS = FSA + FSB
437

438
        print((predicted_FS, FS))
439
        self.assertTrue(np.abs((FS - predicted_FS)) / predicted_FS < 0.3)
440
441


442
class MCMCSearch(Test):
Gregory Ashton's avatar
Gregory Ashton committed
443
    label = "TestMCMCSearch"
444
445

    def test_fully_coherent(self):
Gregory Ashton's avatar
Gregory Ashton committed
446
447
        h0 = 1
        sqrtSX = 1
448
449
450
        F0 = 30
        F1 = -1e-10
        F2 = 0
451
        minStartTime = 700000000
Gregory Ashton's avatar
Gregory Ashton committed
452
        duration = 1 * 86400
453
        maxStartTime = minStartTime + duration
454
455
        Alpha = 5e-3
        Delta = 1.2
456
        tref = minStartTime
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
        Writer = pyfstat.Writer(
            F0=F0,
            F1=F1,
            F2=F2,
            label=self.label,
            h0=h0,
            sqrtSX=sqrtSX,
            outdir=self.outdir,
            tstart=minStartTime,
            Alpha=Alpha,
            Delta=Delta,
            tref=tref,
            duration=duration,
            Band=4,
        )
472
473
474
475

        Writer.make_data()
        predicted_FS = Writer.predict_fstat()

476
477
478
479
480
481
482
        theta = {
            "F0": {"type": "norm", "loc": F0, "scale": np.abs(1e-10 * F0)},
            "F1": {"type": "norm", "loc": F1, "scale": np.abs(1e-10 * F1)},
            "F2": F2,
            "Alpha": Alpha,
            "Delta": Delta,
        }
483

Gregory Ashton's avatar
Gregory Ashton committed
484
        search = pyfstat.MCMCSearch(
485
486
487
488
489
490
491
492
493
494
495
496
            label=self.label,
            outdir=self.outdir,
            theta_prior=theta,
            tref=tref,
            sftfilepattern="{}/*{}*sft".format(Writer.outdir, Writer.label),
            minStartTime=minStartTime,
            maxStartTime=maxStartTime,
            nsteps=[100, 100],
            nwalkers=100,
            ntemps=2,
            log10beta_min=-1,
        )
497
        search.run(create_plots=False)
498
499
        _, FS = search.get_max_twoF()

500
        print(("Predicted twoF is {} while recovered is {}".format(predicted_FS, FS)))
Gregory Ashton's avatar
Gregory Ashton committed
501
        self.assertTrue(
502
503
            FS > predicted_FS or np.abs((FS - predicted_FS)) / predicted_FS < 0.3
        )
504
505


506
507
508
509
510
511
512
class GridSearch(Test):
    F0s = [29, 31, 0.1]
    F1s = [-1e-10, 0, 1e-11]
    tref = 700000000

    def test_grid_search(self):
        search = pyfstat.GridSearch(
513
514
515
516
517
518
519
520
521
522
            "grid_search",
            self.outdir,
            self.sftfilepath,
            F0s=self.F0s,
            F1s=[0],
            F2s=[0],
            Alphas=[0],
            Deltas=[0],
            tref=self.tref,
        )
523
524
525
526
527
        search.run()
        self.assertTrue(os.path.isfile(search.out_file))

    def test_semicoherent_grid_search(self):
        search = pyfstat.GridSearch(
528
529
530
531
532
533
534
535
536
537
538
            "sc_grid_search",
            self.outdir,
            self.sftfilepath,
            F0s=self.F0s,
            F1s=[0],
            F2s=[0],
            Alphas=[0],
            Deltas=[0],
            tref=self.tref,
            nsegs=2,
        )
539
540
541
542
543
        search.run()
        self.assertTrue(os.path.isfile(search.out_file))

    def test_slice_grid_search(self):
        search = pyfstat.SliceGridSearch(
544
545
546
547
548
549
550
551
552
553
554
            "slice_grid_search",
            self.outdir,
            self.sftfilepath,
            F0s=self.F0s,
            F1s=self.F1s,
            F2s=[0],
            Alphas=[0],
            Deltas=[0],
            tref=self.tref,
            Lambda0=[30, 0, 0, 0],
        )
555
556
        fig, axes = search.run(save=False)
        self.assertTrue(fig is not None)
557
558
559

    def test_glitch_grid_search(self):
        search = pyfstat.GridGlitchSearch(
560
561
562
563
564
565
566
567
568
569
570
            "grid_grid_search",
            self.outdir,
            self.sftfilepath,
            F0s=self.F0s,
            F1s=self.F1s,
            F2s=[0],
            Alphas=[0],
            Deltas=[0],
            tref=self.tref,
            tglitchs=[self.tref],
        )
571
572
573
574
575
        search.run()
        self.assertTrue(os.path.isfile(search.out_file))

    def test_sliding_window(self):
        search = pyfstat.FrequencySlidingWindow(
576
577
578
579
580
581
582
583
584
585
586
587
            "grid_grid_search",
            self.outdir,
            self.sftfilepath,
            F0s=self.F0s,
            F1=0,
            F2=0,
            Alpha=0,
            Delta=0,
            tref=self.tref,
            minStartTime=self.minStartTime,
            maxStartTime=self.maxStartTime,
        )
588
589
590
        search.run()
        self.assertTrue(os.path.isfile(search.out_file))

591
592

if __name__ == "__main__":
593
    unittest.main()