tests.py 16.8 KB
Newer Older
1 2 3
import unittest
import numpy as np
import os
Gregory Ashton's avatar
Gregory Ashton committed
4 5
import shutil
import pyfstat
6
import lalpulsar
7
import logging
8
import time
9

Gregory Ashton's avatar
Gregory Ashton committed
10
class Test(unittest.TestCase):
11
    outdir = "TestData"
Gregory Ashton's avatar
Gregory Ashton committed
12

Gregory Ashton's avatar
Gregory Ashton committed
13
    @classmethod
Gregory Ashton's avatar
Gregory Ashton committed
14 15
    def setUpClass(self):
        if os.path.isdir(self.outdir):
16 17 18
            try:
                shutil.rmtree(self.outdir)
            except OSError:
19
                logging.warning("{} not removed prior to tests".format(self.outdir))
20 21 22 23 24 25 26 27 28 29
        h0 = 1
        sqrtSX = 1
        F0 = 30
        F1 = -1e-10
        F2 = 0
        minStartTime = 700000000
        duration = 2 * 86400
        Alpha = 5e-3
        Delta = 1.2
        tref = minStartTime
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
        Writer = pyfstat.Writer(
            F0=F0,
            F1=F1,
            F2=F2,
            label="test",
            h0=h0,
            sqrtSX=sqrtSX,
            outdir=self.outdir,
            tstart=minStartTime,
            Alpha=Alpha,
            Delta=Delta,
            tref=tref,
            duration=duration,
            Band=4,
        )
45 46 47 48 49
        Writer.make_data()
        self.sftfilepath = Writer.sftfilepath
        self.minStartTime = minStartTime
        self.maxStartTime = minStartTime + duration
        self.duration = duration
50

Gregory Ashton's avatar
Gregory Ashton committed
51
    @classmethod
Gregory Ashton's avatar
Gregory Ashton committed
52 53
    def tearDownClass(self):
        if os.path.isdir(self.outdir):
54 55 56
            try:
                shutil.rmtree(self.outdir)
            except OSError:
57
                logging.warning("{} not removed prior to tests".format(self.outdir))
Gregory Ashton's avatar
Gregory Ashton committed
58 59


60
class Writer(Test):
Gregory Ashton's avatar
Gregory Ashton committed
61
    label = "TestWriter"
62 63

    def test_make_cff(self):
Gregory Ashton's avatar
Gregory Ashton committed
64
        Writer = pyfstat.Writer(self.label, outdir=self.outdir)
65
        Writer.make_cff()
66
        self.assertTrue(os.path.isfile("./{}/{}.cff".format(self.outdir, self.label)))
67 68

    def test_run_makefakedata(self):
69
        Writer = pyfstat.Writer(self.label, outdir=self.outdir, duration=3600)
70 71
        Writer.make_cff()
        Writer.run_makefakedata()
72 73 74 75 76
        self.assertTrue(
            os.path.isfile(
                "./{}/H-2_H1_1800SFT_TestWriter-700000000-3600.sft".format(self.outdir)
            )
        )
77 78

    def test_makefakedata_usecached(self):
79
        Writer = pyfstat.Writer(self.label, outdir=self.outdir, duration=3600)
80 81
        if os.path.isfile(Writer.sftfilepath):
            os.remove(Writer.sftfilepath)
Gregory Ashton's avatar
Gregory Ashton committed
82
        Writer.make_cff()
83
        Writer.run_makefakedata()
84
        time_first = os.path.getmtime(Writer.sftfilepath)
85
        Writer.run_makefakedata()
86
        time_second = os.path.getmtime(Writer.sftfilepath)
87
        self.assertTrue(time_first == time_second)
88
        time.sleep(1)  # make sure timestamp is actually different!
89
        os.system("touch {}".format(Writer.config_file_name))
90
        Writer.run_makefakedata()
91
        time_third = os.path.getmtime(Writer.sftfilepath)
92 93 94
        self.assertFalse(time_first == time_third)


95
class Bunch(Test):
Gregory Ashton's avatar
Gregory Ashton committed
96 97 98 99 100
    def test_bunch(self):
        b = pyfstat.core.Bunch(dict(x=10))
        self.assertTrue(b.x == 10)


101
class par(Test):
102
    label = "TestPar"
Gregory Ashton's avatar
Gregory Ashton committed
103 104

    def test(self):
105
        os.system('echo "x=100\ny=10" > {}/{}.par'.format(self.outdir, self.label))
