tests.py 16.8 KB
Newer Older
1 2 3
import unittest
import numpy as np
import os
Gregory Ashton's avatar
Gregory Ashton committed
4 5
import shutil
import pyfstat
6
import lalpulsar
7
import logging
8
import time
9

10

Gregory Ashton's avatar
Gregory Ashton committed
11
class Test(unittest.TestCase):
12
    outdir = "TestData"
Gregory Ashton's avatar
Gregory Ashton committed
13

Gregory Ashton's avatar
Gregory Ashton committed
14
    @classmethod
Gregory Ashton's avatar
Gregory Ashton committed
15 16
    def setUpClass(self):
        if os.path.isdir(self.outdir):
17 18 19
            try:
                shutil.rmtree(self.outdir)
            except OSError:
20
                logging.warning("{} not removed prior to tests".format(self.outdir))
21 22 23 24 25 26 27 28 29 30
        h0 = 1
        sqrtSX = 1
        F0 = 30
        F1 = -1e-10
        F2 = 0
        minStartTime = 700000000
        duration = 2 * 86400
        Alpha = 5e-3
        Delta = 1.2
        tref = minStartTime
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
        Writer = pyfstat.Writer(
            F0=F0,
            F1=F1,
            F2=F2,
            label="test",
            h0=h0,
            sqrtSX=sqrtSX,
            outdir=self.outdir,
            tstart=minStartTime,
            Alpha=Alpha,
            Delta=Delta,
            tref=tref,
            duration=duration,
            Band=4,
        )
46 47 48 49 50
        Writer.make_data()
        self.sftfilepath = Writer.sftfilepath
        self.minStartTime = minStartTime
        self.maxStartTime = minStartTime + duration
        self.duration = duration
51

Gregory Ashton's avatar
Gregory Ashton committed
52
    @classmethod
Gregory Ashton's avatar
Gregory Ashton committed
53 54
    def tearDownClass(self):
        if os.path.isdir(self.outdir):
55 56 57
            try:
                shutil.rmtree(self.outdir)
            except OSError:
58
                logging.warning("{} not removed prior to tests".format(self.outdir))
Gregory Ashton's avatar
Gregory Ashton committed
59 60


61
class Writer(Test):
Gregory Ashton's avatar
Gregory Ashton committed
62
    label = "TestWriter"
63 64

    def test_make_cff(self):
Gregory Ashton's avatar
Gregory Ashton committed
65
        Writer = pyfstat.Writer(self.label, outdir=self.outdir)
66
        Writer.make_cff()
67
        self.assertTrue(os.path.isfile("./{}/{}.cff".format(self.outdir, self.label)))
68 69

    def test_run_makefakedata(self):
70
        Writer = pyfstat.Writer(self.label, outdir=self.outdir, duration=3600)
71 72
        Writer.make_cff()
        Writer.run_makefakedata()
73 74 75 76 77
        self.assertTrue(
            os.path.isfile(
                "./{}/H-2_H1_1800SFT_TestWriter-700000000-3600.sft".format(self.outdir)
            )
        )
78 79

    def test_makefakedata_usecached(self):
80
        Writer = pyfstat.Writer(self.label, outdir=self.outdir, duration=3600)
81 82
        if os.path.isfile(Writer.sftfilepath):
            os.remove(Writer.sftfilepath)
Gregory Ashton's avatar
Gregory Ashton committed
83
        Writer.make_cff()
84
        Writer.run_makefakedata()
85
        time_first = os.path.getmtime(Writer.sftfilepath)
86
        Writer.run_makefakedata()
87
        time_second = os.path.getmtime(Writer.sftfilepath)
88
        self.assertTrue(time_first == time_second)
89
        time.sleep(1)  # make sure timestamp is actually different!
90
        os.system("touch {}".format(Writer.config_file_name))
91
        Writer.run_makefakedata()
92
        time_third = os.path.getmtime(Writer.sftfilepath)
93 94 95
        self.assertFalse(time_first == time_third)


96
class Bunch(Test):
Gregory Ashton's avatar
Gregory Ashton committed
97 98 99 100 101
    def test_bunch(self):
        b = pyfstat.core.Bunch(dict(x=10))
        self.assertTrue(b.x == 10)


102
class par(Test):
103
    label = "TestPar"
Gregory Ashton's avatar
Gregory Ashton committed
104 105

    def test(self):
106
        os.system('echo "x=100\ny=10" > {}/{}.par'.format(self.outdir, self.label))
