tests.py 15 KB
Newer Older
1 2 3
import unittest
import numpy as np
import os
Gregory Ashton's avatar
Gregory Ashton committed
4 5
import shutil
import pyfstat
6
import lalpulsar
7
import logging
Gregory Ashton's avatar
Gregory Ashton committed
8

9

Gregory Ashton's avatar
Gregory Ashton committed
10
class Test(unittest.TestCase):
Gregory Ashton's avatar
Gregory Ashton committed
11 12
    outdir = 'TestData'

Gregory Ashton's avatar
Gregory Ashton committed
13
    @classmethod
Gregory Ashton's avatar
Gregory Ashton committed
14 15
    def setUpClass(self):
        if os.path.isdir(self.outdir):
16 17 18 19 20
            try:
                shutil.rmtree(self.outdir)
            except OSError:
                logging.warning(
                    "{} not removed prior to tests".format(self.outdir))
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
        h0 = 1
        sqrtSX = 1
        F0 = 30
        F1 = -1e-10
        F2 = 0
        minStartTime = 700000000
        duration = 2 * 86400
        Alpha = 5e-3
        Delta = 1.2
        tref = minStartTime
        Writer = pyfstat.Writer(F0=F0, F1=F1, F2=F2, label='test',
                                h0=h0, sqrtSX=sqrtSX,
                                outdir=self.outdir, tstart=minStartTime,
                                Alpha=Alpha, Delta=Delta, tref=tref,
                                duration=duration,
                                Band=4)
        Writer.make_data()
        self.sftfilepath = Writer.sftfilepath
        self.minStartTime = minStartTime
        self.maxStartTime = minStartTime + duration
        self.duration = duration
42

Gregory Ashton's avatar
Gregory Ashton committed
43
    @classmethod
Gregory Ashton's avatar
Gregory Ashton committed
44 45
    def tearDownClass(self):
        if os.path.isdir(self.outdir):
46 47 48 49 50
            try:
                shutil.rmtree(self.outdir)
            except OSError:
                logging.warning(
                    "{} not removed prior to tests".format(self.outdir))
Gregory Ashton's avatar
Gregory Ashton committed
51 52


53
class Writer(Test):
Gregory Ashton's avatar
Gregory Ashton committed
54
    label = "TestWriter"
55 56

    def test_make_cff(self):
Gregory Ashton's avatar
Gregory Ashton committed
57
        Writer = pyfstat.Writer(self.label, outdir=self.outdir)
58
        Writer.make_cff()
59 60
        self.assertTrue(os.path.isfile(
            './{}/{}.cff'.format(self.outdir, self.label)))
61 62

    def test_run_makefakedata(self):
63
        Writer = pyfstat.Writer(self.label, outdir=self.outdir, duration=3600)
64 65 66
        Writer.make_cff()
        Writer.run_makefakedata()
        self.assertTrue(os.path.isfile(
67
            './{}/H-2_H1_1800SFT_TestWriter-700000000-3600.sft'
68
            .format(self.outdir)))
69 70

    def test_makefakedata_usecached(self):
71
        Writer = pyfstat.Writer(self.label, outdir=self.outdir, duration=3600)
72 73
        if os.path.isfile(Writer.sftfilepath):
            os.remove(Writer.sftfilepath)
Gregory Ashton's avatar
Gregory Ashton committed
74
        Writer.make_cff()
75
        Writer.run_makefakedata()
76
        time_first = os.path.getmtime(Writer.sftfilepath)
77
        Writer.run_makefakedata()
78
        time_second = os.path.getmtime(Writer.sftfilepath)
79 80 81
        self.assertTrue(time_first == time_second)
        os.system('touch {}'.format(Writer.config_file_name))
        Writer.run_makefakedata()
82
        time_third = os.path.getmtime(Writer.sftfilepath)
83 84 85
        self.assertFalse(time_first == time_third)


86
class Bunch(Test):
Gregory Ashton's avatar
Gregory Ashton committed
87 88 89 90 91
    def test_bunch(self):
        b = pyfstat.core.Bunch(dict(x=10))
        self.assertTrue(b.x == 10)