Gregory Ashton's avatar
Gregory Ashton committed
106 107

        par = pyfstat.core.read_par(
108 109
            "{}/{}.par".format(self.outdir, self.label), return_type="Bunch"
        )
Gregory Ashton's avatar
Gregory Ashton committed
110 111 112
        self.assertTrue(par.x == 100)
        self.assertTrue(par.y == 10)

113 114 115 116 117 118
        par = pyfstat.core.read_par(
            outdir=self.outdir, label=self.label, return_type="dict"
        )
        self.assertTrue(par["x"] == 100)
        self.assertTrue(par["y"] == 10)
        os.system("rm -r {}".format(self.outdir))
Gregory Ashton's avatar
Gregory Ashton committed
119 120


121
class BaseSearchClass(Test):
122 123 124
    def test_shift_matrix(self):
        BSC = pyfstat.BaseSearchClass()
        dT = 10
125
        a = BSC._shift_matrix(4, dT)
126 127 128 129 130 131 132 133 134 135 136 137 138
        b = np.array(
            [
                [
                    1,
                    2 * np.pi * dT,
                    2 * np.pi * dT ** 2 / 2.0,
                    2 * np.pi * dT ** 3 / 6.0,
                ],
                [0, 1, dT, dT ** 2 / 2.0],
                [0, 0, 1, dT],
                [0, 0, 0, 1],
            ]
        )
139 140 141 142
        self.assertTrue(np.array_equal(a, b))

    def test_shift_coefficients(self):
        BSC = pyfstat.BaseSearchClass()
143
        thetaA = np.array([10.0, 1e2, 10.0, 1e2])
144 145 146 147 148
        dT = 100

        # Calculate the 'long' way
        thetaB = np.zeros(len(thetaA))
        thetaB[3] = thetaA[3]
149 150 151 152 153
        thetaB[2] = thetaA[2] + thetaA[3] * dT
        thetaB[1] = thetaA[1] + thetaA[2] * dT + 0.5 * thetaA[3] * dT ** 2
        thetaB[0] = thetaA[0] + 2 * np.pi * (
            thetaA[1] * dT + 0.5 * thetaA[2] * dT ** 2 + thetaA[3] * dT ** 3 / 6.0
        )
154

155
        self.assertTrue(np.array_equal(thetaB, BSC._shift_coefficients(thetaA, dT)))
156 157 158

    def test_shift_coefficients_loop(self):
        BSC = pyfstat.BaseSearchClass()
159
        thetaA = np.array([10.0, 1e2, 10.0, 1e2])
160
        dT = 1e1
161
        thetaB = BSC._shift_coefficients(thetaA, dT)
162 163
        self.assertTrue(
            np.allclose(
164 165 166
                thetaA, BSC._shift_coefficients(thetaB, -dT), rtol=1e-9, atol=1e-9
            )
        )
167 168


169
class ComputeFstat(Test):
Gregory Ashton's avatar
Gregory Ashton committed
170
    label = "TestComputeFstat"
171 172

    def test_run_computefstatistic_single_point(self):
173 174 175 176 177 178 179 180
        Writer = pyfstat.Writer(
            self.label,
            outdir=self.outdir,
            duration=86400,
            h0=1,
            sqrtSX=1,
            detectors="H1",
        )
181 182 183
        Writer.make_data()
        predicted_FS = Writer.predict_fstat()

184 185
        search_H1L1 = pyfstat.ComputeFstat(
            tref=Writer.tref,
186 187
            sftfilepattern="{}/*{}*sft".format(Writer.outdir, Writer.label),
        )
188
        FS = search_H1L1.get_fullycoherent_twoF(
189 190 191 192 193 194 195 196 197 198 199
            Writer.tstart,
            Writer.tend,
            Writer.F0,
            Writer.F1,
            Writer.F2,
            Writer.Alpha,
            Writer.Delta,
        )
        self.assertTrue(np.abs(predicted_FS - FS) / FS < 0.3)

        Writer.detectors = "H1"
200
        predicted_FS = Writer.predict_fstat()
201
        search_H1 = pyfstat.ComputeFstat(
202 203 204 205 206
            tref=Writer.tref,
            detectors="H1",
            sftfilepattern="{}/*{}*sft".format(Writer.outdir, Writer.label),
            SSBprec=lalpulsar.SSBPREC_RELATIVISTIC,
        )
207
        FS = search_H1.get_fullycoherent_twoF(
208 209 210 211 212 213 214 215 216
            Writer.tstart,
            Writer.tend,
            Writer.F0,
            Writer.F1,
            Writer.F2,
            Writer.Alpha,
            Writer.Delta,
        )
        self.assertTrue(np.abs(predicted_FS - FS) / FS < 0.3)
