Skip to content
Snippets Groups Projects
Select Git revision
  • d569dc90b0e739936c1d4ac54d419f537ccbd934
  • master default protected
  • Binary
  • add-version-information
  • os-path-join
  • develop-GA
  • timeFstatmap
  • add-higher-spindown-components
  • develop-DK
  • adds-header-to-grid-search
  • v1.3
  • v1.2
  • v1.1.2
  • v1.1.0
  • v1.0.1
15 results

tests.py

Blame
  • Forked from Gregory Ashton / PyFstat
    32 commits behind the upstream repository.
    Reinhard Prix's avatar
    Reinhard Prix authored
    - fix test that can otherwise fail (observed on MacOSX)
    367e4a02
    History
    tests.py 16.76 KiB
    import unittest
    import numpy as np
    import os
    import shutil
    import pyfstat
    import lalpulsar
    import logging
    import time
    
    class Test(unittest.TestCase):
        outdir = "TestData"
    
        @classmethod
        def setUpClass(self):
            if os.path.isdir(self.outdir):
                try:
                    shutil.rmtree(self.outdir)
                except OSError:
                    logging.warning("{} not removed prior to tests".format(self.outdir))
            h0 = 1
            sqrtSX = 1
            F0 = 30
            F1 = -1e-10
            F2 = 0
            minStartTime = 700000000
            duration = 2 * 86400
            Alpha = 5e-3
            Delta = 1.2
            tref = minStartTime
            Writer = pyfstat.Writer(
                F0=F0,
                F1=F1,
                F2=F2,
                label="test",
                h0=h0,
                sqrtSX=sqrtSX,
                outdir=self.outdir,
                tstart=minStartTime,
                Alpha=Alpha,
                Delta=Delta,
                tref=tref,
                duration=duration,
                Band=4,
            )
            Writer.make_data()
            self.sftfilepath = Writer.sftfilepath
            self.minStartTime = minStartTime
            self.maxStartTime = minStartTime + duration
            self.duration = duration
    
        @classmethod
        def tearDownClass(self):
            if os.path.isdir(self.outdir):
                try:
                    shutil.rmtree(self.outdir)
                except OSError:
                    logging.warning("{} not removed prior to tests".format(self.outdir))
    
    
    class Writer(Test):
        label = "TestWriter"
    
        def test_make_cff(self):
            Writer = pyfstat.Writer(self.label, outdir=self.outdir)
            Writer.make_cff()
            self.assertTrue(os.path.isfile("./{}/{}.cff".format(self.outdir, self.label)))
    
        def test_run_makefakedata(self):
            Writer = pyfstat.Writer(self.label, outdir=self.outdir, duration=3600)
            Writer.make_cff()
            Writer.run_makefakedata()
            self.assertTrue(
                os.path.isfile(
                    "./{}/H-2_H1_1800SFT_TestWriter-700000000-3600.sft".format(self.outdir)
                )
            )
    
        def test_makefakedata_usecached(self):
            Writer = pyfstat.Writer(self.label, outdir=self.outdir, duration=3600)
            if os.path.isfile(Writer.sftfilepath):
                os.remove(Writer.sftfilepath)
            Writer.make_cff()
            Writer.run_makefakedata()
            time_first = os.path.getmtime(Writer.sftfilepath)
            Writer.run_makefakedata()
            time_second = os.path.getmtime(Writer.sftfilepath)
            self.assertTrue(time_first == time_second)
            time.sleep(1)  # make sure timestamp is actually different!
            os.system("touch {}".format(Writer.config_file_name))
            Writer.run_makefakedata()
            time_third = os.path.getmtime(Writer.sftfilepath)
            self.assertFalse(time_first == time_third)
    
    
    class Bunch(Test):
        def test_bunch(self):
            b = pyfstat.core.Bunch(dict(x=10))
            self.assertTrue(b.x == 10)
    
    
    class par(Test):
        label = "TestPar"
    
        def test(self):
            os.system('echo "x=100\ny=10" > {}/{}.par'.format(self.outdir, self.label))
    
            par = pyfstat.core.read_par(
                "{}/{}.par".format(self.outdir, self.label), return_type="Bunch"
            )
            self.assertTrue(par.x == 100)
            self.assertTrue(par.y == 10)
    
            par = pyfstat.core.read_par(
                outdir=self.outdir, label=self.label, return_type="dict"
            )
            self.assertTrue(par["x"] == 100)
            self.assertTrue(par["y"] == 10)
            os.system("rm -r {}".format(self.outdir))
    
    
    class BaseSearchClass(Test):
        def test_shift_matrix(self):
            BSC = pyfstat.BaseSearchClass()
            dT = 10
            a = BSC._shift_matrix(4, dT)
            b = np.array(
                [
                    [
                        1,
                        2 * np.pi * dT,
                        2 * np.pi * dT ** 2 / 2.0,
                        2 * np.pi * dT ** 3 / 6.0,
                    ],
                    [0, 1, dT, dT ** 2 / 2.0],
                    [0, 0, 1, dT],
                    [0, 0, 0, 1],
                ]
            )
            self.assertTrue(np.array_equal(a, b))
    
        def test_shift_coefficients(self):
            BSC = pyfstat.BaseSearchClass()
            thetaA = np.array([10.0, 1e2, 10.0, 1e2])
            dT = 100
    
            # Calculate the 'long' way
            thetaB = np.zeros(len(thetaA))
            thetaB[3] = thetaA[3]
            thetaB[2] = thetaA[2] + thetaA[3] * dT
            thetaB[1] = thetaA[1] + thetaA[2] * dT + 0.5 * thetaA[3] * dT ** 2
            thetaB[0] = thetaA[0] + 2 * np.pi * (
                thetaA[1] * dT + 0.5 * thetaA[2] * dT ** 2 + thetaA[3] * dT ** 3 / 6.0
            )
    
            self.assertTrue(np.array_equal(thetaB, BSC._shift_coefficients(thetaA, dT)))
    
        def test_shift_coefficients_loop(self):
            BSC = pyfstat.BaseSearchClass()
            thetaA = np.array([10.0, 1e2, 10.0, 1e2])
            dT = 1e1
            thetaB = BSC._shift_coefficients(thetaA, dT)
            self.assertTrue(
                np.allclose(
                    thetaA, BSC._shift_coefficients(thetaB, -dT), rtol=1e-9, atol=1e-9
                )
            )
    
    
    class ComputeFstat(Test):
        label = "TestComputeFstat"
    
        def test_run_computefstatistic_single_point(self):
            Writer = pyfstat.Writer(
                self.label,
                outdir=self.outdir,
                duration=86400,
                h0=1,
                sqrtSX=1,
                detectors="H1",
            )
            Writer.make_data()
            predicted_FS = Writer.predict_fstat()
    
            search_H1L1 = pyfstat.ComputeFstat(
                tref=Writer.tref,
                sftfilepattern="{}/*{}*sft".format(Writer.outdir, Writer.label),
            )
            FS = search_H1L1.get_fullycoherent_twoF(
                Writer.tstart,
                Writer.tend,
                Writer.F0,
                Writer.F1,
                Writer.F2,
                Writer.Alpha,
                Writer.Delta,
            )
            self.assertTrue(np.abs(predicted_FS - FS) / FS < 0.3)
    
            Writer.detectors = "H1"
            predicted_FS = Writer.predict_fstat()
            search_H1 = pyfstat.ComputeFstat(
                tref=Writer.tref,
                detectors="H1",
                sftfilepattern="{}/*{}*sft".format(Writer.outdir, Writer.label),
                SSBprec=lalpulsar.SSBPREC_RELATIVISTIC,
            )
            FS = search_H1.get_fullycoherent_twoF(
                Writer.tstart,
                Writer.tend,
                Writer.F0,
                Writer.F1,
                Writer.F2,
                Writer.Alpha,
                Writer.Delta,
            )
            self.assertTrue(np.abs(predicted_FS - FS) / FS < 0.3)
    
        def run_computefstatistic_single_point_no_noise(self):
            Writer = pyfstat.Writer(
                self.label,
                outdir=self.outdir,
                add_noise=False,
                duration=86400,
                h0=1,
                sqrtSX=1,
            )
            Writer.make_data()
            predicted_FS = Writer.predict_fstat()
    
            search = pyfstat.ComputeFstat(
                tref=Writer.tref,
                assumeSqrtSX=1,
                sftfilepattern="{}/*{}*sft".format(Writer.outdir, Writer.label),
            )
            FS = search.get_fullycoherent_twoF(
                Writer.tstart,
                Writer.tend,
                Writer.F0,
                Writer.F1,
                Writer.F2,
                Writer.Alpha,
                Writer.Delta,
            )
            self.assertTrue(np.abs(predicted_FS - FS) / FS < 0.3)
    
        def test_injectSources(self):
            # This seems to be writing with a signal...
            Writer = pyfstat.Writer(
                self.label,
                outdir=self.outdir,
                add_noise=False,
                duration=86400,
                h0=1,
                sqrtSX=1,
            )
            Writer.make_cff()
            injectSources = Writer.config_file_name
    
            search = pyfstat.ComputeFstat(
                tref=Writer.tref,
                assumeSqrtSX=1,
                injectSources=injectSources,
                minCoverFreq=28,
                maxCoverFreq=32,
                minStartTime=Writer.tstart,
                maxStartTime=Writer.tstart + Writer.duration,
                detectors=Writer.detectors,
            )
            FS_from_file = search.get_fullycoherent_twoF(
                Writer.tstart,
                Writer.tend,
                Writer.F0,
                Writer.F1,
                Writer.F2,
                Writer.Alpha,
                Writer.Delta,
            )
            Writer.make_data()
            predicted_FS = Writer.predict_fstat()
            self.assertTrue(np.abs(predicted_FS - FS_from_file) / FS_from_file < 0.3)
    
            injectSourcesdict = pyfstat.core.read_par(Writer.config_file_name)
            injectSourcesdict["F0"] = injectSourcesdict["Freq"]
            injectSourcesdict["F1"] = injectSourcesdict["f1dot"]
            injectSourcesdict["F2"] = injectSourcesdict["f2dot"]
            search = pyfstat.ComputeFstat(
                tref=Writer.tref,
                assumeSqrtSX=1,
                injectSources=injectSourcesdict,
                minCoverFreq=28,
                maxCoverFreq=32,
                minStartTime=Writer.tstart,
                maxStartTime=Writer.tstart + Writer.duration,
                detectors=Writer.detectors,
            )
            FS_from_dict = search.get_fullycoherent_twoF(
                Writer.tstart,
                Writer.tend,
                Writer.F0,
                Writer.F1,
                Writer.F2,
                Writer.Alpha,
                Writer.Delta,
            )
            self.assertTrue(FS_from_dict == FS_from_file)
    
    
    class SemiCoherentSearch(Test):
        label = "TestSemiCoherentSearch"
    
        def test_get_semicoherent_twoF(self):
            duration = 10 * 86400
            Writer = pyfstat.Writer(
                self.label, outdir=self.outdir, duration=duration, h0=1, sqrtSX=1
            )
            Writer.make_data()
    
            search = pyfstat.SemiCoherentSearch(
                label=self.label,
                outdir=self.outdir,
                nsegs=2,
                sftfilepattern="{}/*{}*sft".format(Writer.outdir, Writer.label),
                tref=Writer.tref,
                minStartTime=Writer.tstart,
                maxStartTime=Writer.tend,
            )
    
            search.get_semicoherent_twoF(
                Writer.F0,
                Writer.F1,
                Writer.F2,
                Writer.Alpha,
                Writer.Delta,
                record_segments=True,
            )
    
            # Compute the predicted semi-coherent Fstat
            minStartTime = Writer.tstart
            maxStartTime = Writer.tend
    
            Writer.maxStartTime = minStartTime + duration / 2.0
            FSA = Writer.predict_fstat()
    
            Writer.tstart = minStartTime + duration / 2.0
            Writer.tend = maxStartTime
            FSB = Writer.predict_fstat()
    
            FSs = np.array([FSA, FSB])
            diffs = (np.array(search.detStat_per_segment) - FSs) / FSs
            self.assertTrue(np.all(diffs < 0.3))
    
        def test_get_semicoherent_BSGL(self):
            duration = 10 * 86400
            Writer = pyfstat.Writer(
                self.label, outdir=self.outdir, duration=duration, detectors="H1,L1"
            )
            Writer.make_data()
    
            search = pyfstat.SemiCoherentSearch(
                label=self.label,
                outdir=self.outdir,
                nsegs=2,
                sftfilepattern="{}/*{}*sft".format(Writer.outdir, Writer.label),
                tref=Writer.tref,
                minStartTime=Writer.tstart,
                maxStartTime=Writer.tend,
                BSGL=True,
            )
    
            BSGL = search.get_semicoherent_twoF(
                Writer.F0,
                Writer.F1,
                Writer.F2,
                Writer.Alpha,
                Writer.Delta,
                record_segments=True,
            )
            self.assertTrue(BSGL > 0)
    
    
    class SemiCoherentGlitchSearch(Test):
        label = "TestSemiCoherentGlitchSearch"
    
        def test_get_semicoherent_nglitch_twoF(self):
            duration = 10 * 86400
            dtglitch = 0.5 * duration
            delta_F0 = 0
            h0 = 1
            sqrtSX = 1
            Writer = pyfstat.GlitchWriter(
                self.label,
                outdir=self.outdir,
                duration=duration,
                dtglitch=dtglitch,
                delta_F0=delta_F0,
                sqrtSX=sqrtSX,
                h0=h0,
            )
    
            Writer.make_data()
    
            search = pyfstat.SemiCoherentGlitchSearch(
                label=self.label,
                outdir=self.outdir,
                sftfilepattern="{}/*{}*sft".format(Writer.outdir, Writer.label),
                tref=Writer.tref,
                minStartTime=Writer.tstart,
                maxStartTime=Writer.tend,
                nglitch=1,
            )
    
            FS = search.get_semicoherent_nglitch_twoF(
                Writer.F0,
                Writer.F1,
                Writer.F2,
                Writer.Alpha,
                Writer.Delta,
                Writer.delta_F0,
                Writer.delta_F1,
                search.minStartTime + dtglitch,
            )
    
            # Compute the predicted semi-coherent glitch Fstat
            minStartTime = Writer.tstart
            maxStartTime = Writer.tend
    
            Writer.maxStartTime = minStartTime + dtglitch
            FSA = Writer.predict_fstat()
    
            Writer.tstart = minStartTime + dtglitch
            Writer.tend = maxStartTime
            FSB = Writer.predict_fstat()
    
            print(FSA, FSB)
            predicted_FS = FSA + FSB
    
            print((predicted_FS, FS))
            self.assertTrue(np.abs((FS - predicted_FS)) / predicted_FS < 0.3)
    
    
    class MCMCSearch(Test):
        label = "TestMCMCSearch"
    
        def test_fully_coherent(self):
            h0 = 1
            sqrtSX = 1
            F0 = 30
            F1 = -1e-10
            F2 = 0
            minStartTime = 700000000
            duration = 1 * 86400
            maxStartTime = minStartTime + duration
            Alpha = 5e-3
            Delta = 1.2
            tref = minStartTime
            Writer = pyfstat.Writer(
                F0=F0,
                F1=F1,
                F2=F2,
                label=self.label,
                h0=h0,
                sqrtSX=sqrtSX,
                outdir=self.outdir,
                tstart=minStartTime,
                Alpha=Alpha,
                Delta=Delta,
                tref=tref,
                duration=duration,
                Band=4,
            )
    
            Writer.make_data()
            predicted_FS = Writer.predict_fstat()
    
            theta = {
                "F0": {"type": "norm", "loc": F0, "scale": np.abs(1e-10 * F0)},
                "F1": {"type": "norm", "loc": F1, "scale": np.abs(1e-10 * F1)},
                "F2": F2,
                "Alpha": Alpha,
                "Delta": Delta,
            }
    
            search = pyfstat.MCMCSearch(
                label=self.label,
                outdir=self.outdir,
                theta_prior=theta,
                tref=tref,
                sftfilepattern="{}/*{}*sft".format(Writer.outdir, Writer.label),
                minStartTime=minStartTime,
                maxStartTime=maxStartTime,
                nsteps=[100, 100],
                nwalkers=100,
                ntemps=2,
                log10beta_min=-1,
            )
            search.run(create_plots=False)
            _, FS = search.get_max_twoF()
    
            print(("Predicted twoF is {} while recovered is {}".format(predicted_FS, FS)))
            self.assertTrue(
                FS > predicted_FS or np.abs((FS - predicted_FS)) / predicted_FS < 0.3
            )
    
    
    class GridSearch(Test):
        F0s = [29, 31, 0.1]
        F1s = [-1e-10, 0, 1e-11]
        tref = 700000000
    
        def test_grid_search(self):
            search = pyfstat.GridSearch(
                "grid_search",
                self.outdir,
                self.sftfilepath,
                F0s=self.F0s,
                F1s=[0],
                F2s=[0],
                Alphas=[0],
                Deltas=[0],
                tref=self.tref,
            )
            search.run()
            self.assertTrue(os.path.isfile(search.out_file))
    
        def test_semicoherent_grid_search(self):
            search = pyfstat.GridSearch(
                "sc_grid_search",
                self.outdir,
                self.sftfilepath,
                F0s=self.F0s,
                F1s=[0],
                F2s=[0],
                Alphas=[0],
                Deltas=[0],
                tref=self.tref,
                nsegs=2,
            )
            search.run()
            self.assertTrue(os.path.isfile(search.out_file))
    
        def test_slice_grid_search(self):
            search = pyfstat.SliceGridSearch(
                "slice_grid_search",
                self.outdir,
                self.sftfilepath,
                F0s=self.F0s,
                F1s=self.F1s,
                F2s=[0],
                Alphas=[0],
                Deltas=[0],
                tref=self.tref,
                Lambda0=[30, 0, 0, 0],
            )
            fig, axes = search.run(save=False)
            self.assertTrue(fig is not None)
    
        def test_glitch_grid_search(self):
            search = pyfstat.GridGlitchSearch(
                "grid_grid_search",
                self.outdir,
                self.sftfilepath,
                F0s=self.F0s,
                F1s=self.F1s,
                F2s=[0],
                Alphas=[0],
                Deltas=[0],
                tref=self.tref,
                tglitchs=[self.tref],
            )
            search.run()
            self.assertTrue(os.path.isfile(search.out_file))
    
        def test_sliding_window(self):
            search = pyfstat.FrequencySlidingWindow(
                "grid_grid_search",
                self.outdir,
                self.sftfilepath,
                F0s=self.F0s,
                F1=0,
                F2=0,
                Alpha=0,
                Delta=0,
                tref=self.tref,
                minStartTime=self.minStartTime,
                maxStartTime=self.maxStartTime,
            )
            search.run()
            self.assertTrue(os.path.isfile(search.out_file))
    
    
    if __name__ == "__main__":
        unittest.main()