92
class par(Test):
Gregory Ashton's avatar
Gregory Ashton committed
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
    label = 'TestPar'

    def test(self):
        os.system('mkdir {}'.format(self.outdir))
        os.system(
            'echo "x=100\ny=10" > {}/{}.par'.format(self.outdir, self.label))

        par = pyfstat.core.read_par(
            '{}/{}.par'.format(self.outdir, self.label), return_type='Bunch')
        self.assertTrue(par.x == 100)
        self.assertTrue(par.y == 10)

        par = pyfstat.core.read_par(outdir=self.outdir, label=self.label,
                                    return_type='dict')
        self.assertTrue(par['x'] == 100)
        self.assertTrue(par['y'] == 10)
109
        os.system('rm -r {}'.format(self.outdir))
Gregory Ashton's avatar
Gregory Ashton committed
110 111


112
class BaseSearchClass(Test):
113 114 115
    def test_shift_matrix(self):
        BSC = pyfstat.BaseSearchClass()
        dT = 10
116
        a = BSC._shift_matrix(4, dT)
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
        b = np.array([[1, 2*np.pi*dT, 2*np.pi*dT**2/2.0, 2*np.pi*dT**3/6.0],
                      [0, 1, dT, dT**2/2.0],
                      [0, 0, 1, dT],
                      [0, 0, 0, 1]])
        self.assertTrue(np.array_equal(a, b))

    def test_shift_coefficients(self):
        BSC = pyfstat.BaseSearchClass()
        thetaA = np.array([10., 1e2, 10., 1e2])
        dT = 100

        # Calculate the 'long' way
        thetaB = np.zeros(len(thetaA))
        thetaB[3] = thetaA[3]
        thetaB[2] = thetaA[2] + thetaA[3]*dT
        thetaB[1] = thetaA[1] + thetaA[2]*dT + .5*thetaA[3]*dT**2
        thetaB[0] = thetaA[0] + 2*np.pi*(thetaA[1]*dT + .5*thetaA[2]*dT**2
                                         + thetaA[3]*dT**3 / 6.0)

        self.assertTrue(
            np.array_equal(
138
                thetaB, BSC._shift_coefficients(thetaA, dT)))
139 140 141 142 143

    def test_shift_coefficients_loop(self):
        BSC = pyfstat.BaseSearchClass()
        thetaA = np.array([10., 1e2, 10., 1e2])
        dT = 1e1
144
        thetaB = BSC._shift_coefficients(thetaA, dT)
145 146
        self.assertTrue(
            np.allclose(
147
                thetaA, BSC._shift_coefficients(thetaB, -dT),
148 149 150
                rtol=1e-9, atol=1e-9))


151
class ComputeFstat(Test):
Gregory Ashton's avatar
Gregory Ashton committed
152
    label = "TestComputeFstat"
153 154

    def test_run_computefstatistic_single_point(self):
Gregory Ashton's avatar
Gregory Ashton committed
155
        Writer = pyfstat.Writer(self.label, outdir=self.outdir, duration=86400,
156
                                h0=1, sqrtSX=1, detectors='H1,L1')
157 158 159
        Writer.make_data()
        predicted_FS = Writer.predict_fstat()

160 161 162 163 164 165
        search_H1L1 = pyfstat.ComputeFstat(
            tref=Writer.tref,
            sftfilepattern='{}/*{}*sft'.format(Writer.outdir, Writer.label))
        FS = search_H1L1.get_fullycoherent_twoF(
            Writer.tstart, Writer.tend, Writer.F0, Writer.F1, Writer.F2,
            Writer.Alpha, Writer.Delta)
Gregory Ashton's avatar
Gregory Ashton committed
166
        self.assertTrue(np.abs(predicted_FS-FS)/FS < 0.3)
167 168 169

        Writer.detectors = 'H1'
        predicted_FS = Writer.predict_fstat()
170 171 172 173 174 175 176
        search_H1 = pyfstat.ComputeFstat(
            tref=Writer.tref, detectors='H1',
            sftfilepattern='{}/*{}*sft'.format(Writer.outdir, Writer.label),
            SSBprec=lalpulsar.SSBPREC_RELATIVISTIC)
        FS = search_H1.get_fullycoherent_twoF(
            Writer.tstart, Writer.tend, Writer.F0, Writer.F1, Writer.F2,
            Writer.Alpha, Writer.Delta)
