Source code for nifits.extra.extra

"""
NIFITS Extra
-------------

The NIFITS extra contains some additional tools to go somewhat **beyond** the
a simple implementation of the NIFITS standard, and facilitate the usage of 
common powerful techniques of nulling interferometry data reduction and
interpretation.

This includes:

* statistical whitening of observables (Ceau et al. 2019, Laugier et al. 2023)
* Correlation maps (work in progress)
* Test statistics Te and Tnp from (Ceau et al. 2019, Laugier et al. 2023)
* Monte-Carlo simulation of instrumental noise (coming soon)
* Various plotting macros (coming soon)

At present, this is done through a new class Post that inherits from Backend
offering dirctly whitened forward models prfixed with ``w_``, and ``NI_KIOUT``

The extra module is currently considered unstable. Please contact us if you make
use of its feature: we can give you warnings and help when our updates are at 
risk of breaking your code. It is likely that the installation of the extra
module becomes optional.

"""


# extra.py
import nifits.io as io
from nifits.io.oifits import NIFITS_EXTENSIONS, STATIC_EXTENSIONS
from nifits.io.oifits import nifits as NIFITSClass
import numpy as np
import matplotlib.pyplot as plt
import astropy.units as units
import nifits.backend as be
from scipy.linalg import sqrtm
import types
from einops import rearrange
from copy import copy
from scipy.stats import ncx2 as ncx2
from scipy.stats import norm as norm
from scipy.stats import chi2 as chi2