217

218
    def run_computefstatistic_single_point_no_noise(self):
219
        Writer = pyfstat.Writer(
220 221 222 223 224 225 226
            self.label,
            outdir=self.outdir,
            add_noise=False,
            duration=86400,
            h0=1,
            sqrtSX=1,
        )
227 228 229 230
        Writer.make_data()
        predicted_FS = Writer.predict_fstat()

        search = pyfstat.ComputeFstat(
231 232 233 234
            tref=Writer.tref,
            assumeSqrtSX=1,
            sftfilepattern="{}/*{}*sft".format(Writer.outdir, Writer.label),
        )
235
        FS = search.get_fullycoherent_twoF(
236 237 238 239 240 241 242 243 244
            Writer.tstart,
            Writer.tend,
            Writer.F0,
            Writer.F1,
            Writer.F2,
            Writer.Alpha,
            Writer.Delta,
        )
        self.assertTrue(np.abs(predicted_FS - FS) / FS < 0.3)
Gregory Ashton's avatar
Gregory Ashton committed
245

246
    def test_injectSources(self):
247 248
        # This seems to be writing with a signal...
        Writer = pyfstat.Writer(
249 250 251 252 253 254 255
            self.label,
            outdir=self.outdir,
            add_noise=False,
            duration=86400,
            h0=1,
            sqrtSX=1,
        )
256 257 258 259
        Writer.make_cff()
        injectSources = Writer.config_file_name

        search = pyfstat.ComputeFstat(
260 261 262 263 264 265 266 267 268
            tref=Writer.tref,
            assumeSqrtSX=1,
            injectSources=injectSources,
            minCoverFreq=28,
            maxCoverFreq=32,
            minStartTime=Writer.tstart,
            maxStartTime=Writer.tstart + Writer.duration,
            detectors=Writer.detectors,
        )
269
        FS_from_file = search.get_fullycoherent_twoF(
270 271 272 273 274 275 276 277
            Writer.tstart,
            Writer.tend,
            Writer.F0,
            Writer.F1,
            Writer.F2,
            Writer.Alpha,
            Writer.Delta,
        )
278 279
        Writer.make_data()
        predicted_FS = Writer.predict_fstat()
280
        self.assertTrue(np.abs(predicted_FS - FS_from_file) / FS_from_file < 0.3)
281 282

        injectSourcesdict = pyfstat.core.read_par(Writer.config_file_name)
283 284 285
        injectSourcesdict["F0"] = injectSourcesdict["Freq"]
        injectSourcesdict["F1"] = injectSourcesdict["f1dot"]
        injectSourcesdict["F2"] = injectSourcesdict["f2dot"]
286
        search = pyfstat.ComputeFstat(
287 288 289 290 291 292 293 294 295
            tref=Writer.tref,
            assumeSqrtSX=1,
            injectSources=injectSourcesdict,
            minCoverFreq=28,
            maxCoverFreq=32,
            minStartTime=Writer.tstart,
            maxStartTime=Writer.tstart + Writer.duration,
            detectors=Writer.detectors,
        )
296
        FS_from_dict = search.get_fullycoherent_twoF(
297 298 299 300 301 302 303 304
            Writer.tstart,
            Writer.tend,
            Writer.F0,
            Writer.F1,
            Writer.F2,
            Writer.Alpha,
            Writer.Delta,
        )
305 306 307
        self.assertTrue(FS_from_dict == FS_from_file)


308
class SemiCoherentSearch(Test):
309 310 311
    label = "TestSemiCoherentSearch"

    def test_get_semicoherent_twoF(self):
312
        duration = 10 * 86400
313
        Writer = pyfstat.Writer(
314 315
            self.label, outdir=self.outdir, duration=duration, h0=1, sqrtSX=1
        )
316 317 318
        Writer.make_data()

        search = pyfstat.SemiCoherentSearch(
319 320 321 322 323 324 325 326
            label=self.label,
            outdir=self.outdir,
            nsegs=2,
            sftfilepattern="{}/*{}*sft".format(Writer.outdir, Writer.label),
            tref=Writer.tref,
            minStartTime=Writer.tstart,
            maxStartTime=Writer.tend,
        )
327 328

        search.get_semicoherent_twoF(
329 330 331 332 333 334 335
            Writer.F0,
            Writer.F1,
            Writer.F2,
            Writer.Alpha,
            Writer.Delta,
            record_segments=True,
        )