Gregory Ashton's avatar
Gregory Ashton committed
177
        self.assertTrue(np.abs(predicted_FS-FS)/FS < 0.3)
178

179
    def run_computefstatistic_single_point_no_noise(self):
180 181 182
        Writer = pyfstat.Writer(
            self.label, outdir=self.outdir, add_noise=False, duration=86400,
            h0=1, sqrtSX=1)
183 184 185 186 187
        Writer.make_data()
        predicted_FS = Writer.predict_fstat()

        search = pyfstat.ComputeFstat(
            tref=Writer.tref, assumeSqrtSX=1,
188
            sftfilepattern='{}/*{}*sft'.format(Writer.outdir, Writer.label))
189 190 191
        FS = search.get_fullycoherent_twoF(
            Writer.tstart, Writer.tend, Writer.F0, Writer.F1, Writer.F2,
            Writer.Alpha, Writer.Delta)
192
        self.assertTrue(np.abs(predicted_FS-FS)/FS < 0.3)
Gregory Ashton's avatar
Gregory Ashton committed
193

194
    def test_injectSources(self):
195 196 197 198
        # This seems to be writing with a signal...
        Writer = pyfstat.Writer(
            self.label, outdir=self.outdir, add_noise=False, duration=86400,
            h0=1, sqrtSX=1)
199 200 201 202 203 204 205 206
        Writer.make_cff()
        injectSources = Writer.config_file_name

        search = pyfstat.ComputeFstat(
            tref=Writer.tref, assumeSqrtSX=1, injectSources=injectSources,
            minCoverFreq=28, maxCoverFreq=32, minStartTime=Writer.tstart,
            maxStartTime=Writer.tstart+Writer.duration,
            detectors=Writer.detectors)
207
        FS_from_file = search.get_fullycoherent_twoF(
208 209
            Writer.tstart, Writer.tend, Writer.F0, Writer.F1, Writer.F2,
            Writer.Alpha, Writer.Delta)
210 211
        Writer.make_data()
        predicted_FS = Writer.predict_fstat()
Gregory Ashton's avatar
Gregory Ashton committed
212
        self.assertTrue(np.abs(predicted_FS-FS_from_file)/FS_from_file < 0.3)
213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228

        injectSourcesdict = pyfstat.core.read_par(Writer.config_file_name)
        injectSourcesdict['F0'] = injectSourcesdict['Freq']
        injectSourcesdict['F1'] = injectSourcesdict['f1dot']
        injectSourcesdict['F2'] = injectSourcesdict['f2dot']
        search = pyfstat.ComputeFstat(
            tref=Writer.tref, assumeSqrtSX=1, injectSources=injectSourcesdict,
            minCoverFreq=28, maxCoverFreq=32, minStartTime=Writer.tstart,
            maxStartTime=Writer.tstart+Writer.duration,
            detectors=Writer.detectors)
        FS_from_dict = search.get_fullycoherent_twoF(
            Writer.tstart, Writer.tend, Writer.F0, Writer.F1, Writer.F2,
            Writer.Alpha, Writer.Delta)
        self.assertTrue(FS_from_dict == FS_from_file)


