import sympy as sp
import numpy as np
import scifysim as sf
import logging
from einops import rearrange
import dask.array as da
import copy
from pdb import set_trace
from scipy.linalg import sqrtm
from astropy import units
import matplotlib.pyplot as plt
import pickle
from scipy.stats import chi2, ncx2, norm
from scipy.optimize import leastsq
logit = logging.getLogger(__name__)
[docs]class noiseprofile(object):
[docs] def __init__(self, integ, asim, diffobs_series, verbose=False, n_pixsplit=1):
"""
An object to dynamically compute a parametric noise floor
**Patameters:**
* integ : An integrator object augmented by some mean
and static measurements (see below)
* asim : A simulator object (mostly used to get some combiner
and injector parameters)
* diffobs : A series of realisations of te differential observable
intensity (e.g. `diffobs = np.einsum("ij, mkj->mk",asim.combiner.K, dit_intensity)`)
**Creation makes use of:**
* integ.mean_starlight (run ``integ.mean_starlight = np.mean(starlights, axis=0)``)
* integ.mean_planetlight (run ``integ.mean_planetlight = np.mean(planetlights, axis=0)``)
* integ.get_static() (run ``integ.static = asim.computed_static``)
**Notes:**
* `self.sigma_ron` in [e-]
* `self.s_enc_bg` is defined in [ph/s/pix]
* `self.s_dark_current` is defined in [e-/s/pix]
* `self.s_d` is defined in [ph/s/ch] (that is **per channel**)
* `self.s_total_bg_current_ch` is in [e-/s/ch]
"""
# Read noise
self.mynpix = n_pixsplit
self.eta = integ.eta
self.sigma_ron = np.sqrt(self.mynpix) * integ.ron
self.sigma_ron_d = np.sqrt(2) * self.sigma_ron
self.mask_dark = asim.combiner.dark
self.mask_bright = asim.combiner.bright
self.mask_phot = asim.combiner.photometric
self.K = asim.combiner.K
# Photon noise
self.dit_0 = asim.injector.screen[0].step_time * integ.n_subexps
self.m_0, self.mp0 = asim.context.get_mags_of_sim(asim)
self.F_0 = mag2F(self.m_0)
self.s_0 = integ.mean_starlight / self.dit_0
static, dark_current, enclosure = integ.get_static()
self.s_dark_current = dark_current / self.dit_0
self.s_enc_bg = enclosure / self.dit_0 / self.eta # Must convert that back in equ. photons
self.s_d = static / self.dit_0 / self.eta # Must convert that back in equ. photon
print(f" s_d = {self.s_d} [ph/s] the static output")
print(f" s_dark_current = {self.s_dark_current} [ph-/s] equivalent the dark current")
print(f" s_enc_bg = {self.s_enc_bg} [ph-/s] equivalent the enclosure background")
self.p_0 = integ.mean_planetlight / self.dit_0
# The total background current per channel
self.s_total_bg_current_ch = self.mynpix * self.s_dark_current +\
self.eta * self.mynpix * self.s_enc_bg +\
self.eta * self.s_d.sum(axis=1)
#self.sigma_phot = np.sqrt(integ.eta * self.s_0 * self.dit_0)
#self.sigma_phot_d = np.sqrt(2) * self.sigma_phot
# Sigma phi
# Can only be computed on the differential observable
self.diff_std = np.std(diffobs_series, axis=0)
self.k_phi = self.diff_std / self.F_0
# For getting a covariance
self.diff_cov = np.cov(diffobs_series.T)
self.k_phi_m = self.diff_cov / self.F_0**2
[docs] def get_noises(self, m_star, dit, matrix=False):
"""Returns the different noises
.. admonition:: Warning!
If matrix is True, then sigma_phi_d is a covariance matrix (beware of variances on diagonal)
**Arguments:**
* m_star : Magnitude of the star
* dit : Detector integration time [s]
**Returns:**
``sigma_phot_d, sigma_phi_d`` : standard deviation vectors (or
covariance matrix for ``sigma_phi_d``)
"""
Fs = mag2F(m_star)
# small s is a flux
s_s = Fs/self.F_0 * self.s_0
s_d = self.s_d + self.mynpix*(self.s_dark_current + self.s_enc_bg)[:,None]
sigma_phot = np.sqrt(self.eta * dit * (s_d + s_s))
vark = np.einsum("ki, wi-> wk", sigma_phot**2, np.abs(self.K))
sigma_phot_d = np.sqrt(vark.flatten())
# sigma_phot_d = np.sqrt(np.sum(sigma_phot[:,self.mask_dark]**2, axis=-1)) # kernel will need different treatment here
sigma_phot_photo = sigma_phot[:,self.mask_phot]
sigma_phot_bright = sigma_phot[:,self.mask_bright]
if matrix:
Sigma_phi_d = self.eta**2 * self.k_phi_m * Fs**2
return sigma_phot_d, Sigma_phi_d
else:
sigma_phi_d = self.eta * self.k_phi * Fs
return sigma_phot_d, sigma_phi_d
[docs] def diff_noise_floor_dit(self, m_star, dit, matrix=False):
"""Returns the compound noise floor
WARNING: if matrix is True, then the result is
a covariance matrix (beware of variances on diagonal)
**Arguments:**
* m_star : Magnitude of the star
* dit : Detector integratioon time [s]
**Returns:**
Either ``sigma_tot_diff`` or ``cov_tot_diff`` depending on
``matrix``.
"""
#print(self.sigma_ron_d.shape)
#print(sigma_phot_d.shape)
#print(sigma_phi_d.shape)
if not matrix:
sigma_phot_d, sigma_phi_d = self.get_noises(m_star, dit, matrix=matrix)
sigma_tot_diff = np.sqrt(\
self.sigma_ron_d**2\
+ sigma_phot_d**2\
+ sigma_phi_d**2)
return sigma_tot_diff
else: # Note that in this case a covariance matrix will be used
sigma_phot_d, Sigma_phi_d = self.get_noises(m_star, dit, matrix=matrix)
cov_tot_diff = self.sigma_ron_d**2*np.eye(sigma_phot_d.shape[0])\
+ np.diag(sigma_phot_d**2)\
+ Sigma_phi_d
return cov_tot_diff
[docs] def plot_noise_sources(self, wls, dit=1., starmags=np.linspace(2., 6., 5),
show=True, legend_loc="best", legend_font="x-small",
ymin=0., ymax=1., n_dits=1):
sigphis = []
sigphots = []
for amag in starmags:
asigphot, asigphi = self.get_noises(amag, dit)
sigphots.append(1/np.sqrt(n_dits) * asigphot)
sigphis.append(1/np.sqrt(n_dits) * asigphi)
sigphots = np.array(sigphots)
sigphis = np.array(sigphis)
mybmap = plt.matplotlib.cm.Blues
mygmap = plt.matplotlib.cm.YlOrBr
fig = plt.figure(dpi=200)
plt.plot(wls, 1/np.sqrt(n_dits) * self.sigma_ron_d * np.ones_like(wls),color="gray",
label=r"$\sigma_{RON}$")
#plt.plot(wls, sigma_phot, label="Photon")
for i, dump in enumerate(sigphis):
msg = f"$\sigma_{{inst}}$ (mag = {starmags[i]:.1f})"
cvalue = i/sigphis.shape[0]
aline = plt.plot(wls,
sigphis[i,:], label=msg,
color=mybmap(sf.utilities.trois(cvalue, 0.,1., ymin=ymin, ymax=ymax)))
for i, dump in enumerate(sigphots):
msg = f"$\sigma_{{phot}}$ (mag = {starmags[i]:.1f})"
cvalue = i/sigphis.shape[0]
aline = plt.plot(wls,
sigphots[i,:], label=msg,
color=mygmap(sf.utilities.trois(cvalue, 0.,1., ymin=ymin, ymax=ymax)))
#plt.text(np.min(wls), asnr[0], msg)
plt.yscale("log")
plt.ylabel("Noise $[e^-]$")
plt.xlabel("Wavelength [m]")
plt.legend(fontsize=legend_font, loc=legend_loc)
plt.title(f"Noise for {n_dits} x {dit:.2f} s DITs")
if show:
plt.show()
return fig
[docs] def summary(self):
print("n_pix = ", self.mynpix, "# The number of pixels used per spectral channel")
print("t_dit_0 = ", self.dit_0, "# The reference exposure time used in MC simulations")
print("eta = ", self.eta, "# The quantum efficiency of the detector")
print("sigma_ron_d = ", self.sigma_ron_d, "# The total readout noise per spectral channel for the differential observable(s)")
print("k_phi = ", self.k_phi_m, "# The instrumental error coefficient")
print(f" s_d = {self.s_d} [ph/s] the static output")
print(f" s_dark_current = {self.s_dark_current} [e-/s] the dark current")
print(f" s_enc_bg = {self.s_enc_bg} [e-/s] the enclosure background")
[docs]class spectral_context(object):
[docs] def __init__(self, vegafile="config/vega.ini", compensate_chromatic=True,
verbose=False):
"""Spectral context for magnitude considerations
**Arguments:**
* vegafile : Either
- The parsed config file of the original simulator:
a modified copy will be created for the reference
Vega observations.
- The path *str* to a file for observation of Vega
in the same spectral configuration.
* compensate_chromatic: Argument to be passed to the new simulator
* verbose : whether to give more details
"""
if isinstance(vegafile, str):
raise AttributeError("no longer supported\
Please provide a ConfigParser object (asim.config)")
self.avega = sf.utilities.prepare_all(vegafile, update_params=False,
instrumental_errors=False, compensate_chromatic=compensate_chromatic,
verbose=verbose)
elif isinstance(vegafile, sf.parsefile.ConfigParser):
vega_config = self.create_vega_config(vegafile)
if vega_config.get("configuration", "location") == "space":
use = False
else:
use = True
self.avega = sf.utilities.prepare_all(vega_config, update_params=False,
instrumental_errors=False,
compensate_chromatic=compensate_chromatic,
verbose=verbose,
update_start_end=use)
else :
raise TypeError()
self.thevegassflux = self.avega.src.star.ss.sum(axis=1)
[docs] def create_vega_config(self, config, tarloc="Kraz"):
"""
Builds the config file for a reference observation of Vega
**Arguments:**
* config: A parsed config file from the simulator itself.
* tarloc: A star used as a proxy to observe close to Zenith.
Kraz is a good fit for Paranal.
**Returns** a modified parsed ``config`` file that has the same settings but that
is designed to reproduce an observation of Vega near zenith. Most parameters
describing the instrument are the same. A shorter sequence is created (only
the central one is to be used: index 3). The target is chosen so that it passes
near zenith at observatory.
"""
from copy import deepcopy
veg_config = deepcopy(config)
veg_config.set("target", "target", "Kraz")
veg_config.set("target", "star_radius", "2.36")
veg_config.set("target", "star_distance", "7.68")
veg_config.set("target", "star_temperature", "9600.")
veg_config.set("target", "planet_radius", "0.000001")
veg_config.set("target", "planet_temperature", "0.")
veg_config.set("target", "n_points", "7")
return veg_config
[docs] def sflux_from_vegamag(self, amag):
"""inverse of vegamag_from_ss()"""
f = self.thevegassflux*10**(-amag/2.5)
return f
[docs] def vegamag_from_ss(self, flux):
"""inverse of sflux_form_vegamag()"""
amag = -2.5*np.log10(flux/self.thevegassflux)
return amag
[docs] def get_mags_of_sim(self, asim):
"""Magnitudes of planet and star of a simulator object
**Arguments:**
* asim : A simulator object
"""
ss_star = asim.src.star.ss
ss_planet = asim.src.planet.ss
if len(asim.src.star.ss.shape) == 2:
ss_star = asim.src.star.ss.sum(axis=1)
else :
ss_star = asim.src.star.ss
if len(asim.src.planet.ss.shape) == 2:
ss_planet = asim.src.planet.ss.sum(axis=1)
else :
ss_planet = asim.src.planet
m_star = np.mean(self.vegamag_from_ss(ss_star))
m_planet = np.mean(self.vegamag_from_ss(ss_planet))
return m_star, m_planet
[docs] def get_difmap_flux(self, sim, map=None, mode="e-", K=None,
planet=None, single_out=True):
"""
Get the map in terms of flux of a given planet
Not kernel-ready
"""
if K is None:
K = sim.combiner.K
if map is None:
map = sim.maps
if planet is None:
planet = sim.src.planet
# adiffmap = map[:,:,3,:,:] - map[:,:,4,:,:]
# flux_map = adiffmap / \
# sim.vigneting_map.ds_sr * planet.ss.sum(axis=1)[None,:,None,None]
# else:
adiffmap = np.einsum("k o , s w o x y -> s w k x y", K, map)
flux_map = adiffmap / \
sim.vigneting_map.ds_sr * planet.ss.sum(axis=1)[None,:,None,None,None]
if single_out:
flux_map = flux_map[:,:,0,:,:]
if mode == "e-":
return sim.integrator.eta * flux_map
elif mode == "ph":
return flux_map
[docs] def get_maxmap(self, map, time_index=None, max=True):
"""Returns the maximum of the map
**Arguments:**
* map: A map object (asim.maps)
* time_index: An index for the (asim.sequence)
observing series
* max : (True) Returns only the max signal of the map
"""
if time_index is None:
time_index = map.shape[0]//2
adiffmap = map[time_index,:,3,:,:] - map[time_index,:,4,:,:]
if max:
return np.max(adiffmap, axis=(1,2))
else:
return adiffmap
[docs] def get_planet_max_signal(self, sim, planet=None, max=True,
time_index=None):
"""Returns the differential flux signal ph/s collected
for a given planet
**Arguments:**
* sim : A director object
* planet : a source.
* time_index: An index for the (asim.sequence)
observing series
* max : (True) Returns only the max signal of the map
"""
if planet is None:
planet=sim.src.planet
if max:
amax = self.get_maxmap(sim.maps, time_index=time_index)
planet_signal = amax/sim.vigneting_map.ds_sr * planet.ss.sum(axis=1)
return planet_signal
else:
raise NotImplementedError
[docs]class simplified_context(spectral_context):
[docs] def __init__(self, mycontext):
self.thevegassflux = mycontext.thevegassflux
[docs] def to_pickle(self, apath):
"""
Saving the context to a pickle file.
"""
with open(apath, "wb") as thefile:
pickle.dump(self, thefile)
[docs]def mag2F(mag):
F = 1*10**(-mag/2.5)
return F
[docs]def F2mag(F):
mag = -2.5*np.log10(F/1)
return mag
[docs]class BasicETC(object):
[docs] def __init__(self, asim):
"""
**Parameters:**
* ipeak : []
* pdiam : [m]
* S_collecting [m^2]
* lambda_science_range [m]
* diffuse []
* throughput []
* eta [e-/ph]
* fiber_etendue [sr]
* cold_enclosure [e-/s]
* dark_current [e-/s]
* src_names []
* contribs [ph/s]
"""
asim.integrator.update_enclosure(asim.lambda_science_range)
self.peak_contributions = np.abs(asim.combiner.Mcn[:,3,:].mean(axis=0))
self.ipeak = np.sum(self.peak_contributions)**2
self.pdiam = asim.injector.pdiam * units.m
self.odiam = asim.injector.odiam * units.m
self.S_collecting = np.pi * self.pdiam**2/4 - np.pi * self.odiam**2/4
self.lambda_science_range = asim.lambda_science_range
self.diffuse = asim.diffuse
self.throughput = self.diffuse[0].get_downstream_transmission(self.lambda_science_range)
self.eta = asim.integrator.eta * units.electron / units.photon
self.fiber_etendue = np.pi * (self.lambda_science_range / self.pdiam)**2
contribs = []
self.src_names = []
self.cold_enclosure = asim.integrator.det_sources[0]
self.dark_current = asim.integrator.det_sources[1]
self.dit_0 = asim.config.getfloat("detector", "default_dit") * units.s
self.ron = asim.integrator.ron * units.electron
for anelement in self.diffuse:
contribs.append(self.fiber_etendue \
* anelement.get_own_brightness(self.lambda_science_range) \
* anelement.get_downstream_transmission(self.lambda_science_range))
self.src_names.append(anelement.__name__)
contribs.append(asim.integrator.det_sources[0] \
/ self.eta)
self.src_names.append(asim.integrator.det_labels[0])
contribs.append(asim.integrator.det_sources[1] \
* np.ones_like(asim.integrator.det_sources[0])
/ self.eta)
self.src_names.append(asim.integrator.det_labels[1])
self.contribs = np.array(contribs) * units.photon/units.s
self.contribs_current = self.eta * self.contribs
@property
def contribs_variance_rate(self):
"""The variance rate in e-**2/s"""
variance_rate = self.contribs_current.sum(axis=0).value \
* units.electron**2/units.s
return variance_rate
[docs] def get_min_exp_time(self, planet_mag, planet_T, snr=3., dit_0=None,
verbose=False):
"""
**Arguments**:
* planet_mag : [Vega mag]
* planet_T : [K]
* snr : (3) Signal to noise ratio to reach
* dit_0 : (None) [s]
* verbose : (False)
"""
if dit_0 is None:
dit_0 = self.dit_0
if not isinstance(dit_0, units.quantity.Quantity):
dit_0 = dit_0 * units.s
total_current_planet, total_noise = self.show_signal_noise(planet_mag, dit=1.,
T=planet_T, verbose=verbose,
plot=False, show=False)
at = snr**2* np.sqrt(2) * self.contribs_variance_rate \
/total_current_planet**2
at2 = snr**2 * np.sqrt(2) \
* ( self.contribs_variance_rate \
+ self.ron**2/dit_0) \
/total_current_planet**2
return at, at2
[docs] def planet_photons(self, planet_mag, dit=1., T=None, verbose=False):
"""
**Arguments**:
* planet_mag :
* dit : (1.) [s]
* T T (None) []
* verbose : (False)
"""
if not isinstance(dit, units.quantity.Quantity):
dit = dit * units.s
if T is None:
self.fd_planet = sf.sources.mag2flux_bin(
self.lambda_science_range, planet_mag
)
else:
self.fd_planet = sf.sources.magT2flux_bin(
self.lambda_science_range,
planet_mag,
T=T
)
return dit * self.fd_planet
[docs] def show_signal_noise(self, planet_mag, dit=1., T=None, verbose=True,
decompose=True, plot=True,
show=True):
"""
**Arguments**:
* planet_mag : The magnitude of the planet in the science band (Vega).
* dit : (1.) The individual detector integration time. Typically
adjusted for different spectral resolutions. [s]
* T : (None) Temperature [K]
* verbose : (True)
* decompose : (True)
* plot : (True)
* show : (True)
"""
planet_photons = self.planet_photons(planet_mag, dit=dit, T=T,
verbose=verbose)
planet_electrons = planet_photons \
* self.ipeak * self.throughput \
* self.S_collecting * self.eta
if not isinstance(dit, units.quantity.Quantity):
dit = dit * units.s
if verbose:
print(f"Planet photons {np.sum(planet_photons)/dit:.1e} [ph/s] \
for a total of {np.sum(planet_photons):.1e} [ph]")
print(f"Mean total transmission {100*np.mean(self.throughput):.2f} \%")
print(f"Collecting {self.S_collecting:.1e} [m^2]")
print(f"Quantum efficiency {self.eta:.2f}")
print(f"Planet e- {np.sum(planet_electrons)/dit:.1e} [e-/s] \
for a total of {np.sum(planet_electrons):.1e} [e-]")
background_photons = np.sum(self.contribs * dit, axis=0)
background_electrons = background_photons * self.eta
background_sigma = np.sqrt(background_electrons.value)
total_background_sigma = np.sqrt(background_electrons.sum())*units.electron
if verbose:
print(f"Background photons {background_photons.sum()/dit:.1e} [ph/s] \
for a total of {background_photons.sum():.1e} [ph]")
print(f"Background electrons {background_electrons.sum()/dit:.1e} [ph/s] \
for a total of {background_electrons.sum():.1e} [ph]")
print(f"Photon noise std {total_background_sigma:.1e} [e-]")
print(f"SNR min {np.min(planet_electrons/background_sigma)}\
max {np.max(planet_electrons/background_sigma)}")
if plot:
contribs_current = self.contribs_current * dit
import matplotlib.pyplot as plt
afig = plt.figure()
base = np.zeros_like(np.sqrt(contrib2variance(contribs_current[0])))
print(base.unit)
for i, acontrib in enumerate(contribs_current):
acontrib_variance = contrib2variance(acontrib)
print(acontrib_variance.unit)
plt.fill_between(self.lambda_science_range,
y1=base, y2=np.sqrt(base**2 + acontrib_variance).value,
label=f"$\\sigma$ {self.src_names[i]}")
base = np.sqrt(base**2 + acontrib_variance)
plt.plot(self.lambda_science_range,
np.sqrt(np.sum(contribs_current, axis=0)).value,
color="r", linestyle="--", label="Total background noise")
plt.plot(self.lambda_science_range,
planet_electrons,
label=f"Planet L={planet_mag:.1f}, {T:.0f}K", color="k",)
plt.legend(fontsize="x-small")
plt.xlabel("Wavelength [m]")
plt.ylabel("Planet [e-] \n $\\sigma_{background}$ [e-]")
plt.title(f"DIT = {dit} [s]")
if show:
plt.show()
if plot and not show:
return planet_electrons, background_sigma, afig
else:
return planet_electrons, background_sigma
[docs]def contrib2variance(acontrib, unit=units.electron):
"""
Assumes incoming signals proportional to electrons.
Tweaks the unit to a poisson variance (e.g. from e- to e-**2)
"""
avar = acontrib*unit
return avar
##################################
# The energy detector test
##################################
[docs]def pdet_e(lamb, xsi, rank):
"""
Computes the residual of Pdet
**Arguments:**
* lamb : The noncentrality parameter representing the feature
* xsi : The location of threshold
* rank : The rank of observable
**Returns** the Pdet difference
"""
respdet = 1 - ncx2.cdf(xsi,rank,lamb)
return respdet
[docs]def residual_pdet_Te(lamb, xsi, rank, targ):
"""
Computes the residual of Pdet
**Arguments:**
* lamb : The noncentrality parameter representing the feature
* targ : The target Pdet to converge to
* xsi : The location of threshold
* rank : The rank of observable
**Returns** the Pdet difference. See *Ceau et al. 2019* for more information
"""
respdet = 1 - ncx2.cdf(xsi,rank,lamb) - targ
return respdet
[docs]def get_sensitivity_Te_old(maps, pfa=0.002, pdet=0.9, postproc_mat=None, ref_mag=10.,
verbose=True):
"""
**Deprecated**
Magnitude map at which a companion is detectable for a given map and whitening matrix.
The throughput for the signal of interest is given by the map at a reference magnitude.
The effective noise floor is implicitly described by the whitening matrix.
**Parameters**
* maps : Differential map for a given reference magnitude
* pfa : The false alarm rate used to determine the threshold
* pdet : The detection probability used to determine the threshold
* postproc_mat : The matrix of the whitening transformation ($\Sigma^{-1/2}$)
* ref_mag : The reference mag for the given map
* verbose : Print some information along the way
See *Ceau et al. 2019* for more information
**Returns**
* mags : The magnitude at which Te crosses the threshold
* fluxs: The fluxes at which Te crosses the threshold
* Tes : The values of the test statistic on the map at the reference magnitude
"""
W = postproc_mat
if len(W.shape) == 3:
nbkp = W.shape[0]*W.shape[1]
elif len(W.shape)==2:
nbkp = W
else:
raise NotImplementedError("Wrong dimension for post-processing matrix")
if len(maps.shape) == 5:
nt, nwl, nout, ny, nx = maps.shape
elif len(maps.shape) == 4:
nt, nwl, nout, no = maps.shape
raise NotImplementedError("single_datacube")
else :
raise NotImplementedError("Shape not expected")
f0 = sf.analysis.mag2F(ref_mag) #flux_from_vegamag(ref_mag)
if postproc_mat is not None:
if len(postproc_mat.shape):
postproc_mat.shape[0]
#Xsi is the threshold
xsi = chi2.ppf(1.-pfa, nbkp)
lambda0 = 0.2**2 * nbkp
#The solution lamabda is the x^T.x value satisfying Pdet and Pfa
sol = leastsq(residual_pdet_Te, lambda0,args=(xsi,nbkp, pdet))# AKA lambda
lamb = sol[0][0]
fluxs = []
mags = []
Tes = []
if verbose:
print("xsi = ", xsi)
print("nbkp = ", nbkp)
print("lambda0 = ", lambda0)
print("lambda = ", lamb)
#print(W.shape)
for (i, j), a in np.ndenumerate(maps[0,0,0,:,:]):
wsig = []
for k in range(nt):
mapsig = maps[k,:,:,i,j].flatten()
wsig.append(1/f0 * W[k,:,:].dot(mapsig))
wsig = np.array(wsig).flatten()
#wsig = 1/f0 * np.array([W[k,:,:].dot(maps[k,:,:,i,j].flat) for k in range(nt) ]).reshape((nt*nwl*nout))
flux = np.sqrt(lamb) / np.sqrt(np.sum(wsig.T.dot(wsig)))
Te = np.sum((wsig*f0).T.dot(wsig*f0)).astype(np.float64)
Tes.append(Te)
fluxs.append(flux)
mags.append(sf.analysis.F2mag(flux))
fluxs = np.array(fluxs).reshape((ny, nx))
mags = np.array(mags).reshape((ny,nx))
Tes = np.array(Tes).reshape((ny,nx))
#print(wsig.shape)
return mags, fluxs, Tes
[docs]def get_sensitivity_Te(maps, mod=np, pfa=0.002, pdet=0.9, postproc_mat=None, ref_mag=10.,
verbose=True):
"""
Magnitude map at which a companion is detectable for a given map and whitening matrix.
The throughput for the signal of interest is given by the map at a reference magnitude.
The effective noise floor is implicitly described by the whitening matrix.
**Parameters**
* maps : Differential map for a given reference magnitude and exposure time
* mod : The math module to use for vector ops default=numpy
* pfa : The false alarm rate used to determine the threshold
* pdet : The detection probability used to determine the threshold
* postproc_mat : The matrix of the whitening transformation ($\Sigma^{-1/2}$)
* ref_mag : The reference mag for the given map
* verbose : Print some information along the way
See *Ceau et al. 2019* for more information
**Returns**
* mags : The magnitude at which Te crosses the threshold
* fluxs: The fluxes at which Te crosses the threshold
* Tes : The values of the test statistic on the map at the reference magnitude
"""
W = postproc_mat
if len(W.shape) == 3:
nbkp = W.shape[0]*W.shape[1]
elif len(W.shape)==2:
nbkp = W.shape[0]
else:
raise NotImplementedError("Wrong dimension for post-processing matrix")
if len(maps.shape) == 5:
nt, nwl, nout, ny, nx = maps.shape
elif len(maps.shape) == 4:
nt, nwl, nout, no = maps.shape
raise NotImplementedError("single_datacube")
else :
raise NotImplementedError("Shape not expected")
f0 = sf.analysis.mag2F(ref_mag) #flux_from_vegamag(ref_mag)
if postproc_mat is not None:
if len(postproc_mat.shape):
postproc_mat.shape[0]
#Xsi is the threshold
xsi = chi2.ppf(1.-pfa, nbkp)
if verbose:
print("nbkp = ", nbkp)
print(xsi)
lambda0 = 0.2**2 * nbkp
#The solution lambda is the x^T.x value satisfying Pdet and Pfa
sol = leastsq(residual_pdet_Te, lambda0,args=(xsi,nbkp, pdet))# AKA lambda
lamb = sol[0][0]
w2 = 1/f0 * W
# Concatenate the wavelengths
kmap = maps.reshape((maps.shape[0], maps.shape[1]*maps.shape[2], maps.shape[3], maps.shape[4]))
# Apply the whitening
wsig = mod.einsum("s u k , s k y x -> s u y x", w2, kmap)
# Concatenate the observing sequence blocks
wsig = wsig.reshape(wsig.shape[0]*wsig.shape[1], wsig.shape[2], wsig.shape[3])
xtx = mod.einsum(" k y x , k y x -> y x ", wsig, wsig)
fluxs = np.sqrt(lamb) / np.sqrt(xtx)
mags = sf.analysis.F2mag(fluxs)
Tes = mod.einsum("k y x , k y x -> y x ", wsig*f0, wsig*f0)
return mags, fluxs, Tes
##################################
# The Neyman-Pearson test
##################################
[docs]def get_Tnp_threshold(x, Pfa):
if len(x.shape) == 1:
xTx = x.T.dot(x)
elif len(x.shape) == 5:
xTx = np.einsum("ijklm, ijklm -> lm", x, x)
return np.sqrt(xTx)*norm.ppf(1-Pfa)
[docs]def get_Tnp_threshold_map(xTx_map, Pfa):
if len(xTx_map.shape) == 2:
#xTx = x.T.dot(x)
return np.sqrt(xTx_map)*norm.ppf(1-Pfa)
[docs]def residual_pdet_Tnp(xTx, xsi, targ):
"""
Computes the residual of Pdet in a NP test.
**Arguments:**
* xTx : The noncentrality parameter representing the feature
* targ : The target Pdet to converge to
* xsi : The location of threshold
**Returns** the Pdet difference.
"""
Pdet_Pfa = 1 - norm.cdf((xsi - xTx)/np.sqrt(xTx))
return Pdet_Pfa - targ
[docs]def get_sensitivity_Tnp(maps, pfa=0.002, pdet=0.9, postproc_mat=None, ref_mag=10.,
verbose=False, use_tqdm=True):
"""
Magnitude map at which a companion is detectable for a given map and whitening matrix.
The throughput for the signal of interest is given by the map at a reference magnitude.
The effective noise floor is implicitly described by the whitening matrix.
**Arguments:**
* maps : Differential map for a given reference magnitude
* pfa : The false alarm rate used to determine the threshold
* pdet : The detection probability used to determine the threshold
* postproc_mat : The matrix of the whitening transformation ($\Sigma^{-1/2}$)
* ref_mag : The reference mag for the given map
* verbose : Print some information along the way
**Returns**: A magnitude map
"""
from scipy.stats import norm
W = postproc_mat
if len(W.shape) == 3:
nbkp = W.shape[0]*W.shape[1]
elif len(W.shape)==2:
nbkp = W.shape[0]
else:
raise NotImplementedError("Wrong dimension for post-processing matrix")
if len(maps.shape) == 5:
nt, nwl, nout, ny, nx = maps.shape
elif len(maps.shape) == 4:
nt, nwl, nout, no = maps.shape
raise NotImplementedError("single_datacube")
else :
raise NotImplementedError("Shape not expected")
f0 = sf.analysis.mag2F(ref_mag) #flux_from_vegamag(ref_mag)
# Concatenation of the wavelengths:
rmap = maps.reshape((maps.shape[0], maps.shape[1]*maps.shape[2], maps.shape[3], maps.shape[4]))
x_map = 1 * np.einsum("iok, iklm-> iolm",
postproc_mat, rmap)
# Concatenation of the sequence
x_map = x_map.reshape((x_map.shape[0]*x_map.shape[1], x_map.shape[2], x_map.shape[3]))
#x_map = rearrange(x_map, "a b c d -> (a b) c d")
#Xsi is the threshold
xTx_map = np.einsum("olm, olm -> lm", x_map, x_map)
xsi_0 = get_Tnp_threshold_map(xTx_map, pfa)
F_map = f0/np.sqrt(xTx_map) * ( norm.ppf(1-pfa, loc=0) - norm.ppf(1-pdet, loc=xTx_map, scale=np.sqrt(xTx_map)))
mag_map = sf.analysis.F2mag(F_map)
if verbose:
plt.figure()
plt.imshow(mag_map)
plt.colorbar()
plt.show()
print(x_map.shape)
print(xTx_map.dtype)
plt.figure()
plt.imshow(xsi_0)
plt.colorbar()
plt.show()
return mag_map
[docs]def get_sensitivity_Tnp_old(maps, pfa=0.002, pdet=0.9, postproc_mat=None, ref_mag=10.,
verbose=False, use_tqdm=True):
"""
**Deprecated**
Magnitude map at which a companion is detectable for a given map and whitening matrix.
The throughput for the signal of interest is given by the map at a reference magnitude.
The effective noise floor is implicitly described by the whitening matrix.
**Arguments:**
* maps : Differential map for a given reference magnitude
* pfa : The false alarm rate used to determine the threshold
* pdet : The detection probability used to determine the threshold
* postproc_mat : The matrix of the whitening transformation ($\Sigma^{-1/2}$)
* ref_mag : The reference mag for the given map
* verbose : Print some information along the way
**Returns**: A magnitude map
"""
from scipy.stats import norm
W = postproc_mat
if len(W.shape) == 3:
nbkp = W.shape[0]*W.shape[1]
elif len(W.shape)==2:
nbkp = W
else:
raise NotImplementedError("Wrong dimension for post-processing matrix")
if len(maps.shape) == 5:
nt, nwl, nout, ny, nx = maps.shape
elif len(maps.shape) == 4:
nt, nwl, nout, no = maps.shape
raise NotImplementedError("single_datacube")
else :
raise NotImplementedError("Shape not expected")
f0 = sf.analysis.mag2F(ref_mag) #flux_from_vegamag(ref_mag)
x_map = 1 * np.einsum("iok, iklm-> iolm",
postproc_mat, rearrange(maps, "a b c d e -> a (b c) d e"))
x_map = rearrange(x_map, "a b c d -> (a b) c d")
#Xsi is the threshold
xTx_map = np.einsum("olm, olm -> lm", x_map, x_map)
xsi_0 = get_Tnp_threshold_map(xTx_map, pfa)
F_map = f0/np.sqrt(xTx_map) * ( norm.ppf(1-pfa, loc=0) - norm.ppf(1-pdet, loc=xTx_map, scale=np.sqrt(xTx_map)))
mag_map = sf.analysis.F2mag(F_map)
if verbose:
plt.figure()
plt.imshow(mag_map)
plt.colorbar()
plt.show()
print(x_map.shape)
print(xTx_map.dtype)
plt.figure()
plt.imshow(xsi_0)
plt.colorbar()
plt.show()
return mag_map
## Tools for interpretation
[docs]def correlation_map(signal, maps, postproc=None, K=None, n_diffobs=1, verbose=False):
"""
Returns the raw correlation map of a signal with a map.
**Arguments:**
* signal : The signal measured on sky shape:
(n_slots, n_wl, n_outputs)
* maps : The maps for comparison shape:
(n_slots, n_wl, n_outputs, x_resolution, y_resolution)
* postproc : The whitening matrix to use
* n_diffobs : The number of differential observables for this combiner
(1 for double bracewell, 3 for VIKiNG)
**Returns**
* cmap1 : The correlation map
* xtx_map : The normalization map
"""
assert (len(signal.shape) == 3), "Input shape (slots, wavelengths, outputs)"
assert (len(maps.shape) == 5), "Maps shape (slots, wavelengths, outputs, position, position)"
print("n_diffobs", n_diffobs)
print("shape2", maps.shape[2])
if signal.shape[2] != n_diffobs:
diffobs = np.einsum("ij, mkj->mki", K, signal)
elif signal.shape[2] == n_diffobs:
diffobs = signal
if maps.shape[2] != n_diffobs:
difmap = np.einsum("ij, kljmn -> klimn", K, maps)
elif maps.shape[2] == n_diffobs:
difmap = maps
if K is not None:
wmap = np.einsum("ijk, iklm -> ijlm", postproc, rearrange(difmap, "a b c d e -> a (b c) d e"))
wsig = np.einsum("ijk, ik -> ij", postproc, rearrange(diffobs, "a b c -> a (b c)"))
else:
wmap = rearrange(difmap, "a b c d e -> a (b c) d e")
wsig = rearrange(diffobs, "a b c -> a (b c)")
if verbose:
print(wmap.shape)
print(wsig.shape)
xsi = chi2.ppf(1.-0.01, wsig.flatten().shape[0])
print(f"xsi = {xsi}")
print(f"yty = {wsig.flatten().dot(wsig.flatten())}")
cmap1 = np.einsum("ijkl, ij -> kl", wmap, wsig)
xtx_map = np.einsum("ijkl, ijkl -> kl", wmap, wmap)
#cmap2 = np.einsum("ikl, i -> kl", rearrange(wmap, "a b c d -> (a b) c d"), wsig.flatten())
return cmap1, xtx_map
[docs]def make_source(params, lambda_range, distance):
"""
Creates a source from scratch for purpose of model fitting.
**Arguments:**
* params: An lmfit Parameters object containing:
- "Sep" separation in [mas]
- "PA" position angle in [deg]
- "Radius" The radius in [R_sun]
- "Temperature" The temperature [K]
* lambda_range : array of the wavelength channels used [m]
* distance: Distance to the system [pc]
"""
planet_separation = params["Sep"].value
planet_position_angle = params["PA"].value
planet_offsetx = -planet_separation*np.sin(planet_position_angle * np.pi/180)
planet_offsety = planet_separation*np.cos(planet_position_angle * np.pi/180)
planet_offset = (planet_offsetx, planet_offsety)
mysource = sf.sources.resolved_source(lambda_range, distance,
resolved=False,
radius=params["Radius"].value,
T=params["Temperature"].value,
offset=planet_offset)
return mysource
[docs]def get_planet_residual(my_params, target_signal, asim, dit, K, postproc, diffuse, notres=False):
"""
**arguments:**
* my_params : An lmfit Parameters object containing:
- "Sep" separation in [mas]
- "PA" position angle in [deg]
- "Radius" The radius in [R_sun]
- "Temperature" The temperature [K]
* target_signal : The observed signal to fit
* asim : The simulator object
* dit : The detector integration time
* K : The matrix that transforms the outputs into observables
(single row for double bracewell)
* postproc : An array of whitening matrices for each chunk (n_chunk, n_wl x n_k)
* notres : If ``True`` this will return the non-whitened model signal
if ``False`` this will return the difference between the whitened model signal
and the whitened target.
"""
theobs = copy.deepcopy(asim.obs)
interest = make_source(my_params, asim.lambda_science_range, asim.src.distance)
#theobs = copy.deepcopy(asim.obs)
combined = make_th_exps(asim, dit, interest, diffuse, obs=theobs)
#if not notres:
#print(K.shape)
#if len(K.shape)>2:
#set_trace()
diff = np.einsum("k o, n w o -> n w k", K, combined)
#print("postproc", postproc.shape)
#print("diff", diff.shape)
wsig = np.einsum("ijk, ik -> ij", postproc, rearrange(diff, "a b c -> a (b c)"))
# Not very efficient
#set_trace()
if notres:
return diff
else:
wtarg = np.einsum("ijk, ik -> ij", postproc, rearrange(target_signal, "a b c -> a (b c)"))
resvec = wsig - wtarg
return resvec
[docs]def make_th_exps(asim, dit, interest, diffuse, obs=None):
"""
Creates a model observation of an idealized source of interest.
Simulator/obs are required to account for array projection, pointing
and combination scheme.
**Arguments:**
* asim : Simulator object
* dit : Detector integration time
* interest : Synthetic source of interest
* diffuse : The diffuse light chain, used
to model absorption of instrument/sky
* obs : A given observatory object (default: None)
**Returns:** (n_chunk, n_wl, n_out) array recorded in the integration time.
"""
if obs is None:
obs = copy.deepcopy(asim.obs)
lights = []
for i, time in enumerate(asim.sequence):
obs.point(asim.sequence[i], asim.target)
injected = asim.injector.best_injection(asim.lambda_science_range)
injected = injected.T * asim.corrector.get_phasor(asim.lambda_science_range)
if obs.observatory_location == 'space':
array = obs.get_projected_array()
else:
array = obs.get_projected_array(obs.altaz, PA=obs.PA)
filtered_starlight = diffuse[0].get_downstream_transmission(asim.lambda_science_range)
collected = filtered_starlight * asim.injector.collecting * dit
#set_trace()
lights.append(asim.combine_light(interest, injected, array, collected))
lights = np.array(lights)
return lights