Gregory Ashton's avatar
Gregory Ashton committed
107 108

        par = pyfstat.core.read_par(
109 110
            "{}/{}.par".format(self.outdir, self.label), return_type="Bunch"
        )
Gregory Ashton's avatar
Gregory Ashton committed
111 112 113
        self.assertTrue(par.x == 100)
        self.assertTrue(par.y == 10)

114 115 116 117 118 119
        par = pyfstat.core.read_par(
            outdir=self.outdir, label=self.label, return_type="dict"
        )
        self.assertTrue(par["x"] == 100)
        self.assertTrue(par["y"] == 10)
        os.system("rm -r {}".format(self.outdir))
Gregory Ashton's avatar
Gregory Ashton committed
120 121


122
class BaseSearchClass(Test):
123 124 125
    def test_shift_matrix(self):
        BSC = pyfstat.BaseSearchClass()
        dT = 10
126
        a = BSC._shift_matrix(4, dT)
127 128 129 130 131 132 133 134 135 136 137 138 139
        b = np.array(
            [
                [
                    1,
                    2 * np.pi * dT,
                    2 * np.pi * dT ** 2 / 2.0,
                    2 * np.pi * dT ** 3 / 6.0,
                ],
                [0, 1, dT, dT ** 2 / 2.0],
                [0, 0, 1, dT],
                [0, 0, 0, 1],
            ]
        )
140 141 142 143
        self.assertTrue(np.array_equal(a, b))

    def test_shift_coefficients(self):
        BSC = pyfstat.BaseSearchClass()
144
        thetaA = np.array([10.0, 1e2, 10.0, 1e2])
145 146 147 148 149
        dT = 100

        # Calculate the 'long' way
        thetaB = np.zeros(len(thetaA))
        thetaB[3] = thetaA[3]
150 151 152 153 154
        thetaB[2] = thetaA[2] + thetaA[3] * dT
        thetaB[1] = thetaA[1] + thetaA[2] * dT + 0.5 * thetaA[3] * dT ** 2
        thetaB[0] = thetaA[0] + 2 * np.pi * (
            thetaA[1] * dT + 0.5 * thetaA[2] * dT ** 2 + thetaA[3] * dT ** 3 / 6.0
        )
155

156
        self.assertTrue(np.array_equal(thetaB, BSC._shift_coefficients(thetaA, dT)))
157 158 159

    def test_shift_coefficients_loop(self):
        BSC = pyfstat.BaseSearchClass()
160
        thetaA = np.array([10.0, 1e2, 10.0, 1e2])
161
        dT = 1e1
162
        thetaB = BSC._shift_coefficients(thetaA, dT)
163 164
        self.assertTrue(
            np.allclose(
165 166 167
                thetaA, BSC._shift_coefficients(thetaB, -dT), rtol=1e-9, atol=1e-9
            )
        )
168 169


170
class ComputeFstat(Test):
Gregory Ashton's avatar
Gregory Ashton committed
171
    label = "TestComputeFstat"
172 173

    def test_run_computefstatistic_single_point(self):
174 175 176 177 178 179 180 181
        Writer = pyfstat.Writer(
            self.label,
            outdir=self.outdir,
            duration=86400,
            h0=1,
            sqrtSX=1,
            detectors="H1",
        )
182 183 184
        Writer.make_data()
        predicted_FS = Writer.predict_fstat()

185 186
        search_H1L1 = pyfstat.ComputeFstat(
            tref=Writer.tref,
187 188
            sftfilepattern="{}/*{}*sft".format(Writer.outdir, Writer.label),
        )
189
        FS = search_H1L1.get_fullycoherent_twoF(
190 191 192 193 194 195 196 197 198 199 200
            Writer.tstart,
            Writer.tend,
            Writer.F0,
            Writer.F1,
            Writer.F2,
            Writer.Alpha,
            Writer.Delta,
        )
        self.assertTrue(np.abs(predicted_FS - FS) / FS < 0.3)

        Writer.detectors = "H1"
201
        predicted_FS = Writer.predict_fstat()
202
        search_H1 = pyfstat.ComputeFstat(
203 204 205 206 207
            tref=Writer.tref,
            detectors="H1",
            sftfilepattern="{}/*{}*sft".format(Writer.outdir, Writer.label),
            SSBprec=lalpulsar.SSBPREC_RELATIVISTIC,
        )
208
        FS = search_H1.get_fullycoherent_twoF(
209 210 211 212 213 214 215 216 217
            Writer.tstart,
            Writer.tend,
            Writer.F0,
            Writer.F1,
            Writer.F2,
            Writer.Alpha,
            Writer.Delta,
        )
        self.assertTrue(np.abs(predicted_FS - FS) / FS < 0.3)