229
class SemiCoherentSearch(Test):
230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279
    label = "TestSemiCoherentSearch"

    def test_get_semicoherent_twoF(self):
        duration = 10*86400
        Writer = pyfstat.Writer(
            self.label, outdir=self.outdir, duration=duration, h0=1, sqrtSX=1)
        Writer.make_data()

        search = pyfstat.SemiCoherentSearch(
            label=self.label, outdir=self.outdir, nsegs=2,
            sftfilepattern='{}/*{}*sft'.format(Writer.outdir, Writer.label),
            tref=Writer.tref, minStartTime=Writer.tstart,
            maxStartTime=Writer.tend)

        search.get_semicoherent_twoF(
            Writer.F0, Writer.F1, Writer.F2, Writer.Alpha, Writer.Delta,
            record_segments=True)

        # Compute the predicted semi-coherent Fstat
        minStartTime = Writer.tstart
        maxStartTime = Writer.tend

        Writer.maxStartTime = minStartTime + duration / 2.0
        FSA = Writer.predict_fstat()

        Writer.tstart = minStartTime + duration / 2.0
        Writer.tend = maxStartTime
        FSB = Writer.predict_fstat()

        FSs = np.array([FSA, FSB])
        diffs = (np.array(search.detStat_per_segment) - FSs) / FSs
        self.assertTrue(np.all(diffs < 0.3))

    def test_get_semicoherent_BSGL(self):
        duration = 10*86400
        Writer = pyfstat.Writer(
            self.label, outdir=self.outdir, duration=duration,
            detectors='H1,L1')
        Writer.make_data()

        search = pyfstat.SemiCoherentSearch(
            label=self.label, outdir=self.outdir, nsegs=2,
            sftfilepattern='{}/*{}*sft'.format(Writer.outdir, Writer.label),
            tref=Writer.tref, minStartTime=Writer.tstart,
            maxStartTime=Writer.tend, BSGL=True)

        BSGL = search.get_semicoherent_twoF(
            Writer.F0, Writer.F1, Writer.F2, Writer.Alpha, Writer.Delta,
            record_segments=True)
        self.assertTrue(BSGL > 0)
280 281


282
class SemiCoherentGlitchSearch(Test):
Gregory Ashton's avatar
Gregory Ashton committed
283
    label = "TestSemiCoherentGlitchSearch"
Gregory Ashton's avatar
Gregory Ashton committed
284

285
    def test_get_semicoherent_nglitch_twoF(self):
Gregory Ashton's avatar
Gregory Ashton committed
286 287
        duration = 10*86400
        dtglitch = .5*duration
288
        delta_F0 = 0
Gregory Ashton's avatar
Gregory Ashton committed
289 290
        h0 = 1
        sqrtSX = 1
291
        Writer = pyfstat.GlitchWriter(
Gregory Ashton's avatar
Gregory Ashton committed
292
            self.label, outdir=self.outdir, duration=duration, dtglitch=dtglitch,
Gregory Ashton's avatar
Gregory Ashton committed
293
            delta_F0=delta_F0, sqrtSX=sqrtSX, h0=h0)
294 295 296 297

        Writer.make_data()

        search = pyfstat.SemiCoherentGlitchSearch(
Gregory Ashton's avatar
Gregory Ashton committed
298
            label=self.label, outdir=self.outdir,
299
            sftfilepattern='{}/*{}*sft'.format(Writer.outdir, Writer.label),
300 301
            tref=Writer.tref, minStartTime=Writer.tstart,
            maxStartTime=Writer.tend, nglitch=1)
302

303 304 305
        FS = search.get_semicoherent_nglitch_twoF(
            Writer.F0, Writer.F1, Writer.F2, Writer.Alpha, Writer.Delta,
            Writer.delta_F0, Writer.delta_F1, search.minStartTime+dtglitch)
306 307

        # Compute the predicted semi-coherent glitch Fstat
308 309
        minStartTime = Writer.tstart
        maxStartTime = Writer.tend
310

311
        Writer.maxStartTime = minStartTime + dtglitch
312 313
        FSA = Writer.predict_fstat()

314 315
        Writer.tstart = minStartTime + dtglitch
        Writer.tend = maxStartTime
316 317
        FSB = Writer.predict_fstat()

318
        print FSA, FSB
319 320 321
        predicted_FS = (FSA + FSB)

        print(predicted_FS, FS)
Gregory Ashton's avatar
Gregory Ashton committed
322
        self.assertTrue(np.abs((FS - predicted_FS))/predicted_FS < 0.3)
323 324


325
class MCMCSearch(Test):
Gregory Ashton's avatar
Gregory Ashton committed
326
    label = "TestMCMCSearch"
327 328

    def test_fully_coherent(self):
Gregory Ashton's avatar
Gregory Ashton committed
329 330
        h0 = 1
        sqrtSX = 1
331 332 333
        F0 = 30
        F1 = -1e-10
        F2 = 0
334
        minStartTime = 700000000
Gregory Ashton's avatar
Gregory Ashton committed
335
        duration = 1 * 86400