336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352

        # Compute the predicted semi-coherent Fstat
        minStartTime = Writer.tstart
        maxStartTime = Writer.tend

        Writer.maxStartTime = minStartTime + duration / 2.0
        FSA = Writer.predict_fstat()

        Writer.tstart = minStartTime + duration / 2.0
        Writer.tend = maxStartTime
        FSB = Writer.predict_fstat()

        FSs = np.array([FSA, FSB])
        diffs = (np.array(search.detStat_per_segment) - FSs) / FSs
        self.assertTrue(np.all(diffs < 0.3))

    def test_get_semicoherent_BSGL(self):
353
        duration = 10 * 86400
354
        Writer = pyfstat.Writer(
355 356
            self.label, outdir=self.outdir, duration=duration, detectors="H1,L1"
        )
357 358 359
        Writer.make_data()

        search = pyfstat.SemiCoherentSearch(
360 361 362 363 364 365 366 367 368
            label=self.label,
            outdir=self.outdir,
            nsegs=2,
            sftfilepattern="{}/*{}*sft".format(Writer.outdir, Writer.label),
            tref=Writer.tref,
            minStartTime=Writer.tstart,
            maxStartTime=Writer.tend,
            BSGL=True,
        )
369 370

        BSGL = search.get_semicoherent_twoF(
371 372 373 374 375 376 377
            Writer.F0,
            Writer.F1,
            Writer.F2,
            Writer.Alpha,
            Writer.Delta,
            record_segments=True,
        )
378
        self.assertTrue(BSGL > 0)
379 380


381
class SemiCoherentGlitchSearch(Test):
Gregory Ashton's avatar
Gregory Ashton committed
382
    label = "TestSemiCoherentGlitchSearch"
Gregory Ashton's avatar
Gregory Ashton committed
383

384
    def test_get_semicoherent_nglitch_twoF(self):
385 386
        duration = 10 * 86400
        dtglitch = 0.5 * duration
387
        delta_F0 = 0
Gregory Ashton's avatar
Gregory Ashton committed
388 389
        h0 = 1
        sqrtSX = 1
390
        Writer = pyfstat.GlitchWriter(
391 392 393 394 395 396 397 398
            self.label,
            outdir=self.outdir,
            duration=duration,
            dtglitch=dtglitch,
            delta_F0=delta_F0,
            sqrtSX=sqrtSX,
            h0=h0,
        )
399 400 401 402

        Writer.make_data()

        search = pyfstat.SemiCoherentGlitchSearch(
403 404 405 406 407 408 409 410
            label=self.label,
            outdir=self.outdir,
            sftfilepattern="{}/*{}*sft".format(Writer.outdir, Writer.label),
            tref=Writer.tref,
            minStartTime=Writer.tstart,
            maxStartTime=Writer.tend,
            nglitch=1,
        )
411

412
        FS = search.get_semicoherent_nglitch_twoF(
413 414 415 416 417 418 419 420 421
            Writer.F0,
            Writer.F1,
            Writer.F2,
            Writer.Alpha,
            Writer.Delta,
            Writer.delta_F0,
            Writer.delta_F1,
            search.minStartTime + dtglitch,
        )
422 423

        # Compute the predicted semi-coherent glitch Fstat
424 425
        minStartTime = Writer.tstart
        maxStartTime = Writer.tend
426

427
        Writer.maxStartTime = minStartTime + dtglitch
428 429
        FSA = Writer.predict_fstat()

430 431
        Writer.tstart = minStartTime + dtglitch
        Writer.tend = maxStartTime
432 433
        FSB = Writer.predict_fstat()

434
        print(FSA, FSB)
435
        predicted_FS = FSA + FSB
436

437
        print((predicted_FS, FS))
438
        self.assertTrue(np.abs((FS - predicted_FS)) / predicted_FS < 0.3)
439 440


441
class MCMCSearch(Test):
Gregory Ashton's avatar
Gregory Ashton committed
442
    label = "TestMCMCSearch"
443 444

    def test_fully_coherent(self):
Gregory Ashton's avatar
Gregory Ashton committed
445 446
        h0 = 1
        sqrtSX = 1
447 448 449
        F0 = 30
        F1 = -1e-10
        F2 = 0
450
        minStartTime = 700000000
Gregory Ashton's avatar
Gregory Ashton committed
451
        duration = 1 * 86400
452
        maxStartTime = minStartTime + duration
453 454
        Alpha = 5e-3
        Delta = 1.2
455
        tref = minStartTime