218

219
    def run_computefstatistic_single_point_no_noise(self):
220
        Writer = pyfstat.Writer(
221 222 223 224 225 226 227
            self.label,
            outdir=self.outdir,
            add_noise=False,
            duration=86400,
            h0=1,
            sqrtSX=1,
        )
228 229 230 231
        Writer.make_data()
        predicted_FS = Writer.predict_fstat()

        search = pyfstat.ComputeFstat(
232 233 234 235
            tref=Writer.tref,
            assumeSqrtSX=1,
            sftfilepattern="{}/*{}*sft".format(Writer.outdir, Writer.label),
        )
236
        FS = search.get_fullycoherent_twoF(
237 238 239 240 241 242 243 244 245
            Writer.tstart,
            Writer.tend,
            Writer.F0,
            Writer.F1,
            Writer.F2,
            Writer.Alpha,
            Writer.Delta,
        )
        self.assertTrue(np.abs(predicted_FS - FS) / FS < 0.3)
Gregory Ashton's avatar
Gregory Ashton committed
246

247
    def test_injectSources(self):
248 249
        # This seems to be writing with a signal...
        Writer = pyfstat.Writer(
250 251 252 253 254 255 256
            self.label,
            outdir=self.outdir,
            add_noise=False,
            duration=86400,
            h0=1,
            sqrtSX=1,
        )
257 258 259 260
        Writer.make_cff()
        injectSources = Writer.config_file_name

        search = pyfstat.ComputeFstat(
261 262 263 264 265 266 267 268 269
            tref=Writer.tref,
            assumeSqrtSX=1,
            injectSources=injectSources,
            minCoverFreq=28,
            maxCoverFreq=32,
            minStartTime=Writer.tstart,
            maxStartTime=Writer.tstart + Writer.duration,
            detectors=Writer.detectors,
        )
270
        FS_from_file = search.get_fullycoherent_twoF(
271 272 273 274 275 276 277 278
            Writer.tstart,
            Writer.tend,
            Writer.F0,
            Writer.F1,
            Writer.F2,
            Writer.Alpha,
            Writer.Delta,
        )
279 280
        Writer.make_data()
        predicted_FS = Writer.predict_fstat()
281
        self.assertTrue(np.abs(predicted_FS - FS_from_file) / FS_from_file < 0.3)
282 283

        injectSourcesdict = pyfstat.core.read_par(Writer.config_file_name)
284 285 286
        injectSourcesdict["F0"] = injectSourcesdict["Freq"]
        injectSourcesdict["F1"] = injectSourcesdict["f1dot"]
        injectSourcesdict["F2"] = injectSourcesdict["f2dot"]
287
        search = pyfstat.ComputeFstat(
288 289 290 291 292 293 294 295 296
            tref=Writer.tref,
            assumeSqrtSX=1,
            injectSources=injectSourcesdict,
            minCoverFreq=28,
            maxCoverFreq=32,
            minStartTime=Writer.tstart,
            maxStartTime=Writer.tstart + Writer.duration,
            detectors=Writer.detectors,
        )
297
        FS_from_dict = search.get_fullycoherent_twoF(
298 299 300 301 302 303 304 305
            Writer.tstart,
            Writer.tend,
            Writer.F0,
            Writer.F1,
            Writer.F2,
            Writer.Alpha,
            Writer.Delta,
        )
306 307 308
        self.assertTrue(FS_from_dict == FS_from_file)


309
class SemiCoherentSearch(Test):
310 311 312
    label = "TestSemiCoherentSearch"

    def test_get_semicoherent_twoF(self):
313
        duration = 10 * 86400
314
        Writer = pyfstat.Writer(
315 316
            self.label, outdir=self.outdir, duration=duration, h0=1, sqrtSX=1
        )
317 318 319
        Writer.make_data()

        search = pyfstat.SemiCoherentSearch(
320 321 322 323 324 325 326 327
            label=self.label,
            outdir=self.outdir,
            nsegs=2,
            sftfilepattern="{}/*{}*sft".format(Writer.outdir, Writer.label),
            tref=Writer.tref,
            minStartTime=Writer.tstart,
            maxStartTime=Writer.tend,
        )
328 329

        search.get_semicoherent_twoF(
330 331 332 333 334 335 336
            Writer.F0,
            Writer.F1,
            Writer.F2,
            Writer.Alpha,
            Writer.Delta,
            record_segments=True,
        )
