Source code for nifits.io.oifits

"""
A module for reading/writing NIFITS files and handling the data.

To open an existing NIFITS file, use ``nifits.from_nifits`` constructor.

To save an NIFITS object to a file, use ``nifits.to_nifits`` method.

A summary of the information in the oifits object can be obtained by
using the info() method:

   > import oifits
   > oifitsobj = oifits.open('foo.fits')
   > oifitsobj.info()

For further information, contact R. Laugier

"""

import numpy as np
from astropy.io import fits
import astropy.units as u
import astropy.table
Table = astropy.table.Table
from astropy.coordinates import EarthLocation
import datetime
import warnings
from packaging import version

import sys
from dataclasses import dataclass, field
# from numpy.typing import ArrayLike
# A hack to fix the documentation of type hinting
import numpy.typing
ArrayLike = np.typing.ArrayLike


__version__ = "0.0.6"
__standard_version__ = "0.4"

_mjdzero = datetime.datetime(1858, 11, 17)

matchtargetbyname = False
matchstationbyname = False
refdate = datetime.datetime(2000, 1, 1)


import warnings
[docs] def check_item(func): """ A decorator for the `fits.Header.__getitem__`. This is here to save from compatibility issues with files of standard version <= 0.2 while warning that that this version will """ def inner(*args, **kwargs): good_kw = True try : item = func(*args, **kwargs) good_kw = True except KeyError: good_kw = False if good_kw: return item bad_kw = True try : akw = args[1] mykw = akw.split(" ")[-1] baditem = func(args[0], mykw, **kwargs) bad_kw = True except KeyError: bad_kw = False if bad_kw and not good_kw: warnings.warn(f"Keyword deprecation. Expected `{args[1]}` (`HIERARCH` keyword)\n Found `{mykw}`\n This file was generated for NIFITS standard version <= 0.2. It will stop working for library versions >= 0.1.0 .") item = baditem return item elif not bad_kw and not good_kw: raise KeyError(f"Neither {args[1]} nor {mykw} found.") return None return item return inner
fits.Header.__getitem__ = check_item(fits.Header.__getitem__)
[docs] def _plurals(count): if count != 1: return 's' return ''
[docs] def array_eq(a: ArrayLike, b: ArrayLike): """ Test whether all the elements of two arrays are equal. Args: a: one input. b: another input. """ if len(a) != len(b): return False try: return not (a != b).any() except: return not (a != b)
[docs] def _isnone(x): """Convenience hack for checking if x is none; needed because numpy arrays will, at some point, return arrays for x == None.""" return type(x) == type(None)
[docs] def _notnone(x): """Convenience hack for checking if x is not none; needed because numpy arrays will, at some point, return arrays for x != None.""" return type(x) != type(None)
[docs] class OI_STATION(object): """ This class corresponds to a single row (i.e. single station/telescope) of an OI_ARRAY table.""" def __init__(self, tel_name=None, sta_name=None, diameter=None, staxyz=[None, None, None], fov=None, fovtype=None, revision=1): if revision > 2: warnings.warn('OI_ARRAY revision %d not implemented yet'%revision, UserWarning) self.revision = revision self.tel_name = tel_name self.sta_name = sta_name self.diameter = diameter self.staxyz = staxyz if revision >= 2: self.fov = fov self.fovtype = fovtype else: self.fov = self.fovtype = None
[docs] def __eq__(self, other): if type(self) != type(other): return False return not ( (self.revision != other.revision) or (self.tel_name != other.tel_name) or (self.sta_name != other.sta_name) or (self.diameter != other.diameter) or (not _array_eq(self.staxyz, other.staxyz)) or (self.fov != other.fov) or (self.fovtype != other.fovtype))
[docs] def __ne__(self, other): return not self.__eq__(other)
[docs] def __repr__(self): if self.revision >= 2: return '%s/%s (%g m, fov %g arcsec (%s))'%(self.sta_name, self.tel_name, self.diameter, self.fov, self.fovtype) else: return '%s/%s (%g m)'%(self.sta_name, self.tel_name, self.diameter)
@dataclass class NI_CATM(object): """Contains the complex amplitude transfer matrix CATM of the instrument. The CATM is a complex matrix representing the transformation from the each of the complex amplitude of electric field from the inputs to the outputs of the instrument. The dimensions are (n_ch, n_out, n_in) where n_ch represents the spectral channels. It is expected that :math:`\\textbf{n}_{out} = \\textbf{M}_{CATM}.\\textbf{m}_{mod} \circ \\textbf{x}_{in}` with :math:`\\textbf{m}_{mod}` containded in NI_MOD. """ Mcatm: ArrayLike header: fits.Header
[docs] @classmethod def from_hdu(cls, hdu: type(fits.hdu.ImageHDU)): """ Create the NI_CATM object from the HDU extension of an opened fits file. """ Mcatm = hdu.data header = hdu.header myobj = cls(Mcatm, header) return myobj
[docs] def to_hdu(self): """ Returns and hdu object to save into fits """ myhdu = fits.hdu.ImageHDU(data=self.Mcatm) myhdu.header = self.header return myhdu
# TODO add a check method
[docs] def nulfunc(self, *args, **kwargs): raise TypeError
NI_OITAG_DEFAULT_HEADER = fits.Header(cards=[("HIERRARCH NIFITS IOSWAPS", False, "The units for output values")]) NI_MOD_DEFAULT_HEADER = fits.Header(cards=[("HIERARCH NIFITS AMOD_PHAS_UNITS", "rad", "The units for modulation phasors"), ("HIERARCH NIFITS ARRCOL_UNITS", "m^2", "The units for collecting area") ]) # Possible to use "chromatic_gaussian_radial", "diameter_gaussian_radial". # Simplest default is a gaussian with r0 = lambda/D NI_FOV_DEFAULT_HEADER = fits.Header(cards=[("HIERARCH NIFITS FOV_MODE","diameter_gaussian_radial","Type of FOV definition"), ("HIERARCH NIFITS FOV_offset"), ("HIERARCH NIFITS FOV_TELDIAM", 8.0, "diameter of a collecting aperture for FOV"), ("HIERARCH NIFITS FOV_TELDIAM_UNIT", "m", ""), ("HIERARCH NIFITS WL_SHIFT_MODE", "")]) """ Obtaining the goecentric location of observatories. (unchecked) ```python from astroplan import Observer import astropy.coordinates as coords myobs = Observer.at_site("CHARA") print([acoord.value for acoord in myobs.location.to_geocentric()]) ``` VLTI: 1946404.3410388362, -5467644.290798524, -2642728.2014442487 CHARA: -2484228.6029109913, -4660044.467216573, 3567867.961141405 """ OI_ARRAY_DEFAULT_VLTI_HEADER = fits.Header(cards=[ ("OI_REVN", 1, "Revision number of the table definition (refers no OIFITS version, not NIFITS)."), ("ARRNAME", "VLTI", "Array name, for cross-referencing"), ("FRAME", "GEOCENTRIC", "Coordinate frame"), ("ARRAYX", 1946404.3410388362, "Array center coordinates (m)"), ("ARRAYY", -5467644.290798524,"Array center coordinates (m)"), ("ARRAYZ", -2642728.2014442487, "Array center coordinates (m)"), ]) OI_ARRAY_DEFAULT_CHARA_HEADER = fits.Header(cards=[ ("OI_REVN", 1, "Revision number of the table definition (refers no OIFITS version, not NIFITS)."), ("ARRNAME", "CHARA", "Array name, for cross-referencing"), ("FRAME", "GEOCENTRIC", "Coordinate frame"), ("ARRAYX", -2484228.6029109913, "Array center coordinates (m)"), ("ARRAYY", -4660044.467216573, "Array center coordinates (m)"), ("ARRAYZ", 3567867.961141405, "Array center coordinates (m)"), ])
[docs] @dataclass class NI_EXTENSION(object): """ ``NI_EXTENSION`` Generic class for NIFITS extensions **Inherited methods:** * ``from_hdu``: Creates the object from an ``astropy.io.fits.TableHDU`` object * ``to_hdu`` : Returns the ``TableHDU`` from itself. Args: data_table: [ArrayLike] The data to stored header: [fits.Header] A fits header (optional) unit: [astropy.units.Unit] Units of the data stored (mandatory of NI_IOUT, NI_KIOUT and NI_KCOV) """ data_table: Table = field(default_factory=Table) header: fits.Header = field(default_factory=fits.Header) unit: u.Unit = None # TODO: Potentially, this should be a None by default, while still being a # fits.Header type hint... We can if we have a None, we can catch it with # a __post_init__ method. TODO this will help cleanup the signature in the doc.
[docs] @classmethod def from_hdu(cls, hdu: type(fits.hdu.TableHDU)): """ Create the data object from the HDU extension of an opened fits file. Args: hdu : TableHDU object containing the relevant data """ data_table = Table(hdu.data) header = hdu.header if "HIERARCH NIFITS IUNIT" in header.keys(): return cls(data_table=data_table, header=header, unit=u.Unit(header["NIFITS IUNIT"])) else: return cls(data_table=data_table, header=header)
[docs] def to_hdu(self): """ Returns and hdu object to save into fits .. admonition:: This also updates the header if dimension changes """ if hasattr(self, "unit"): if self.unit is not None: self.header["NIFITS IUNIT"] = (self.unit.to_string(), "Unit for the content") # TODO this looks like a bug in astropy.fits: the header should update on its own. myhdu = fits.hdu.BinTableHDU(name=self.name, data=self.data_table, header=self.header) # myhdu = fits.hdu.BinTableHDU(name=self.name, data=self.data_table) # TODO: fix the diffing? # print("Updating header:\n", fits.HeaderDiff(myhdu.header, self.header).__repr__) self.header = myhdu.header return myhdu
[docs] def __len__(self): return len(self.data_table)
[docs] @dataclass class NI_EXTENSION_ARRAY(NI_EXTENSION): """ Generic class for NIFITS array extensions Args: data_array: [ArrayLike] The data to stored header: [fits.Header] A fits header (optional) unit: [astropy.units.Unit] Units of the data stored (mandatory of NI_IOUT, NI_KIOUT and NI_KCOV) """ data_array: ArrayLike = field(default_factory=np.array) header: fits.Header = field(default_factory=fits.Header) unit: u.Unit = None
[docs] @classmethod def from_hdu(cls, hdu: type(fits.hdu.ImageHDU)): """ Create the data object from the HDU extension of an opened fits file. """ data_array = hdu.data header = hdu.header if "IUNIT" in header.keys(): return cls(data_array=data_array, header=header, unit=u.Unit(header["NIFITS IUNIT"])) else: return cls(data_array=data_array, header=header)
[docs] def to_hdu(self,): """ Returns and hdu object to save into fits .. admonition:: Note This also updates the header if dimension changes """ if hasattr(self, "unit"): if self.unit is not None: self.header["NIFITS IUNIT"] = (self.unit.to_string(), "Unit for the content") myhdu = fits.hdu.ImageHDU(name=self.name,data=self.data_array, header=self.header) print("Updating header:\n", fits.HeaderDiff(myhdu.header, self.header)) self.header = myhdu.header return myhdu
[docs] def __len__(self): pass
@property def shape(self): return self.data_array.shape
[docs] @dataclass class NI_EXTENSION_CPX_ARRAY(NI_EXTENSION): """ Generic class for NIFITS array extensions. The array is kept locally as complex valued, but it is stored to and loaded from a real-valued array with an extra first dimension of length 2 for (real, imaginary) parts. """ data_array: ArrayLike = field(default_factory=np.array) header: fits.Header = field(default_factory=fits.Header)
[docs] @classmethod def from_hdu(cls, hdu: type(fits.hdu.ImageHDU)): """ Create the data object from the HDU extension of an opened fits file. """ assert hdu.data.shape[0] == 2,\ f"Data should have 2 layers for real and imag. {hdu.data.shape}" data_array = hdu.data[0] + 1j*hdu.data[1] header = hdu.header return cls(data_array=data_array, header=header)
[docs] def to_hdu(self,): """ Returns and hdu object to save into fits .. admonition:: Note This also updates the header if dimension changes """ real_valued_data = np.array([self.data_array.real, self.data_array.imag], dtype=float) myhdu = fits.hdu.ImageHDU(name=self.name,data=real_valued_data, header=self.header) print("Updating header:\n", fits.HeaderDiff(myhdu.header, self.header)) self.header = myhdu.header return myhdu
[docs] def __len__(self): pass
@property def shape(self): return self.data_array.shape
[docs] class OI_ARRAY(NI_EXTENSION): __doc__ = """ ``OI_ARRAY`` definition Args: data_table: The data to hold header: The associated fits header """ + NI_EXTENSION.__doc__ name="OI_ARRAY"
from astropy.constants import c as cst_c
[docs] class OI_WAVELENGTH(NI_EXTENSION): __doc__ = """ An object storing the OI_WAVELENGTH information, in compatibility with OIFITS practices. **Shorthands:** * ``self.lambs`` : ``ArrayLike`` [m] returns an array containing the center of each spectral channel. * ``self.dlmabs`` : ``ArrayLike`` [m] an array containing the spectral bin widths. * ``self.nus`` : ``ArrayLike`` [Hz] an array containing central frequencies of the spectral channels. * ``self.dnus`` : ``ArrayLike`` [Hz] an array containing the frequency bin widths. """ + NI_EXTENSION.__doc__ name = "OI_WAVELENGTH" @property def lambs(self): return self.data_table["EFF_WAVE"].data @property def dlambs(self): return self.data_table["EFF_BAND"].data @property def nus(self): return (cst_c/(self.lambs*u.m)).value @property def dnus(self): return (cst_c/(self.dlambs*u.m)).value
[docs] class NI_OSWAVELENGTH(NI_EXTENSION): __doc__ = """ An object storing the wavelength before a downsampling. This must have the wavelength for each of the slice of the CATM matrix, each of the ``NI_MOD`` phasors and each column of the ``NI_DSAMP`` matrix. If ``NI_OOSWAVELENGTH`` is absent, assume that there is no over or down- sampling and take the values directly from ``OI_WAVELENGTH``. **Shorthands:** * ``self.lambs`` : ``ArrayLike`` [m] returns an array containing the center of each spectral channel. * ``self.dlmabs`` : ``ArrayLike`` [m] an array containing the spectral bin widths. * ``self.nus`` : ``ArrayLike`` [Hz] an array containing central frequencies of the spectral channels. * ``self.dnus`` : ``ArrayLike`` [Hz] an array containing the frequency bin widths. """ + NI_EXTENSION.__doc__ name = "OI_WAVELENGTH" @property def lambs(self): return self.data_table["EFF_WAVE"].data @property def dlambs(self): return self.data_table["EFF_BAND"].data @property def nus(self): return (cst_c/(self.lambs*u.m)).value @property def dnus(self): return (cst_c/(self.dlambs*u.m)).value
from dataclasses import field from typing import List
[docs] @dataclass class OI_TARGET(NI_EXTENSION): """ ``OI_TARGET`` definition. """ # target: List[str] = field(default_factory=list) # raep0: float = 0. # decep0: float = 0. name="OI_TARGET"
[docs] @classmethod def from_scratch(cls, ): """ Creates the OI_TARGET object with an empty table. **Returns: OI_TARGET: object with an empty table. Use ``add_target()`` to finish the job. """ data_table = Table(names=["TARGET_ID", "TARGET", "RAEP0", "DECEP0", "EQUINOX", "RA_ERR", "DEC_ERR", "SYSVEL", "VELTYP", "VELDEF", "PMRA", "PMDEC", "PMRA_ERR", "PMDEC_ERR", "PARALLAX", "PARA_ERR", "SPECTYP", "CATEGORY" ], dtype=[int, str, float, float, float, float, float, float, str, str, float, float, float, float, float, float, str, str ],) return cls(data_table=data_table)
[docs] def add_target(self, target_id=0, target="MyTarget", raep0=0., decep0=0., equinox=0., ra_err=0., dec_err=0., sysvel=0., veltyp="", veldef="", pmra=0., pmdec=0., pmra_err=0., pmdec_err=0., parallax=0., para_err=0., spectyp="", category="" ): """ Use this method to add a row to the table of targets Args: param target_id : (default: 0) target : (default: "MyTarget") raep0 : (default: 0.) decep0 : (default: 0.) equinox : (default: 0.) ra_err : (default: 0.) dec_err : (default: 0.) sysvel : (default: 0.) veltyp : (default: "") veldef : (default: "") pmra : (default: 0.) pmdec : (default: 0.) pmra_err : (default: 0.) pmdec_err : (default: 0.) parallax : (default: 0.) para_err : (default: 0.) spectyp : (default: "") category : (default: "") """ self.data_table.add_row(vals=[target_id, target, raep0, decep0, equinox, ra_err, dec_err, sysvel, veltyp, veldef, pmra, pmdec, pmra_err, pmdec_err, parallax, para_err, spectyp, category ])
[docs] @dataclass class NI_CATM(NI_EXTENSION_CPX_ARRAY): __doc__ = """ The complex amplitude transfre function """ + NI_EXTENSION_CPX_ARRAY.__doc__ name = "NI_CATM" @property def M(self): return self.data_array
[docs] class NI_IOUT(NI_EXTENSION): __doc__ = """ ``NI_IOUT`` : a recording of the output values, given in intensity, flux, counts or arbitrary units. Providing unit is mandatory since 0.2. """ + NI_EXTENSION.__doc__ # TODO: resume here proper IO for units name = "NI_IOUT" @property def iout(self): return self.data_table["value"].data
[docs] def set_unit(self, new_unit, comment=None): if comment is None: comment = "The unit of the raw output flux." self.header["NIFITS IUNIT"] = (new_unit.to_string(), comment)
[docs] class NI_KIOUT(NI_EXTENSION): __doc__ = """ ``NI_KIOUT`` : a recording of the processed output values using the the post-processing matrix given by ``NI_KMAT``. Typically differential null or kernel-null. Providing unit is mandatory since 0.2. The unit should match NI_IOUT. """ + NI_EXTENSION_ARRAY.__doc__ unit: u.Unit = u.photon/u.s name = "NI_KIOUT" @property def kiout(self): return self.data_table["value"].data @property def shape(self): return self.data_table["value"].data.shape
[docs] def set_unit(self, new_unit, comment=None): if comment is None: comment = "The unit of the processed flux." self.header["NIFITS IUNIT"] = (new_unit.to_string(), comment)
[docs] class NI_KCOV(NI_EXTENSION_ARRAY): __doc__ = """ The covariance matrix for the processed data contained in KIOUT. Providing unit is mandatory since 0.2. The unit should be the unit of NI_KIOUT ^2. """ + NI_EXTENSION_ARRAY.__doc__ unit: u.Unit = (u.photon/u.s)**2 name = "NI_KCOV" @property def kcov(self): return self.data_array
[docs] def set_unit(self, new_unit, comment=None): if comment is None: comment = "The unit of the covariance matrix." self.header["NIFITS IUNIT"] = (new_unit.to_string(), comment)
[docs] @dataclass class NI_KMAT(NI_EXTENSION_ARRAY): __doc__ = """ The kernel matrix that defines the post-processing operation between outputs. The linear combination is defined by a real-valued matrix. """ + NI_EXTENSION_ARRAY.__doc__ name = "NI_KMAT" @property def K(self): return self.data_array.astype(float)
[docs] @dataclass class NI_DSAMP(NI_EXTENSION_ARRAY): __doc__ = """ The matrix that defines linear combinations of output wavelengths. It is meant to be used to down-sample the wavelengths of the forward model. The number of columns should match the number of channels described by ``NI_OSWAVELENGTH`` and the number of rows should match the number of channels described in OI_WAVELENGTH. If ``NI_DSAMP`` or ``NI_OSWAVELENGTH`` are missing, then assume the identity matrix and possibly skip the computation step. The linear combination is defined by a real-valued matrix. It is recommended that the matrix be semi-unitary on the left, so that a flux conservation is observed, and both input and outputs can be described in the same units. """ + NI_EXTENSION_ARRAY.__doc__ name = "NI_DSAMP" @property def D(self): return self.data_array.astype(float)
[docs] @dataclass class NI_IOTAGS(NI_EXTENSION): r""" Contains information on the inputs and outputs. """ data_table: Table = field(default_factory=Table) header: fits.Header = field(default_factory=fits.Header) name = "NI_MOD" @property def outbright(self): """ The flags of bright outputs """ return self.data_table["BRIGHT"].data @property def outdark(self): """ The flags of dark outputs """ return self.data_table["DARK"].data @property def outbright(self): """ The flags of photometric outputs """ return self.data_table["PHOT"].data @property def outpola(self): """ The polarization of outputs. """ return self.data_table["OUTPOLA"].data @property def inpola(self): """ The polarization of inputs. """ return self.data_table["INPOLA"].data
[docs] @dataclass class NI_MOD(NI_EXTENSION): r""" Contains input modulation vector for the given observation. The format is a complex phasor representing the alteration applied by the instrument to the light at its inputs. Either an intended modulation, or an estimated instrumental error. the dimenstions are (n_ch, n_in) The effects modeled in NI_MOD must cumulate with some that may be modeled in NI_CATM. It is recommended to include in CATM the static effects and in NI_MOD any affect that may vary throughout the observµng run. :math:`n_a \times \lambda` .. table:: ``NI_MOD``: The table of time-dependent collectorwise information. +---------------+----------------------------+------------------+-------------------+ | Item | format | unit | comment | +===============+============================+==================+===================+ | ``APP_INDEX`` | ``int`` | NA | Indices of | | | | | subaperture | | | | | (starts at 0) | +---------------+----------------------------+------------------+-------------------+ | ``TARGET_ID`` | ``int`` | d | Index of target | | | | | in ``OI_TARGET`` | +---------------+----------------------------+------------------+-------------------+ | ``TIME`` | ``float`` | s | Backwards | | | | | compatibility | +---------------+----------------------------+------------------+-------------------+ | ``MJD`` | ``float`` | day | | +---------------+----------------------------+------------------+-------------------+ | ``INT_TIME`` | ``float`` | s | Exposure time | +---------------+----------------------------+------------------+-------------------+ | ``MOD_PHAS`` | ``n_{wl},n_a`` ``float`` | | Complex phasor of | | | | | modulation for | | | | | all collectors | +---------------+----------------------------+------------------+-------------------+ | ``APPXY`` | ``n_a,2`` ``float`` | m | Projected | | | | | location of | | | | | subapertures in | | | | | the plane | | | | | orthogonal to the | | | | | line of sight and | | | | | oriented as | | | | | ``( | | | | | \alpha, \delta)`` | +---------------+----------------------------+------------------+-------------------+ | ``ARRCOL`` | ``n_a`` ``float`` | ``\mathrm{m}^2`` | Collecting area | | | | | of the | | | | | subaperture | +---------------+----------------------------+------------------+-------------------+ | ``FOV_INDEX`` | ``n_a`` ``int`` | NA | The entry of the | | | | | ``NI_FOV`` to use | | | | | for this | | | | | subaperture. | +---------------+----------------------------+------------------+-------------------+ """ data_table: Table = field(default_factory=Table) header: fits.Header = field(default_factory=fits.Header) name = "NI_MOD" @property def n_series(self): return len(self.data_table) @property def all_phasors(self): return self.data_table["MOD_PHAS"].data @property def appxy(self): """Shape n_frames x n_a x 2""" return self.data_table["APPXY"].data.astype(float) @property def dateobs(self): """ Get the dateobs from the weighted mean of the observation time from each of the observation times given in the rows of ``NI_MOD`` table. """ raise NotImplementedError(self.dateobs) return None @property def arrcol(self): """ The collecting area of the telescopes """ return self.data_table["ARRCOL"].data @property def int_time(self): """ Conveniently retrieve the integration time. """ return self.data_table["INT_TIME"].data
[docs] def create_basic_fov_data(D, offset, lamb, n): """ A convenience function to help define the FOV function and data model """ r_0 = (lamb/D)*u.rad.to(u.mas) def xy2phasor(x,y, offset): r = np.hypot(x[None,:]-offset[:,0], y[None,:]-offset[:,1]) phasor = np.exp(-(r/r_0)**2) return phasor.astype(complex) all_offsets = np.zeros((n, lamb.shape[0], 2)) indices = np.arange(n) mytable = Table(names=["INDEX", "offsets"], data=[indices, all_offsets]) return mytable, xy2phasor
[docs] class NI_FOV(NI_EXTENSION): __doc__ = r""" The ``NI_FOV`` data containing information of the field of view (vigneting) function as a function of wavelength. This can be interpreted in different ways depending on the value of the header keyword ``NIFITS FOV_MODE``. * ``diameter_gaussian_radial`` : A simple gaussian radial falloff function based on a size of :math:`\lambda/D` and a chromatic offset defined for each spectral bin. The ``simple_from_header()`` constructor can help create a simple extension with 0 offset. * More options will come. """ + NI_EXTENSION.__doc__ name = "NI_FOV"
[docs] @classmethod def simple_from_header(cls, header=None, lamb=None, n=0): r""" Constructor for a simple ``NIFITS NI_FOV`` object with chromatic gaussian profile and no offset. Args: header : (astropy.io.fits.Header) Header containing the required information such as ``NIFITS FOV_TELDIAM`` and ``NIFITS FOV_TELDIAM_UNIT`` which are used to create the gaussian profiles of radius :math:`\lambda/D` """ offset = np.zeros((n,2)) telescope_diameter_q = header["NIFITS FOV_TELDIAM"]*u.Unit(header["NIFITS FOV_TELDIAM_UNIT"]) telescope_diameter_m = telescope_diameter_q.to(u.m).value mytable, xh2phasor = create_basic_fov_data(telescope_diameter_m, offset=offset, lamb=lamb, n=n) return cls(data_table=mytable, header=header)
[docs] def get_fov_function(self, lamb: ArrayLike, n: int): """ Returns the function to get the chromatic phasor given by injection for a the index n of the time series **This method will move to the backend** Args: lamb : The array of wavelength bins [m] n : The index of the time series to compute for """ assert self.header["NIFITS FOV_MODE"] == "diameter_gaussian_radial" D = self.header["NIFITS FOV_TELDIAM"] r_0 = (lamb/D)*u.rad.to(u.mas) offset = self.data_table["offsets"][n] def xy2phasor(alpha, beta): """ Returns the phasor for a given position of the field of view Args: alpha : Position in RA beta : Position in Dec Returns: The complex phasor """ r = np.hypot(alpha[None,:]-offset[:,0], beta[None,:]-offset[:,1]) phasor = np.exp(-(r/r_0)**2) return phasor.astype(complex) return xy2phasor
# class NI_MOD(object): # """Contains input modulation vector for the given observation. The format # is a complex phasor representing the alteration applied by the instrument # to the light at its inputs. Either an intended modulation, or an estimated # instrumental error. the dimenstions are (n_ch, n_in) # # The effects modeled in NI_MOD must cumulate with some that may be modeled # in NI_CATM. It is recommended to include in CATM the static effects and in # NI_MOD any affect that may vary throughout the observing run.""" # def __init__(self, app_index, target_id, time, mjd, # int_time, mod_phas, app_xy, arrcol, # fov_index): # self.app_index = app_index # self.target_id = target_id # self.time = time # self.mjd = mjd # self.int_time = int_time # self.app_xy = app_xy # self.arrcol = arrcol # self.fov_index = fov_index # self.mod_phas = mod_phas TEST_BACKUP = True NIFITS_EXTENSIONS = np.array(["OI_ARRAY", "OI_WAVELENGTH", "NI_CATM", "NI_FOV", "NI_KMAT", "NI_MOD", "NI_IOUT", "NI_KIOUT", "NI_KCOV", "NI_DSAMP", "NI_OSWAVELENGTH", "NI_IOTAGS"]) NIFITS_KEYWORDS = [] STATIC_EXTENSIONS = [True, True, True, True, False, False, False, False, False, True, True, False]
[docs] def getclass(classname): return getattr(sys.modules[__name__], classname)
H_PREFIX = "HIERARCH NIFITS "
[docs] @dataclass class nifits(object): """Class representation of the nifits object.""" header: fits.Header = None oi_array: OI_ARRAY = None ni_catm: NI_CATM = None ni_fov: NI_FOV = None ni_kmat: NI_KMAT = None oi_wavelength: OI_WAVELENGTH = None oi_target: OI_TARGET = None ni_mod: NI_MOD = None ni_iout: NI_IOUT = None ni_kiout: NI_KIOUT = None ni_kcov: NI_KCOV = None ni_dsamp: NI_DSAMP = None ni_oswavelength: NI_OSWAVELENGTH = None ni_iotags: NI_IOTAGS = None
[docs] @classmethod def from_nifits(cls, filename: str): """ Create the nifits object from the HDU extension of an opened fits file. """ if isinstance(filename, fits.hdu.hdulist.HDUList): hdulist = filename else: hdulist = fits.open(filename) obj_dict = {} header = hdulist["PRIMARY"].header obj_dict["header"] = header for anext in NIFITS_EXTENSIONS: if hdulist.__contains__(anext): theclass = getclass(anext) theobj = theclass.from_hdu(hdulist[anext]) obj_dict[anext.lower()] = theobj else: print(f"Missing {anext}") print("Checking header", isinstance(header, fits.Header)) print("contains_header:", obj_dict.__contains__("header")) return cls(**obj_dict)
[docs] def to_nifits(self, filename:str = "", static_only: bool = False, dynamic_only: bool = False, static_hash: str = "", writefile: bool = True, overwrite: bool = False): """ Write the extension objects to a nifits file. Args: static_only : (bool) only save the extensions corresponding to static parameters of the model (NI_CATM and NI_FOV). Default: False dynamic_only : (bool) only save the dynamic extensions. If true, the hash of the static file should be passed as `static_hash`. Defaultult: False static_hash : (str) The hash of the static file. Default: "" """ # TODO: Possibly, the static_hash should be a dictionary with # a hash for each extension self.header["HIERARCH NIFITS VERSION"] = (__standard_version__, f"Writen with rlaugier/nifits v{__version__}") hdulist = fits.HDUList() hdu = fits.PrimaryHDU() if static_only: extension_list = NIFITS_EXTENSIONS[STATIC_EXTENSIONS] elif dynamic_only: extension_list = NIFITS_EXTENSIONS[np.logical_not(STATIC_EXTENSIONS)] else: extension_list = NIFITS_EXTENSIONS hdulist.append(hdu) for anext in extension_list: print(anext, hasattr(self,anext.lower())) if hasattr(self, anext.lower()): print(anext.lower(), flush=True) theobj = getattr(self, anext.lower()) if theobj is not None: thehdu = theobj.to_hdu() hdulist.append(thehdu) hdu.header[H_PREFIX + anext] = "Included" # TODO Possibly we need to do this differently: # TODO Maybe pass the header to the `to_hdu` method? else: hdu.header[H_PREFIX + anext] = "Not included (None)" print(f"Warning: {anext} was present but empty") else: hdu.header[H_PREFIX + anext] = "Not included" print(f"Warning: Could not find the {anext} object") print(hdu.header) if writefile: hdulist.writeto(filename, overwrite=overwrite) return hdulist else: return hdulist
[docs] def check_unit_coherence(self): """ Check the coherence of the units of and prints the result NI_IOUT, NI_KCOV, and NI_KIOUT if they exist. Otherwise, does nothing. """ if hasattr(self, "ni_iout"): print("NI_IOUT", self.ni_iout.unit) else: print("No NI_IOUT") if hasattr(self, "ni_kiout"): print("NI_KIOUT", self.ni_kiout.unit) else: print("No NI_KIOUT") if hasattr(self, "ni_kcov"): print("NI_KCOV", self.ni_kcov.unit) else: print("No NI_KCOV") if hasattr(self, "ni_iout") and hasattr(self, "ni_kiout"): print(self.ni_iout.unit.is_equivalent(self.ni_kiout.unit)) if hasattr(self, "ni_kcov") and hasattr(self, "ni_kiout"): print(np.sqrt(self.ni_kcov.unit).is_equivalent(self.ni_kiout.unit)) if hasattr(self, "ni_kcov") and hasattr(self, "ni_iout"): print(np.sqrt(self.ni_kcov.unit).is_equivalent(self.ni_iout.unit))