456 457 458 459 460 461 462 463 464 465 466 467 468 469 470
        Writer = pyfstat.Writer(
            F0=F0,
            F1=F1,
            F2=F2,
            label=self.label,
            h0=h0,
            sqrtSX=sqrtSX,
            outdir=self.outdir,
            tstart=minStartTime,
            Alpha=Alpha,
            Delta=Delta,
            tref=tref,
            duration=duration,
            Band=4,
        )
471 472 473 474

        Writer.make_data()
        predicted_FS = Writer.predict_fstat()

475 476 477 478 479 480 481
        theta = {
            "F0": {"type": "norm", "loc": F0, "scale": np.abs(1e-10 * F0)},
            "F1": {"type": "norm", "loc": F1, "scale": np.abs(1e-10 * F1)},
            "F2": F2,
            "Alpha": Alpha,
            "Delta": Delta,
        }
482

Gregory Ashton's avatar
Gregory Ashton committed
483
        search = pyfstat.MCMCSearch(
484 485 486 487 488 489 490 491 492 493 494 495
            label=self.label,
            outdir=self.outdir,
            theta_prior=theta,
            tref=tref,
            sftfilepattern="{}/*{}*sft".format(Writer.outdir, Writer.label),
            minStartTime=minStartTime,
            maxStartTime=maxStartTime,
            nsteps=[100, 100],
            nwalkers=100,
            ntemps=2,
            log10beta_min=-1,
        )
496
        search.run(create_plots=False)
497 498
        _, FS = search.get_max_twoF()

499
        print(("Predicted twoF is {} while recovered is {}".format(predicted_FS, FS)))
Gregory Ashton's avatar
Gregory Ashton committed
500
        self.assertTrue(
501 502
            FS > predicted_FS or np.abs((FS - predicted_FS)) / predicted_FS < 0.3
        )
503 504


505 506 507 508 509 510 511
class GridSearch(Test):
    F0s = [29, 31, 0.1]
    F1s = [-1e-10, 0, 1e-11]
    tref = 700000000

    def test_grid_search(self):
        search = pyfstat.GridSearch(
512 513 514 515 516 517 518 519 520 521
            "grid_search",
            self.outdir,
            self.sftfilepath,
            F0s=self.F0s,
            F1s=[0],
            F2s=[0],
            Alphas=[0],
            Deltas=[0],
            tref=self.tref,
        )
522 523 524 525 526
        search.run()
        self.assertTrue(os.path.isfile(search.out_file))

    def test_semicoherent_grid_search(self):
        search = pyfstat.GridSearch(
527 528 529 530 531 532 533 534 535 536 537
            "sc_grid_search",
            self.outdir,
            self.sftfilepath,
            F0s=self.F0s,
            F1s=[0],
            F2s=[0],
            Alphas=[0],
            Deltas=[0],
            tref=self.tref,
            nsegs=2,
        )
538 539 540 541 542
        search.run()
        self.assertTrue(os.path.isfile(search.out_file))

    def test_slice_grid_search(self):
        search = pyfstat.SliceGridSearch(
543 544 545 546 547 548 549 550 551 552 553
            "slice_grid_search",
            self.outdir,
            self.sftfilepath,
            F0s=self.F0s,
            F1s=self.F1s,
            F2s=[0],
            Alphas=[0],
            Deltas=[0],
            tref=self.tref,
            Lambda0=[30, 0, 0, 0],
        )
554 555
        fig, axes = search.run(save=False)
        self.assertTrue(fig is not None)
556 557 558

    def test_glitch_grid_search(self):
        search = pyfstat.GridGlitchSearch(
559 560 561 562 563 564 565 566 567 568 569
            "grid_grid_search",
            self.outdir,
            self.sftfilepath,
            F0s=self.F0s,
            F1s=self.F1s,
            F2s=[0],
            Alphas=[0],
            Deltas=[0],
            tref=self.tref,
            tglitchs=[self.tref],
        )
570 571 572 573 574
        search.run()
        self.assertTrue(os.path.isfile(search.out_file))

    def test_sliding_window(self):
        search = pyfstat.FrequencySlidingWindow(
575 576 577 578 579 580 581 582 583 584 585 586
            "grid_grid_search",
            self.outdir,
            self.sftfilepath,
            F0s=self.F0s,
            F1=0,
            F2=0,
            Alpha=0,
            Delta=0,
            tref=self.tref,
            minStartTime=self.minStartTime,
            maxStartTime=self.maxStartTime,
        )
587 588 589
        search.run()
        self.assertTrue(os.path.isfile(search.out_file))

590 591

if __name__ == "__main__":
592
    unittest.main()