[docs] class Post(be.NI_Backend): """ This variant of the backend class offers a statistically whitened alternate forward model with directly whitened observables by calling ``w_`` prefixed methods. (Ceau et al. 2019, Laugier et al. 2023) After normal construction, use ``create_whitening_matrix()`` to update the whitening matrix based on the ``NI_KCOV`` data. Use ``w_get_all_outs`` and ``get_moving_outs`` in the same way, but they return whitened observables. Compare it to ``self.nifits.ni_kiout.w_kiout`` instead of ``self.nifits.ni_kiout.kiout``. """
[docs] def create_whitening_matrix(self, replace: bool = False, md: types.ModuleType = np): """ Updates the whitening matrix: Args: replace : If true, the forward model methods are replaced by the whitened ones. The old ones get a ``old_`` prefix. md : A numpy-like backend module. The pile of whitening matrices is stored as ``self.Ws`` (one for each frame). """ self.md = md # Assertion: assert assert hasattr(self.nifits, "ni_kcov") if self.nifits.ni_kcov.header["NIFITS SHAPE"] != "frame (wavelength output)": raise NotImplementedError("Covariance shape expected: frame (wavelength output)") Ws = [] for amat in self.nifits.ni_kcov.data_array: Ws.append(np.linalg.inv(sqrtm(amat))) self.Ws = md.array(Ws) self.W_unit = self.nifits.ni_kcov.unit**(-1/2) if hasattr(self.nifits, "ni_kiout"): self.datashape = (self.nifits.ni_kiout.shape) self.flatshape = (len(self.nifits.ni_mod), self.nifits.ni_kcov.shape[1]) self.fullyflatshape = np.prod(self.flatshape) if replace: # Backup the forward model mehtods, then decorate them self.old_get_all_outs = copy(self.get_all_outs) self.get_all_outs = self.whitened_outputs(self.get_all_outs) self.old_get_moving_outs = copy(self.get_moving_outs) self.get_moving_outs = self.whitened_outputs(self.get_moving_outs) else: # Adds new whitening methods self.w_get_all_outs = self.whitened_outputs(self.get_all_outs) self.w_get_moving_outs = self.whitened_outputs(self.get_moving_outs) if hasattr(self.nifits, "ni_kiout"): self.nifits.ni_kiout.Ws = self.Ws self.nifits.ni_kiout.md = self.md self.nifits.ni_kiout.__class__.w_kiout = whitened_kiout
[docs] def whiten_signal(self, signal): """ Whitens a signal so that error covariance is identity in the new axis. Args: signal: The direct signal to whiten (differential observable a.k.a kernel-null) Returns: wout_full: the whitened signal ($\\mathbf{W}\\cdot \\mathbf{s}$) in the new basis. """ full_shape = signal.shape # Flatten the spectral and output dimension flat_full_shape = (*self.flatshape, full_shape[-1]) flat_out = rearrange(signal, "frame wavelength output source -> frame (wavelength output) source") wout = self.md.einsum("f o i , f i m -> f o m", self.Ws, flat_out) return wout
[docs] def whitened_outputs(self, func): """ A decorator methods that gives applies statistical whitening to the output of the function. The whitening relies on `self.W` which is computed as by `create_whitening_matrix`. Args: func: The function to decorate. Returns: inner: The decorated function """ def inner(*args, **kwargs): output = func(*args, **kwargs) wout_full = self.whiten_signal(output) return wout_full inner.__doc__ = """**Modified to whiten the observables (output and wavelength dimensions are fused)**\n\n"""\ + func.__doc__ return inner
[docs] def add_blackbody(self, temperature): """ Initializes the blackbody for a given temperature Args: temperature: units.Quantity """ self.bb = BB(temperature)
[docs] def get_pfa_Te(self, signal=None, md=np): """ Compute the Pfa for the energy detector test. Args: signal: The raw signal (non-whitened) Returns: pfa the false alarm probability (p-value) associated to the given signal. """ w_signal = self.whiten_signal(signal) wf_signal = md.flatten(w_signal) pfa = 1 - ncx2.cdf(wf_signal, df=self.fullyflatshape, nc=0.) return pfa
[docs] def get_pfa_Tnp(self, alphas, betas, signal=None, model_signal=None, md=np): """ Compute the Pfa for the Neyman-Pearson test. """ pass
[docs] def get_blackbody_native(self, ): """ Returns: blackbody_spectrum: in units consistent with the native units of the file $[a.sr^{-1}.m^{-2}]$ (where [a] is typically [ph/s]). """ # Typically given there in erg/Hz/s/sr/cm^2 myspectral_density = self.bb(self.nifits.oi_wavelength.lambs * units.m) # Photon_energies in J/ph photon_energies = (self.nifits.oi_wavelength.lambs*units.m).to( units.J, equivalencies=units.equivalencies.spectral())\ / units.photon dnus = self.nifits.oi_wavelength.dnus * (units.Hz) print((myspectral_density * dnus / photon_energies).unit) blackbody_spectrum = (myspectral_density * dnus / photon_energies).to( self.nifits.ni_iout.unit / units.sr / (units.m**2)) return blackbody_spectrum
[docs] def get_blackbody_collected(self, alphas, betas, kernels=True, whiten=True, to_si=True): """ Obtain the output spectra of a blackbody at the given blackbody temperature Args: alphas: ArrayLike: Relative position in rad betas: ArrayLike: Relative position in rad kernels: Bool (True) Whether to work in the kernel postprocessing space (False is not implemented yet) whiten: Bool (True) whether to use whitening post-processing (False is not implemented yet) to_si: Bool (True) convert to SI units """ collecting_map_q = units.m**2 * self.get_all_outs(alphas, betas, kernels=kernels) blackbody_spectrum = self.get_blackbody_native() collected_flux = blackbody_spectrum[None,:,None,None] \ * collecting_map_q[:,:,:,:] if whiten: blackbody_signal = self.W_unit * self.whiten_signal(collected_flux) else: blackbody_signal = collected_flux # collected_flux is in equivalent W / rad^2 # That is a power per solid angle of source if to_si: return blackbody_signal.to(blackbody_signal.unit.to_system(units.si)[0]) else: return blackbody_signal
[docs] def get_Te(self): """ Computes the Te test statistic of the current file. This test statistic is supposed to be distributed as a chi^2 under H_0. Returns: Te : x.T.dot(x) where x is the whitened signal. """ if hasattr(self.nifits, "ni_kiout"): kappa = self.whiten_signal(self.nifits.ni_kiout.kiout) else: raise NotImplementedError("Needs a NI_KIOUT extension") x = kappa.flatten() return x.T.dot(x)
[docs] def get_pdet_te(self, alphas, betas, solid_angle, kernels=True, pfa=0.046, whiten=True, temperature=None): """ pfa: * 1 sigma: 0.32 * 2 sigma: 0.046 * 3 sigma: 0.0027 """ if temperature is not None: self.add_blackbody(temperature) ref_spectrum = solid_angle * self.get_blackbody_collected(alphas, betas, kernels=kernels, whiten=True, to_si=True) print(ref_spectrum.unit) threshold = ncx2.ppf(1-pfa, df=self.fullyflatshape, nc=0.) x = ref_spectrum.reshape(-1, 1000) xTx = np.einsum("o m , o m -> m", x, x) pdet_Pfa = 1 - ncx2.cdf(threshold, self.fullyflatshape, xTx) return pdet_Pfa
[docs] def get_sensitivity_te(self, alphas, betas, kernels=True, temperature=None, pfa=0.046, pdet=0.90, distance=None, radius_unit=units.Rjup, md=np): """ .. code-block:: python from scipy.stats import ncx2 xs = np.linspace(-10, 10, 100) ys = np.linspace(1e-6, 0.999, 100) u = 1 - ncx2.cdf(xs, df=10, nc=0) v = ncx2.ppf(1 - ys, df=10, nc=0) plt.figure() plt.plot(xs, u) plt.show() plt.figure() plt.plot(ys, v) plt.plot(u, xs) plt.show() """ from scipy.optimize import leastsq if temperature is not None: self.add_blackbody(temperature) ref_spectrum = self.get_blackbody_collected(alphas=alphas,betas=betas, kernels=kernels, whiten=True, to_si=True) print("Ref spectrum unit: ", ref_spectrum.unit) threshold = ncx2.ppf(1-pfa, df=self.fullyflatshape, nc=0.) x = (ref_spectrum).reshape((-1, ref_spectrum.shape[-1])) print("Ref signal (x) unit: ", x.unit) print("Ref signal (x) shape: ", x.shape) xtx = md.einsum("m i, i m -> m", x.T, x) lambda0 = 1.0e-3 * self.fullyflatshape # The solution lambda is the x^T.x value satisfying Pdet and Pfa sol = leastsq(residual_pdet_Te, lambda0, args=(threshold, self.fullyflatshape, pdet))# AKA lambda lamb = sol[0][0] # Concatenate the wavelengths lim_solid_angle = np.sqrt(lamb) / np.sqrt(xtx) if distance is None: return lim_solid_angle elif isinstance(distance, units.Quantity): dist_converted = distance.to(radius_unit) lim_radius = 2*dist_converted*md.sqrt(lim_solid_angle/md.pi) return lim_radius.to(radius_unit, equivalencies=units.equivalencies.dimensionless_angles())
[docs] def get_pdet_tnp(self, transmission_map, pfa=0.046, pdet=0.90): pass
[docs] def get_sensitivity_tnp(self, transmission_map, pfa=0.046, pdet=0.90): pass
massq2sr = (units.mas**2).to(units.sr) sr2massq = (units.sr).to(units.mas**2) from astropy.modeling.models import BlackBody as BB import astropy.constants as cst
[docs] def e2ph(energy, wl): nu = cst.c / (wl * units.m) ephot = cst.h * nu nphot = energy/ephot return nphot.to(nphot.unit.to_system(units.si)[0])
[docs] def ph2e(nphot, wl): nu = cst.c / (wl * units.m) ephot = cst.h * nu energy = nphot * ephot return energy.to(energy.unit.to_system(units.si)[0])
[docs] def whitened_kiout(self): data = self.data_table["value"].data full_shape = data.shape flat_full_shape = (full_shape[0], full_shape[1]*full_shape[2]) flat_out = rearrange(data, "frame wavelength output -> frame (wavelength output)") wout = self.md.einsum("f o i , f i -> f o", self.Ws, flat_out) wout_full = wout.reshape((full_shape)) return wout_full
# def whitening_transform(self, input): # """ # Currently unused # """ # full_shape = data.shape # flat_full_shape = (full_shape[0], full_shape[1]*full_shape[2]) # flat_out = rearrange(data, "frame wavelength output -> frame (wavelength output)") # output = self.md.einsum("f o i , f i -> f o", self.Ws, flat_out) # return output
[docs] def residual_pdet_Te(lamb, xsi, rank, targ): """ Computes the residual of Pdet **Arguments:** * lamb : The noncentrality parameter representing the feature * targ : The target Pdet to converge to * xsi : The location of threshold * rank : The rank of observable **Returns** the Pdet difference. See *Ceau et al. 2019* for more information """ respdet = 1 - ncx2.cdf(xsi,rank,lamb) - targ return respdet