337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353

        # Compute the predicted semi-coherent Fstat
        minStartTime = Writer.tstart
        maxStartTime = Writer.tend

        Writer.maxStartTime = minStartTime + duration / 2.0
        FSA = Writer.predict_fstat()

        Writer.tstart = minStartTime + duration / 2.0
        Writer.tend = maxStartTime
        FSB = Writer.predict_fstat()

        FSs = np.array([FSA, FSB])
        diffs = (np.array(search.detStat_per_segment) - FSs) / FSs
        self.assertTrue(np.all(diffs < 0.3))

    def test_get_semicoherent_BSGL(self):
354
        duration = 10 * 86400
355
        Writer = pyfstat.Writer(
356 357
            self.label, outdir=self.outdir, duration=duration, detectors="H1,L1"
        )
358 359 360
        Writer.make_data()

        search = pyfstat.SemiCoherentSearch(
361 362 363 364 365 366 367 368 369
            label=self.label,
            outdir=self.outdir,
            nsegs=2,
            sftfilepattern="{}/*{}*sft".format(Writer.outdir, Writer.label),
            tref=Writer.tref,
            minStartTime=Writer.tstart,
            maxStartTime=Writer.tend,
            BSGL=True,
        )
370 371

        BSGL = search.get_semicoherent_twoF(
372 373 374 375 376 377 378
            Writer.F0,
            Writer.F1,
            Writer.F2,
            Writer.Alpha,
            Writer.Delta,
            record_segments=True,
        )
379
        self.assertTrue(BSGL > 0)
380 381


382
class SemiCoherentGlitchSearch(Test):
Gregory Ashton's avatar
Gregory Ashton committed
383
    label = "TestSemiCoherentGlitchSearch"
Gregory Ashton's avatar
Gregory Ashton committed
384

385
    def test_get_semicoherent_nglitch_twoF(self):
386 387
        duration = 10 * 86400
        dtglitch = 0.5 * duration
388
        delta_F0 = 0
Gregory Ashton's avatar
Gregory Ashton committed
389 390
        h0 = 1
        sqrtSX = 1
391
        Writer = pyfstat.GlitchWriter(
392 393 394 395 396 397 398 399
            self.label,
            outdir=self.outdir,
            duration=duration,
            dtglitch=dtglitch,
            delta_F0=delta_F0,
            sqrtSX=sqrtSX,
            h0=h0,
        )
400 401 402 403

        Writer.make_data()

        search = pyfstat.SemiCoherentGlitchSearch(
404 405 406 407 408 409 410 411
            label=self.label,
            outdir=self.outdir,
            sftfilepattern="{}/*{}*sft".format(Writer.outdir, Writer.label),
            tref=Writer.tref,
            minStartTime=Writer.tstart,
            maxStartTime=Writer.tend,
            nglitch=1,
        )
412

413
        FS = search.get_semicoherent_nglitch_twoF(
414 415 416 417 418 419 420 421 422
            Writer.F0,
            Writer.F1,
            Writer.F2,
            Writer.Alpha,
            Writer.Delta,
            Writer.delta_F0,
            Writer.delta_F1,
            search.minStartTime + dtglitch,
        )
423 424

        # Compute the predicted semi-coherent glitch Fstat
425 426
        minStartTime = Writer.tstart
        maxStartTime = Writer.tend
427

428
        Writer.maxStartTime = minStartTime + dtglitch
429 430
        FSA = Writer.predict_fstat()

431 432
        Writer.tstart = minStartTime + dtglitch
        Writer.tend = maxStartTime
433 434
        FSB = Writer.predict_fstat()

435
        print(FSA, FSB)
436
        predicted_FS = FSA + FSB
437

438
        print((predicted_FS, FS))
439
        self.assertTrue(np.abs((FS - predicted_FS)) / predicted_FS < 0.3)
440 441


442
class MCMCSearch(Test):
Gregory Ashton's avatar
Gregory Ashton committed
443
    label = "TestMCMCSearch"
444 445

    def test_fully_coherent(self):
Gregory Ashton's avatar
Gregory Ashton committed
446 447
        h0 = 1
        sqrtSX = 1
448 449 450
        F0 = 30
        F1 = -1e-10
        F2 = 0
451
        minStartTime = 700000000
Gregory Ashton's avatar
Gregory Ashton committed
452
        duration = 1 * 86400
453
        maxStartTime = minStartTime + duration
454 455
        Alpha = 5e-3
        Delta = 1.2
456
        tref = minStartTime