336
        maxStartTime = minStartTime + duration
337 338
        Alpha = 5e-3
        Delta = 1.2
339
        tref = minStartTime
340 341
        Writer = pyfstat.Writer(F0=F0, F1=F1, F2=F2, label=self.label,
                                h0=h0, sqrtSX=sqrtSX,
Gregory Ashton's avatar
Gregory Ashton committed
342
                                outdir=self.outdir, tstart=minStartTime,
343
                                Alpha=Alpha, Delta=Delta, tref=tref,
344
                                duration=duration,
345
                                Band=4)
346 347 348 349

        Writer.make_data()
        predicted_FS = Writer.predict_fstat()

Gregory Ashton's avatar
Gregory Ashton committed
350 351
        theta = {'F0': {'type': 'norm', 'loc': F0, 'scale': np.abs(1e-10*F0)},
                 'F1': {'type': 'norm', 'loc': F1, 'scale': np.abs(1e-10*F1)},
352 353
                 'F2': F2, 'Alpha': Alpha, 'Delta': Delta}

Gregory Ashton's avatar
Gregory Ashton committed
354
        search = pyfstat.MCMCSearch(
Gregory Ashton's avatar
Gregory Ashton committed
355
            label=self.label, outdir=self.outdir, theta_prior=theta, tref=tref,
356
            sftfilepattern='{}/*{}*sft'.format(Writer.outdir, Writer.label),
357
            minStartTime=minStartTime, maxStartTime=maxStartTime,
358
            nsteps=[100, 100], nwalkers=100, ntemps=2, log10beta_min=-1)
359
        search.run(create_plots=False)
360 361 362 363
        _, FS = search.get_max_twoF()

        print('Predicted twoF is {} while recovered is {}'.format(
                predicted_FS, FS))
Gregory Ashton's avatar
Gregory Ashton committed
364 365
        self.assertTrue(
            FS > predicted_FS or np.abs((FS-predicted_FS))/predicted_FS < 0.3)
366 367


368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411
class GridSearch(Test):
    F0s = [29, 31, 0.1]
    F1s = [-1e-10, 0, 1e-11]
    tref = 700000000

    def test_grid_search(self):
        search = pyfstat.GridSearch(
            'grid_search', self.outdir, self.sftfilepath, F0s=self.F0s,
            F1s=[0], F2s=[0], Alphas=[0], Deltas=[0], tref=self.tref)
        search.run()
        self.assertTrue(os.path.isfile(search.out_file))

    def test_semicoherent_grid_search(self):
        search = pyfstat.GridSearch(
            'sc_grid_search', self.outdir, self.sftfilepath, F0s=self.F0s,
            F1s=[0], F2s=[0], Alphas=[0], Deltas=[0], tref=self.tref, nsegs=2)
        search.run()
        self.assertTrue(os.path.isfile(search.out_file))

    def test_slice_grid_search(self):
        search = pyfstat.SliceGridSearch(
            'slice_grid_search', self.outdir, self.sftfilepath, F0s=self.F0s,
            F1s=self.F1s, F2s=[0], Alphas=[0], Deltas=[0], tref=self.tref,
            Lambda0=[30, 0, 0, 0])
        search.run()
        self.assertTrue(os.path.isfile('{}/{}_slice_projection.png'
                                       .format(search.outdir, search.label)))

    def test_glitch_grid_search(self):
        search = pyfstat.GridGlitchSearch(
            'grid_grid_search', self.outdir, self.sftfilepath, F0s=self.F0s,
            F1s=self.F1s, F2s=[0], Alphas=[0], Deltas=[0], tref=self.tref,
            tglitchs=[self.tref])
        search.run()
        self.assertTrue(os.path.isfile(search.out_file))

    def test_sliding_window(self):
        search = pyfstat.FrequencySlidingWindow(
            'grid_grid_search', self.outdir, self.sftfilepath, F0s=self.F0s,
            F1=0, F2=0, Alpha=0, Delta=0, tref=self.tref,
            minStartTime=self.minStartTime, maxStartTime=self.maxStartTime)
        search.run()
        self.assertTrue(os.path.isfile(search.out_file))

412 413
if __name__ == '__main__':
    unittest.main()