457 458 459 460 461 462 463 464 465 466 467 468 469 470 471
        Writer = pyfstat.Writer(
            F0=F0,
            F1=F1,
            F2=F2,
            label=self.label,
            h0=h0,
            sqrtSX=sqrtSX,
            outdir=self.outdir,
            tstart=minStartTime,
            Alpha=Alpha,
            Delta=Delta,
            tref=tref,
            duration=duration,
            Band=4,
        )
472 473 474 475

        Writer.make_data()
        predicted_FS = Writer.predict_fstat()

476 477 478 479 480 481 482
        theta = {
            "F0": {"type": "norm", "loc": F0, "scale": np.abs(1e-10 * F0)},
            "F1": {"type": "norm", "loc": F1, "scale": np.abs(1e-10 * F1)},
            "F2": F2,
            "Alpha": Alpha,
            "Delta": Delta,
        }
483

Gregory Ashton's avatar
Gregory Ashton committed
484
        search = pyfstat.MCMCSearch(
485 486 487 488 489 490 491 492 493 494 495 496
            label=self.label,
            outdir=self.outdir,
            theta_prior=theta,
            tref=tref,
            sftfilepattern="{}/*{}*sft".format(Writer.outdir, Writer.label),
            minStartTime=minStartTime,
            maxStartTime=maxStartTime,
            nsteps=[100, 100],
            nwalkers=100,
            ntemps=2,
            log10beta_min=-1,
        )
497
        search.run(create_plots=False)
498 499
        _, FS = search.get_max_twoF()

500
        print(("Predicted twoF is {} while recovered is {}".format(predicted_FS, FS)))
Gregory Ashton's avatar
Gregory Ashton committed
501
        self.assertTrue(
502 503
            FS > predicted_FS or np.abs((FS - predicted_FS)) / predicted_FS < 0.3
        )
504 505


506 507 508 509 510 511 512
class GridSearch(Test):
    F0s = [29, 31, 0.1]
    F1s = [-1e-10, 0, 1e-11]
    tref = 700000000

    def test_grid_search(self):
        search = pyfstat.GridSearch(
513 514 515 516 517 518 519 520 521 522
            "grid_search",
            self.outdir,
            self.sftfilepath,
            F0s=self.F0s,
            F1s=[0],
            F2s=[0],
            Alphas=[0],
            Deltas=[0],
            tref=self.tref,
        )
523 524 525 526 527
        search.run()
        self.assertTrue(os.path.isfile(search.out_file))

    def test_semicoherent_grid_search(self):
        search = pyfstat.GridSearch(
528 529 530 531 532 533 534 535 536 537 538
            "sc_grid_search",
            self.outdir,
            self.sftfilepath,
            F0s=self.F0s,
            F1s=[0],
            F2s=[0],
            Alphas=[0],
            Deltas=[0],
            tref=self.tref,
            nsegs=2,
        )
539 540 541 542 543
        search.run()
        self.assertTrue(os.path.isfile(search.out_file))

    def test_slice_grid_search(self):
        search = pyfstat.SliceGridSearch(
544 545 546 547 548 549 550 551 552 553 554
            "slice_grid_search",
            self.outdir,
            self.sftfilepath,
            F0s=self.F0s,
            F1s=self.F1s,
            F2s=[0],
            Alphas=[0],
            Deltas=[0],
            tref=self.tref,
            Lambda0=[30, 0, 0, 0],
        )
555 556
        fig, axes = search.run(save=False)
        self.assertTrue(fig is not None)
557 558 559

    def test_glitch_grid_search(self):
        search = pyfstat.GridGlitchSearch(
560 561 562 563 564 565 566 567 568 569 570
            "grid_grid_search",
            self.outdir,
            self.sftfilepath,
            F0s=self.F0s,
            F1s=self.F1s,
            F2s=[0],
            Alphas=[0],
            Deltas=[0],
            tref=self.tref,
            tglitchs=[self.tref],
        )
571 572 573 574 575
        search.run()
        self.assertTrue(os.path.isfile(search.out_file))

    def test_sliding_window(self):
        search = pyfstat.FrequencySlidingWindow(
576 577 578 579 580 581 582 583 584 585 586 587
            "grid_grid_search",
            self.outdir,
            self.sftfilepath,
            F0s=self.F0s,
            F1=0,
            F2=0,
            Alpha=0,
            Delta=0,
            tref=self.tref,
            minStartTime=self.minStartTime,
            maxStartTime=self.maxStartTime,
        )
588 589 590
        search.run()
        self.assertTrue(os.path.isfile(search.out_file))

591 592

if __name__ == "__main__":
593
    unittest.main()