Source code for scifysim.injection

# injection.py

"""
This module revolves around the injector class
At ``__init__()``, injector builds

* one atmo object for each telescope (stored in a list)
* one focuser object (badly named) for each wavelength AND each telescope
* one fiber_head object that builds the map of LP01 mode.

It then creates a generator called ``self.it`` that will return
a series of complex numbers corresponding to injection complex phasors.

**Testing:**

- ``test_fiber`` for the ``fiber_head``
- ``test_injector`` for the ``injector``

"""



import numpy as np
import threading
import time
from pathlib import Path
from scipy.interpolate import interp2d, interp1d
from xaosim import zernike



from pdb import set_trace


# amto_screen is now provided in house to provide seed
#from xaosim import wavefront as wft

shift = np.fft.fftshift
fft   = np.fft.fft2
ifft  = np.fft.ifft2


import logging

logit = logging.getLogger(__name__)

from . import utilities

parent = Path(__file__).parent.absolute()


[docs]class atmo(object): '''Atmospheric Kolmogorov-type phase screen. **Class Attributes:** - csz : size (csz x csz) of the phase screen (in pixels) - pdiam : diameter of the aperture within this array (in pixels) - rndarr : uniformly distributed random array (csz x csz) - kolm : the original phase screen (csz x csz) - kolm2 : the oversized phase screen ((csz + pdiam) x (csz + pdiam)) - qstatic : an optional quasi static aberration (pdiam x pdiam) - rms : total phase screen rms value (in nanometers) - rms_i : instant rms inside the pupil (in nanometers) **Comment:** While the attributes are documented here for reference, the prefered way of interacting with them is via the functions defined within the class. ''' # ==================================================
[docs] def __init__(self, name="GRAVITY+", csz = 512, psz=200, lsz=8.0, r0=0.2, ro_wl=3.5e-6, L0=10.0, fc=24.5, correc=1.0, seed=None, wind_speed = 1., wind_angle = 0.1, step_time = 0.01, lo_excess = 0., pdiam=8.0, config=None): ''' Kolmogorov type atmosphere + qstatic error **Parameters:** - name : a string describing the instrument - csz : the size of the Fourier array - psz : the size of the pupil in pixels - lsz : the screen linear size (in meters) - r0 : the Fried parameter (in meters) - L0 : the outer scale parameter (in meters) - fc : the cutoff frequency [lambda/D] defined by the number of actuators - correc : the correction factor to apply to controlled spatial frequencies. - wind_speed : the speed of the phas screen in [m/s] - step_time : the time resolution of the simulation [s] - config : a parsed config file ''' if config is None: # Deprecated the use of config is preferred self.csz = csz self.psz = psz self.lsz = lsz self.pdiam = pdiam self.ppscale = self.pdiam/self.psz self.wind_speed = wind_speed self.step_time = step_time self.r0 = r0 self.L0 = L0 self.r0_wl = r0_wl self.rms_i = 0.0 self.correc = correc self.lo_excess = lo_excess self.fc = fc else : self.config = config pres = self.config.getint("atmo", "pup_res") screen_oversize = self.config.getfloat("atmo", "screen_size") pdiams = self.config.getarray("configuration","diam") pdiam = pdiams[0] fc = self.config.getfloat("atmo", "fc_ao") r0 = self.config.getfloat("atmo", "r0") L0 = self.config.getfloat("atmo", "Lout") correc = self.config.getfloat("atmo", "correc") lo_excess = self.config.getfloat("atmo", "lo_excess") wind_speed = self.config.getfloat("atmo", "vwind") step_time = self.config.getfloat("atmo", "step_time") wl_mean = self.config.getfloat("photon", "lambda_cen") self.csz = int(pres*screen_oversize) self.psz = pres self.pdiam = pdiam self.ppscale = self.pdiam/self.psz self.wind_speed = wind_speed self.step_time = step_time self.lsz = self.pdiam*screen_oversize self.r0 = r0 self.L0 = L0 self.r0_wl = wl_mean self.rms_i = 0.0 self.correc = correc self.lo_excess = lo_excess self.fc = fc kolm, self.modul = atmo_screen(screen_dimension=self.csz, screen_extent=self.lsz, r0=self.r0, L0=self.L0, fc=self.fc, correc=self.correc, lo_excess=self.lo_excess, pdiam=self.pdiam, seed=seed) self.kolm = kolm.real self.qstatic = np.zeros((self.psz, self.psz)) self.shm_phs = self.qstatic self.kolm2 = np.tile(self.kolm, (2,2)) #self.kolm2 = self.kolm2[:self.sz+self.pdiam,:self.sz+self.pdiam] self.offx = 0. # x-offset on the "large" phase screen array self.offy = 0. # y-offset on the "large" phase screen array self.t = 0. self.update_step_vector(wind_angle=wind_angle) self.ttc = False # Tip-tilt correction flag # auxilliary array (for tip-tilt correction) self.xx, self.yy = np.meshgrid(np.arange(self.csz)-self.csz//2, np.arange(self.csz)-self.csz//2) self.xxnorm2 = np.sum(self.xx**2) self.yynorm2 = np.sum(self.yy**2) self.it = self.give() # Must update screen to obtain the correct scaling and correction self.update_screen()
# ==============================================================
[docs] def update_step_vector(self,wind_angle=0.1, wind_speed=None, step_time=None): """ Refresh the step vector that will be applied **Parameters:** - wind_angle : the angle of incience of the wind (values around 0.1 rad are favoured since they provide long series before repeating) - wind_speed : The speed of the moving phase screen. - step_time : The time step of the simulation [s] """ if wind_speed is not None: self.wind_speed = wind_speed if step_time is not None: self.step_time = step_time shift_length = self.wind_speed*self.step_time/self.ppscale xstep, ystep = shift_length*np.cos(wind_angle), shift_length*np.sin(wind_angle) self.step_vector = np.array((ystep, xstep))
[docs] def set_qstatic(self, qstatic=None): """ Defines some quasistatic errors in the wavefront **Parameters:** - qstatic : A static phase screen """ if qstatic is not None: if qstatic.shape == (self.csz, self.csz): self.qstatic = qstatic print("quasi-static phase screen updated!") else: print("could not update quasi-static phase screen") print("array should be %d x %d (dtype=%s)" % ( self.csz, self.csz, str(self.qstatic.dtype))) else: print("no new quasi-static screen was provided!")
# ==============================================================
[docs] def update_screen(self, correc=None, fc=None, r0=None, L0=None, seed=None): ''' Generic update of the properties of the phase-screen **Parameters:** - correc : *float* The correction factor for the control region - fc : *float* The cutoff fequency - r0 : *flaot* r0 of the initial phase screen ''' if r0 is not None: self.r0 = r0 if L0 is not None: self.L0 = L0 if correc is not None: self.correc = correc if fc is not None: self.fc = fc # Converting to a piston screen in microns at this point # r0 is now expected at the mean of the wl band kolm, self.modul = atmo_screen(screen_dimension=self.csz, screen_extent=self.lsz, r0=self.r0, L0=self.L0, fc=self.fc, correc=self.correc, lo_excess=self.lo_excess, pdiam=self.pdiam, seed=seed) self.kolm = 1.0e6 * self.r0_wl / (2*np.pi) * kolm.real # converting to piston in microns self.kolm2 = np.tile(self.kolm, (2,2))
[docs] def reset(self): self.offx = 0. self.offy = 0.
[docs] def give(self): ''' Main loop: frozen screen slid over the aperture This returns a iterator that yelds the phase screen for one aperture. **use:** .. code-block:: a = atmo.give() phscreen = next(a) phscreen = next(a) **Options:** ''' while True: self.offx += self.step_vector[1] #2 self.offy += self.step_vector[0] #1 self.t += self.step_time self.intoffx = int(np.round(self.offx % self.csz)) self.intoffy = int(np.round(self.offy % self.csz)) subk = self.kolm2[self.intoffx:self.intoffx+self.psz, self.intoffy:self.intoffy+self.psz].copy() if self.ttc is True: ttx = np.sum(subk*self.xx) / self.xxnorm2 tty = np.sum(subk*self.yy) / self.yynorm2 subk -= ttx * self.xx + tty * self.yy self.rms_i = subk.std() self.shm_phs = subk + self.qstatic yield self.shm_phs.astype(np.float32)
# ==================================================================
[docs]def atmo_screen(screen_dimension, screen_extent, r0, L0, fc=25, correc=1.0, lo_excess=0., pdiam=None,seed=None): ''' The Kolmogorov - Von Karman phase screen generation algorithm. Adapted from the work of Carbillet & Riccardi (2010). `<http://cdsads.u-strasbg.fr/abs/2010ApOpt..49G..47C>`_ Kolmogorov screen can be altered by an attenuation of the power by a correction factor *correc* up to a cut-off frequency *fc* expressed in number of cycles across the phase screen **Parameters:** - screen_dimension : the size of the array to be computed (in pixels) - screen_extent : the physical extent of the phase screen (in meters) - r0 : the Fried parameter, measured at a given wavelength (in meters) - L0 : the outer scale parameter (in meters) - fc : DM cutoff frequency (in lambda/D) - correc : correction of wavefront amplitude (factor 10, 100, ...) - lo_excess: A factor introducing excess low-order averations (mosly tip-tilt) Must be striclty 0 =< lo_excess < 1. - pdiam : pupil diameter (in meters) - seed : random seed for the screen (default: None produces a new seed) Returns: two independent phase screens, available in the real and imaginary part of the returned array. **Remarks:** If pdiam is not specified, the code assumes that the diameter of the pupil is equal to the extent of the phase screen "screen_extent". ''' #phs = 2*np.pi * (np.random.rand(screen_dimension, screen_dimension) - 0.5) rng = np.random.default_rng(np.random.SeedSequence(seed)) phs = rng.uniform(low=-np.pi, high=np.pi, size=(screen_dimension,screen_dimension)) xx, yy = np.meshgrid(np.arange(screen_dimension)-screen_dimension/2, np.arange(screen_dimension)-screen_dimension/2) rr = np.hypot(yy, xx) rr = shift(rr) rr[0,0] = 1.0 modul = (rr**2 + (screen_extent/L0)**2)**(-11/12.) if pdiam is not None: in_fc = (rr < fc * screen_extent / pdiam) else: in_fc = (rr < fc) #if not np.isclose(lo_excess, 0): # set_trace() modul[in_fc] /= correc * (1 - lo_excess * np.exp(- 0.5*rr[in_fc])) # obtaining unique rr values rru, rru_indices = np.unique(rr, return_index=True) amodul = np.sqrt(2*0.0228)*(screen_extent/r0)**(5/6.)*modul.flat[rru_indices] screen = ifft(modul * np.exp(1j*phs)) * screen_dimension**2 screen *= np.sqrt(2*0.0228)*(screen_extent/r0)**(5/6.) screen -= screen.mean() return screen, np.array([rru, amodul]).T
# ====================================================================== dtor = np.pi/180.0 # to convert degrees to radians i2pi = 1j*2*np.pi # complex phase factor
[docs]def Hn(A): return np.conjugate(np.transpose(A))
[docs]class focuser(object): ''' Generic monochoromatic camera class This class simulates focusing optics. It can simulate injection either: - by computing the product of the image plane complex amplitude with the fiber mode field, or - by computing the product of the pupil plane complex amplitude with the conjugation of the fiber mode field. ''' # =========================================================================
[docs] def __init__(self, name="SCExAO_chuck", csz=200, ysz=256, xsz=256, pupil=None,screen=None, rm_inj_piston=False, pdiam=7.92, pscale=10.0, wl=1.6e-6): ''' Default instantiation of a cam object: **Parameters are:** - name : a string describing the camera ("instrument + camera name") - csz : array size for Fourier computations - (ys,xs) : the dimensions of the actually produced image - pupil : a csz x csz array containing the pupil - pscale : the plate scale of the image, in mas/pixel - wl : the central wavelength of observation, in meters ''' self.name = name self.csz = csz # Fourier computation size self.ysz = ysz # camera vertical dimension self.xsz = xsz # camera horizontal dimension self.isz = max(ysz, xsz) # max image dimension # possible crop values (to match true camera image sizes) self.x0 = (self.isz - self.xsz) // 2 self.y0 = (self.isz - self.ysz) // 2 self.x1 = self.x0 + self.xsz self.y1 = self.y0 + self.ysz if pupil is None: self.pupil = ud(csz, csz, csz//2, True) else: self.pupil = pupil self.flatpup = self.pupil.flatten() self.screen_bias = np.zeros_like(self.flatpup) self.pupilsum = np.sum(self.pupil).astype(np.float32) self.pdiam = pdiam # pupil diameter in meters self.pscale = pscale # plate scale in mas/pixel self.fov = self.isz * self.pscale self.wl = wl # wavelength in meters self.mu2phase = np.float32(2.0 * np.pi / self.wl / 1e6) # convert microns to phase self.frm0 = np.zeros((ysz, xsz)) # initial camera frame self.btwn_pixel = True # fourier comp. centering option self.phot_noise = False # photon noise flag self.signal = np.float32(1e6) # default # of photons in frame self.corono = False # if True: perfect coronagraph self.remove_injection_piston = rm_inj_piston self.npix = np.count_nonzero(self.pupil) # final tune-up self.update_cam() self.define_TT()
[docs] def define_TT(self): """ Defines tip and tilt phase screens based on the pupil """ self.tip = self.pupil*zernike.mkzer(*zernike.noll_2_zern(2), self.pupil.shape[0], 100, limit=False) self.tip = np.pi*self.tip/np.max(self.tip) self.ntip = self.tip.flatten()/self.tip.flatten().dot(self.tip.flatten()) self.tilt = self.pupil*zernike.mkzer(*zernike.noll_2_zern(3), self.pupil.shape[0], 100, limit=False) self.tilt = np.pi*self.tilt/np.max(self.tilt) self.ntilt = self.tilt.flatten()/self.tilt.flatten().dot(self.tilt.flatten())
# =========================================================================
[docs] def update_pupil(self, pupil, reference_pupil): assert (pupil.dtype==bool), "Given wrong pupil data type. Needs bool." assert (pupil.shape==self.pupil.shape) self.pupil = pupil self.flatpup = self.pupil.flatten() self.screen_bias = np.zeros_like(self.flatpup) self.pupilsum = np.sum(self.pupil).astype(np.float32) self.update_signal(self.pupilsum * 1./reference_pupil.sum())
[docs] def update_cam(self, wl=None, pscale=None, between_pixel=None): ''' Change the filter, the plate scale or the centering of the camera **Parameters:** - pscale : the plate scale of the image, in mas/pixel - wl : the central wavelength of observation, in meters - between_pixel : whether FT are centered between four pixels or not ''' wasgoing = False if wl is not None: self.wl = wl try: del self._A1 except AttributeError: print("sft aux array to be refreshed") pass if pscale is not None: self.pscale = pscale try: del self._A1 except AttributeError: print("SFT aux array to be refreshed") pass if between_pixel is not None: self.btwn_pixel = between_pixel try: del self._A1 except AttributeError: print("SFT aux array to be refreshed") pass self.ld0 = self.wl/self.pdiam*3.6e6/dtor/self.pscale # l/D (in pixels) self.nld0 = self.isz / self.ld0 # nb of l/D across the frame tmp = self.sft(np.zeros((self.csz, self.csz)))
# =========================================================================
[docs] def update_signal(self, nph=1e6): ''' Update the strength of the signal Automatically sets the *phot_noise* flag to *True* *IF* the value provided is negative, it sets the *phot_noise* flag back to *False* and sets the signal back to 1e6 photons **Parameters:** - nph: the total number of photons inside the frame ''' if (nph > 0): self.signal = np.float32(nph) self.phot_noise = True else: self.signal = np.float32(1e6) self.phot_noise = False
# =========================================================================
[docs] def sft(self, A2): ''' Class specific implementation of the explicit Fourier Transform The algorithm is identical to the function in the sft module, except that intermediate FT arrays are stored for faster computation. For a more generic implementation, refer to the sft module of this package. Assumes the original array is square. No need to "center" the data on the origin. ''' try: test = self._A1 # look for existence of auxilliary arrays except AttributeError: logit.info("updating the Fourier auxilliary arrays") NA = self.csz NB = self.isz m = self.nld0 self._coeff = m/(NA*NB) U = np.zeros((1, NB)) X = np.zeros((1, NA)) offset = 0 if self.btwn_pixel is True: offset = 0.5 X[0, :] = (1./NA)*(np.arange(NA)-NA/2.0+offset) U[0, :] = (m/NB)*(np.arange(NB)-NB/2.0+offset) sign = -1.0 self._A1 = np.exp(sign*i2pi*np.dot(np.transpose(U), X)) self._A3 = np.exp(sign*i2pi*np.dot(np.transpose(X), U)) self._A1 *= self._coeff B = (self._A1.dot(A2)).dot(self._A3) return np.array(B)
# =========================================================================
[docs] def getimage(self, phscreen=None): ''' Produces an image, given a certain number of phase screens, and updates the shared memory data structure that the camera instance is linked to with that image If you need something that returns the image, you have to use the class member method get_image(), after having called this method. **Parameters:** - phscreen: The piston map in µm ''' #from pdb import set_trace # Here phs = np.zeros((self.csz, self.csz), dtype=np.float64) # phase map if phscreen is not None: # a phase screen was provided if self.remove_injection_piston: phs += phscreen - np.mean(phscreen[self.pupil]) else: phs += phscreen wf = np.exp(1j*phs*self.mu2phase) wf *= np.sqrt(self.signal / self.pupilsum) # signal scaling wf *= self.pupil # apply the pupil mask self._phs = phs * self.pupil # store total phase self.fc_pa = self.sft(wf) # focal plane cplx ampl return self.fc_pa
[docs] def get_inv_image(self, image): B = Hn(self._A1).dot(image.dot(Hn(self._A3))) return B
[docs] def get_injection(self, phscreen): """ Computes the complex injection phasor based on fiber mode in the pupil plane. **Parameters:** - phscreen: The piston map in µm """ #set_trace() phs = phscreen.flatten() phs += -np.mean(phs[self.flatpup]) + self.screen_bias # We do only the needed operations thanks to masking wf = np.sqrt(self.signal / self.pupilsum) * np.exp(1j*phs[self.flatpup]*self.mu2phase) # signal scaling #wf *= self.pupil # apply the pupil mask #self._phs = phs * self.pupil # store total phase #self.fc_pa = self.sft(wf) # focal plane cplx ampl injected = self.flat_masked_lppup.dot(wf) return injected
[docs] def get_tilt(self, phscreen=None): ''' Measures the tip-tilt measurement corresponding to the wavefront provided. The tip-tilt is in a 2-array and the unit is lambda/D where D is the extent of the pupil mask. **Parameters:** - phscreen: The piston map in µm ''' #from pdb import set_trace # Here phs = np.zeros((self.csz, self.csz), dtype=np.float64) # phase map if phscreen is not None: # a phase screen was provided if self.remove_injection_piston: phs += phscreen - np.mean(phscreen[self.pupil]) else: phs += phscreen phase = self.pupil*phs*self.mu2phase tip = phase.flatten().dot(self.ntip) tilt = phase.flatten().dot(self.ntilt) #wf *= np.sqrt(self.signal / self.pupil.sum()) # signal scaling #wf *= self.pupil # apply the pupil mask #self._phs = phs * self.pupil # store total phase #self.fc_pa = self.sft(wf) # focal plane cplx ampl return np.array([tip, tilt])
[docs]class injector(object):
[docs] def __init__(self,pupil="VLT", pdiam=8.,odiam=1., ntelescopes=4, tt_correction=None, no_piston=False, lambda_range=None, fiber_mode="diameter", NA = 0.23, a = 4.25e-6, ncore = 2.7, wl_mfd = None, focal_hrange=20.0e-6, focal_res=50, pscale = 4.5, interpolation=None, rm_inj_piston=False, seed=None, atmo_config=None): """ Generates fiber injection object. **Parameters:** - pupil : The telescope pupil to consider - pdiam : The pupil diameter - ntelescopes : The number of telescopes to inject - pupil : Apupil name or definition - tt_correction : Amount of TT to correct (Not implemented yet) - no_piston : Remove the effect of piston at the injection (So that it is handled only by the FT.) - NA : Then numerical aperture of the fiber - a : The radius of the core (m) - ncore : The refractive index of the core - focal_hrange : The half-range of the focal region to simulate (m) - focal_res : The total resolution of the focal plane to simulate - pscale : The pixel scale for imager setup (mas/pix) - seed : Value to pass for random phase screen initialization - atmo_config: A parsed config file Use: call ``next(self.it)`` that returns injection phasors For more information, look into the attributes. To get the ideal injection: ``self.best_injection(lambdas)`` """ if lambda_range is None: self.lambda_range = np.linspace(3.0e-6, 4.2e-6, 6) else: self.lambda_range = lambda_range self.ntelescopes = ntelescopes self.pdiam = pdiam self.odiam = odiam self.collecting = np.pi/4*(self.pdiam**2 - self.odiam**2) self.atmo_config = atmo_config ########## self.fiber_mode = fiber_mode self.NA = NA # 0.23#0.21 self.a = a # 4.25e-6#3.0e-6 self.ncore = ncore # 2.7 self.wl_mfd = wl_mfd self.focal_hrange = focal_hrange # 20.0e-6 self.focal_res = focal_res # 50 self.pscale = pscale #Now calling the common part of the config self.pupil = pupil self.interpolation = interpolation self.rm_inj_piston = rm_inj_piston self._setup(seed=seed) # Preparing iterators self.it = self.give_fiber() if interpolation is not None: self.compute_best_injection(interpolation=interpolation) self.get_efunc = self.give_interpolated(interpolation=interpolation) self.focal_planes = None self.injected = None
[docs] @classmethod def from_config_file(cls, file=None, fpath=None, focal_res=50, pupil=None, seed=None): """ Construct the injector object from a config file **Parameters:** file : A pre-parsed config file fpath : The path to a config file nwl : The number of wl channels focal_res : The total resolution of the focal plane to simulate Gathers the variables from the config file then calls for a class instance (``__init__()``) """ from scifysim import parsefile if file is None: logit.debug("Need to read the file") assert fpath is not None , "Need to provide at least\ a path to a config file" logit.debug("Loading the parsefile module") theconfig = parsefile.parse_file(fpath) confpath = Path(fpath).parent else: logit.debug("file provided") assert isinstance(file, parsefile.ConfigParser), \ "The file must be a ConfigParser object" theconfig = file if fpath is None: logit.error("Now we use fpath to provide the root for appendix config files") else: confpath = Path(fpath) # Numerical aperture fiber_mode = theconfig.get("fiber", "fiber_mode") wl_mfd = theconfig.get("fiber", "fib_wav") NA = theconfig.getfloat("fiber", "num_app") # Core radius a = theconfig.getfloat("fiber", "core") if np.isclose(a, 0., atol=1.0e-10): logit.warning("Core radius is set to 0 : optimize") logit.warning("Not implemented in scifysim: set to 4.25") a = 4.25e-6 else: a = theconfig.getfloat("fiber", "core") # Core index ncore = theconfig.getfloat("fiber", "core_index") # Wavelength coverage lambcen = theconfig.getfloat("photon", "lambda_cen") lambwidth = theconfig.getfloat("photon", "bandwidth") interpolation = theconfig.get("photon", "injection_spectral_interp") #nwls = theconfig.getint("photon", "n_spectral_science") nwl = theconfig.getint("photon", "n_spectral_injection") if "None" in interpolation: nwl = nwls elif "nearest": pass elif "linear" in interpolation: if nwl<1: logit.warning("Need minimum 2 spectral chanels at injection") logit.warning("to get quadratic interpolation (selected %d)"%(nwl)) nwl = max(2, nwl) elif "quadratic" in interpolation: if nwl<3: logit.warning("Need minimum 3 spectral chanels at injection") logit.warning("to get quadratic interpolation (selected %d)"%(nwl)) nwl = max(3, nwl) elif "cubic" in interpolation: if nwl<4: logit.warning("Need minimum 4 spectral chanels at injection") logit.warning("to get cubic interpolation (selected %d)"%(nwl)) else : print("Interpolation: ", interpolation) raise ValueError("Unrecognised interpolation") lambmin = lambcen - lambwidth/2 lambmax = lambcen + lambwidth/2 lambda_range = np.linspace(lambmin, lambmax, nwl) pdiams = theconfig.getarray("configuration", "diam") pdiam = pdiams[0] odiams = theconfig.getarray("configuration", "cen_obs") odiam = odiams[0] separate_atmo_file = theconfig.getboolean("appendix", "use_atmo") if separate_atmo_file: rel_path = theconfig.get("appendix", "atmo_file") if confpath is not None: logit.warning("file for atmo configuration:") logit.warning((confpath/rel_path).absolute()) atmo_config = parsefile.parse_file(confpath/rel_path) else: logit.error("confp was not provided") raise NameError("Need to provide fpath") else: logit.warning("Using same file for atmo configuration") atmo_config = theconfig #atmo_mode = theconfig.get("atmo", "atmo_mode") #if "seeing" in atmo_mode: # r0 = seeing_to_r0(theconfig.getfloat("atmo","seeing"), lambcen) #else: # r0 = theconfig.getfloat("atmo", "r0") # Focal scale and range focal_res = focal_res focal_hrange = theconfig.getfloat("fiber", "focal_hrange") pscale = theconfig.getfloat("fiber", "pscale") if pupil is None: pres = theconfig.getint("atmo", "pup_res") radius = pres//2 pupil = tel_pupil(pres, pres, radius, file=theconfig, tel_index=0) ntelescopes = theconfig.getint("configuration", "n_dish") rm_inj_piston = theconfig.getboolean("atmo", "remove_injection_piston") obj = cls(pupil=pupil, pdiam=pdiam, odiam=odiam, ntelescopes=ntelescopes, tt_correction=None, no_piston=False, lambda_range=lambda_range, atmo_config=atmo_config, fiber_mode=fiber_mode, NA=NA, a=a, wl_mfd=wl_mfd, ncore=ncore, focal_hrange=focal_hrange, focal_res=focal_res, pscale=pscale, interpolation=interpolation, rm_inj_piston=rm_inj_piston, seed=seed) obj.config = theconfig return obj
[docs] def _setup(self, seed=None): """ Common part of the setup Nota: the seed for creation of the screen is incremented by 1 between pupils """ self.phscreensz = self.pupil.shape[0] self.screen = [] self.focal_plane = [] for i in range(self.ntelescopes): if seed is not None: theseed = seed + i else: theseed = seed self.screen.append(atmo(config=self.atmo_config, seed=theseed, wind_angle=0.08+0.01*i)) self.focal_plane.append([focuser(csz=self.phscreensz, xsz=self.focal_res, ysz=self.focal_res, pupil=self.pupil, pscale=self.pscale, pdiam=self.pdiam, wl=wl, rm_inj_piston=self.rm_inj_piston) for wl in self.lambda_range]) if self.fiber_mode == "gaussian": self.fiber = gaussian_fiber_head(NA=self.NA) elif self.fiber_mode == "diameter": self.fiber = fiber_head() else : self.fibfiber = fiber_head() # Caluclating the focal length of the focuser self.focal_length = self.focal_hrange/utilities.mas2rad(self.focal_plane[0][0].fov/2) for fp in self.focal_plane: for wl in range(self.lambda_range.shape[0]): fp[wl].signal = 1. fp[wl].phot_noise = False self.LP01 = self.fiber self.LP01.full_consolidation(self.NA, self.a, self.ncore) ### Still need to figure out the focal scale self.lpmap = self.LP01.numerical_evaluation(self.focal_hrange, self.focal_res, self.lambda_range) self.lpmap = np.abs(self.lpmap) quartiles = [] for i, amap in enumerate(self.lpmap): quartiles.append([i*np.max(amap)/4 for i in range(4)]) self.map_quartiles = np.array(quartiles) # Computing the mode of the fiber in the pupil plane for all scopes and wavelengths # Although they should be all the same lppup = [] for i, scope in enumerate(self.focal_plane): # Iterating ont the scopes focal_wl = [] for i, fiberwl in enumerate(scope): # Iterating on the wavelengths a_lp_pup = fiberwl.pupil * fiberwl.get_inv_image(self.lpmap[i,:,:]) fiberwl.lppup = a_lp_pup fiberwl.flat_masked_lppup = fiberwl.lppup.flatten()[fiberwl.flatpup] # This is pre-computation/ to optimize the computation focal_wl.append(a_lp_pup) lppup.append(focal_wl) self.lppup = np.array(lppup)
[docs] def reset_screen(self): for ascreen in self.screen: ascreen.reset()
[docs] def give_fiber(self,): while True: focal_planes = [] for i, scope in enumerate(self.focal_plane): thescreen = next(self.screen[i].it) focal_wl = [] for fiberwl in scope: focal_wl.append(fiberwl.getimage(thescreen)) focal_planes.append(focal_wl) focal_planes = np.array(focal_planes) self.focal_planes = focal_planes self.injected = np.sum(self.focal_planes*self.lpmap[None,:,:,:], axis=(2,3)) yield self.injected
[docs] def give_comparison(self,): """ A test comparison between image and pupil plane injection """ while True: focal_planes = [] thescreens = [] for i, scope in enumerate(self.focal_plane): thescreen = next(self.screen[i].it) thescreens.append(thescreen) focal_wl = [] for fiberwl in scope: focal_wl.append(fiberwl.getimage(thescreen)) focal_planes.append(focal_wl) focal_planes = np.array(focal_planes) #self.focal_planes = focal_planes orig_injected = np.sum(focal_planes*self.lpmap[None,:,:,:], axis=(2,3)) focal_planes = [] for i, scope in enumerate(self.focal_plane): thescreen = thescreens[i] focal_wl = [] for ascope in scope: focal_wl.append(ascope.get_injection(thescreen)) #pupil_injected = self.pupil focal_planes.append(focal_wl) focal_planes = np.array(focal_planes) pupil_injected = focal_planes #self.focal_planes = focal_planes #self.injected = np.sum(self.focal_planes*self.lpmap[None,:,:,:], axis=(2,3)) yield orig_injected, pupil_injected
[docs] def all_inj_phasors(self,lambdas): """ Convenience function that applies interpolation for all the inputs """ outs = np.array([self.einterp[i](lambdas) for i in range(self.ntelescopes)]) return outs
[docs] def best_injection(self,lambdas): """ Convenience function that applies interpolation for all the inputs """ outs = np.array([self.best_einterp[i](lambdas) for i in range(self.ntelescopes)]) return outs
[docs] def give_interpolated_injection_image_plane(self,interpolation): """ This one will yield the method that interpolates all the injection phasors """ while True: focal_planes = [] for i, scope in enumerate(self.focal_plane): thescreen = next(self.screen[i].it) focal_wl = [] for fiberwl in scope: focal_wl.append(fiberwl.getimage(thescreen)) focal_planes.append(focal_wl) focal_planes = np.array(focal_planes) self.focal_planes = focal_planes self.injected = np.sum(self.focal_planes*self.lpmap[None,:,:,:], axis=(2,3)) #self.injected = np.array([]) self.einterp = [interp1d(self.lambda_range, self.injected[i,:],kind=interpolation, fill_value="extrapolate")\ for i in range(self.ntelescopes)] yield self.all_inj_phasors
[docs] def give_interpolated(self,interpolation): """ This one will yield the method that interpolates all the injection phasors **Arguments:** * interpolation: type of interpolation to use based on the computed n (typically n=3) wavefronts. """ while True: injection_values = [] for i, scope in enumerate(self.focal_plane): thescreen = next(self.screen[i].it) inj_wl = [] for fiberwl in scope: inj_wl.append(fiberwl.get_injection(thescreen)) injection_values.append(inj_wl) injection_values = np.array(injection_values) #self.focal_planes = focal_planes self.injected = injection_values #self.injected = np.array([]) self.einterp = [interp1d(self.lambda_range, self.injected[i,:],kind=interpolation, fill_value="extrapolate")\ for i in range(self.ntelescopes)] yield self.all_inj_phasors
[docs] def compute_best_injection(self,interpolation): """ This one will yield the method that interpolates ideal injection **Arguments:** * interpolation: type of interpolation to use based on the computed n (typically n=3) wavefronts. """ from scipy.interpolate import interp1d focal_planes = [] for i, scope in enumerate(self.focal_plane): thescreen = np.zeros_like(next(self.screen[i].it)) focal_wl = [] for fiberwl in scope: focal_wl.append(fiberwl.getimage(thescreen)) focal_planes.append(focal_wl) focal_planes = np.array(focal_planes) self.focal_planes = focal_planes self.injected = np.sum(self.focal_planes*self.lpmap[None,:,:,:], axis=(2,3)) # Here we dump the first interpolation which returns nans for some reason. # This seeems to be a bug in the library. dump = interp1d(self.lambda_range, self.injected[0,:],kind=interpolation, fill_value="extrapolate") self.best_einterp = [interp1d(self.lambda_range, self.injected[i,:],kind=interpolation, fill_value="extrapolate")\ for i in range(self.ntelescopes)]
#from pdb import set_trace #set_trace()
[docs] def compute_injection_function(self, interpolation="linear", tilt_res=50, tilt_range=2.): """ Computes an interpolation of the injection as a function of a tip-tilt ``injector.injection_abs(wl [m], offset [lambda/D])`` """ from scipy.interpolate import interp2d meanwl = np.mean(self.lambda_range) # tilt_vector goes for 1 lambda/D tilt_vector = np.linspace(-meanwl/2*1e6, meanwl/2*1e6, self.phscreensz)[None,:] * np.ones(self.phscreensz)[:,None] offset = np.linspace(0., tilt_range, tilt_res) injecteds = [] for k, theoffset in enumerate(offset): thescreen = theoffset * tilt_vector focal_planes = [] for i, scope in enumerate(self.focal_plane): focal_wl = [] for fiberwl in scope: focal_wl.append(fiberwl.getimage(thescreen)) # focal_wl.append(fiberwl.get_injection(thescreen)) focal_planes.append(focal_wl) focal_planes = np.array(focal_planes) injected = np.sum(focal_planes*self.lpmap[None,:,:,:], axis=(2,3))[0] # print(focal_planes.shape) # injected = np.sum(focal_planes[0]) #print(injected.dtype) injecteds.append(injected) injecteds = np.array(injecteds) points_x, points_y = np.array(np.meshgrid(self.lambda_range, offset)) print(points_x.shape) print(injecteds.flatten().shape) self.injection_rate = LinearNDInterpolator(points=list(zip(points_x.flatten(), points_y.flatten())), values=np.abs(injecteds.flatten())**2, fill_value=0.) # self.injection_rate = unsorted_intepr2d(self.lambda_range, offset, np.abs(injecteds)**2, kind=interpolation, fill_value=0.) self.injection_rate.__doc__ = """rate(wavelength[m], offset[lambda/D])""" self.injection_arg = LinearNDInterpolator(points=list(zip(points_x.flatten(), points_y.flatten())), values=np.angle(injecteds.flatten()), fill_value=0.) # self.injection_arg = unsorted_interp2d(self.lambda_range, offset, np.angle(injecteds), kind=interpolation, fill_value=0.) self.injection_arg.__doc__ = """phase(wavelength[m], offset[lambda/D])""" return
[docs] def update_screens(self): """ Reset all the screens for the injection """ for ascreen in self.screen: ascreen.update_screen() # Discarding the first value : quick fix for python 3.9?? discard = next(self.get_efunc)(self.lambda_range)
[docs]class fringe_tracker(object):
[docs] def __init__(self, theconfig, seed=None, precompute=False): """ Creates the module that simulates the OPD residual from fring tracker performance theconfig : A parsed config file seed : Seed for the generation of random OPDs """ self.config = theconfig self.seed = seed self.precompute = precompute self.reference_file = self.config.get("fringe tracker", "reference_file") self.wet_atmosphere = self.config.getboolean("fringe tracker", "wet_atmosphere") self.dry_scaling = self.config.getfloat("fringe tracker", "dry_scaling") self.wet_scaling = self.config.getfloat("fringe tracker", "wet_scaling") self.static_bias_scaling = self.config.getfloat("fringe tracker", "static_bias_scaling") self.n_tel = self.config.getint("configuration","n_dish") data = np.loadtxt(parent/self.reference_file, comments="#") self.ref_freqs = data[:,0] self.ref_ps_phase = data[:,1] self.ref_ps_disp = data[:,2] self.ref_dt = 1/(2*np.max(self.ref_freqs)) # timestep: the one that will be used in the simulator logit.warning("Loading keyword step_time from [atmo] in fringe_tracking") self.timestep = self.config.getfloat("atmo", "step_time")
[docs] def prepare_time_series(self,lamb, duration=10, replace=True): """ Call to refresh the time series to use duration : The duration of the time series to prepare replace : Replace the time series (otherwise append) """ logit.warning("Preparing a fringe tracking residual time series") element_duration = self.ref_dt * self.ref_ps_phase.shape[0] elements_needed = int(duration/element_duration) + 1 dryps = [] disps = [] for k in range(self.n_tel): #Building up to the required length dryp = None disp = None for i in range(elements_needed + 1): dryp = utilities.random_series_fft(self.ref_ps_disp, matchto=dryp, keepall=True, seed=self.seed) dryp = dryp -(1-self.static_bias_scaling)*np.mean(dryp) # Make sure to increment the seed every use if self.seed is not None: self.seed = self.seed+1 disp = utilities.random_series_fft(self.ref_ps_disp, matchto=disp, keepall=True, seed=self.seed) if self.seed is not None: self.seed = self.seed+1 dryps.append(dryp) disps.append(disp) # Assumbling into an array of columns dryps = np.array(dryps).T * self.dry_scaling disps = np.array(disps).T * self.wet_scaling logit.warning("Dry pistons and dispersion residuals scaling refreshed") self.ref_sample_times = np.arange(0, self.ref_dt * dryps.shape[0], self.ref_dt) #desired_sample_times = np.arange(0, duration, self.timestep) if replace or (): self.dry_piston_series = dryps self.dispersion_series = disps else: self.dry_piston_series = np.concatenate((self.dry_piston_series, self.dry_piston_series), axis=0) self.dispersion_series = np.concatenate((self.dispersion_series, self.dispersion_series), axis=0) self.prepare_interpolation() self.phasor = self.iterator(lamb)
[docs] def iterator(self, lamb): """ Iterator that yields the phasors of fringe tracker residuals The iterator sets for precomputed or direct interpolation depending on the configuration. It also sets for wet or dry computation depending on the configuration. """ if not self.precompute: available = int(np.max(self.ref_sample_times)/self.timestep) if not self.wet_atmosphere: i = 0 while True: if i>=available: i = 0 self.prepare_time_series(lamb, duration=10, replace=True) yield self.get_phasor_dry(i, lamb) i += 1 else: logit.error("Wet atmosphere not implemented") raise NotImplementedError("Wet atmosphere not implemented") else: self.interpolate_batch(np.max(self.ref_sample_times)) if not self.wet_atmosphere: precomp_length = self.precomputed_series_piston.shape[0] i = 0 while True: if i>=precomp_length: i = 0 self.prepare_time_series(lamb, duration=10, replace=True) yield self.get_phasor_precomputed_dry(i,lamb) i += 1 else: logit.error("Wet atmosphere not implemented") raise NotImplementedError("Wet atmosphere not implemented")
[docs] def prepare_interpolation(self): """ Mandatory Computes the interpolation functions to be used later save: ``self.piston_interpolation()`` ``self.dispersion_interpolation`` """ self.piston_interpolation = interp1d(self.ref_sample_times, self.dry_piston_series, axis=0, kind="linear") self.dispersion_interpolation = interp1d(self.ref_sample_times, self.dispersion_series, axis=0, kind="linear")
[docs] def interpolate_batch(self, duration): """ optional: Prepares lookup tables for direct lookup of piston values """ desired_sample_times = np.arange(0, duration, self.timestep) self.precomputed_series_piston = self.piston_interpolation(desired_sample_times) self.precomputed_series_dispersion = self.dispersion_interpolation(desired_sample_times)
[docs] def get_phasor_precomputed_dry(self, i, lamb): """ Precomputed computation are recommended for longer time series when trying to factor-in long timescale effects. One can prepare the "low" resolution series, and dump the original dataset. """ phase = self.precomputed_series_dispersion[i,:][:,None] *2*np.pi / lamb[None,:] return np.ones_like(phase) * np.exp(1j*phase)
[docs] def get_phasor_dry(self, i, lamb): phase = self.piston_interpolation(i*self.timestep)[:,None] * 2 * np.pi / lamb[None,:] return np.ones_like(phase) * np.exp(1j*phase)
# TODO : try legacy: bisplev or RectBivariateSpline # For modern try: interpn, LineraNDInterpolator from scipy.interpolate import LinearNDInterpolator # class unsorted_interp2d(interp2d): # def __call__(self, x, y, dx=0, dy=0): # if (len(x) == 1) and (len(y) == 1): # return interp2d.__call__(self, x, y, dx=dx, dy=dy, assume_sorted=True) # asx = np.argsort(x) # usx = np.argsort(asx) # asy = np.argsort(y) # usy = np.argsort(asy) # return interp2d.__call__(self, x[asx], y[asy], dx=dx, dy=dy, assume_sorted=True)[usy,:][:,usx] from scipy.special import j0, k0 from scipy.constants import mu_0, epsilon_0 import matplotlib.pyplot as plt import numpy as np import sympy as sp import logging logit = logging.getLogger(__name__) from scipy.special import j0, j1, k0 from sympy.utilities.lambdify import implemented_function J0 = implemented_function("J_0", j0) J1 = implemented_function("J_1", j1) K0 = implemented_function("K_0", k0) from sympy.functions.elementary.piecewise import Piecewise from kernuller import fprint
[docs]class gaussian_fiber_head(object):
[docs] def __init__(self, NA=None, mfd=None, wl_mfd=None, apply=True): """ A class that constructs a gaussian approximation for a fiber mode. Numerical evaluations are based on NA or mfd and wl_mfd, depending on which is provided. ufuncs are stored in: ``Hy_r`` ``Hy_xy`` **Parameters:** - NA : The numerical aperture - mfd : The mode field diameter [m] - wl_mfd : The wavelength at which the mfd is measured [m] """ self.r, self.x, self.y = sp.symbols("r, x, y", real=True) self.lamb, self.lamb0 = sp.symbols("lambda, lambda_0", real=True, positive=True) self.w, self.R, self.k = sp.symbols("w, R, k", real=True) # q, the beam parameter self.q = 1/(1/self.R - sp.I*self.lamb/(sp.pi*self.w**2)) qwaist = self.q.subs([(self.R, sp.oo)]) self.u = 1/self.q*sp.exp(-sp.I*self.k/2*(self.x**2+self.y**2)*1/self.q) self.ur = 1/self.q*sp.exp(-sp.I*self.k/2*self.r**2*1/self.q) uwaist = self.u.subs([(self.R, sp.oo), (self.k, 2*sp.pi/self.lamb)]) #self.nclad, self.ncore, self.eps0, self.mu = sp.symbols("n_clad n_core epsilon_0, mu", real=True) #self.lamb, self.r, self.a = sp.symbols("lambda r a", real=True) #self.U, self.V, self.W = sp.symbols("U V W", real=True) #self.c_H = sp.symbols("c_H", real=True) self.NA = sp.symbols("N.A.", real=True, positive=True) # ? taken from Shaklan and Roddier? self.d = sp.symbols("d", real=True) # pupil diameter self.define_substitutions() self.build_equation() self.thesubs = []#self.subcore_na if NA is not None: self.NA_value = NA self.mfd_value = None self.wl_mfd_value = None self.thesubs.append((self.NA, self.NA_value)) elif mfd is not None: self.mfd_value = mfd self.wl_mfd_value = wl_mfd self.NA_value = sp.N((self.lamb0/(sp.pi*self.w_0)).subs([(self.lamb0, self.wl_mfd_value), (self.w_0, self.mfd_value/2)])) self.thesubs.append((self.NA, self.NA_value)) #self.consolidate_equation(self.thesubs) if apply: self.consolidate_equation(self.thesubs)
[docs] def define_substitutions(self): self.w_0 = sp.symbols("w_0", real=True, positive=True) self.z = sp.symbols("z", real=True) self.zr = sp.pi*self.w_0**2/self.lamb self.Rz = (self.z**2 + self.zr**2)/self.z # that one is not used for now #fprint(Rz, "R(z) = ") self.wz = self.w_0*sp.sqrt(1+self.z**2/self.zr**2) #fprint(wz, "w(z) = ") # This one is the cool one! self.ur_lamb = self.ur.subs([(self.k, 2*sp.pi/self.lamb), (self.w, self.wz), (self.R, self.Rz)]) self.ncore, self.nclad = sp.symbols("n_{core}, c_{clad}", real=True, positive=True)
[docs] def build_equation(self): self.Hy = self.ur_lamb.subs([(self.z, 0), (self.w_0, self.lamb/self.NA)])
#fprint(Hy,"H_y = ")
[docs] def consolidate_equation(self, thesubs): self.Hy_consolidated = self.Hy.subs(thesubs) # Apply the subsitutions #self.Hy_r = self.Hy_r = sp.lambdify((self.r, self.NA, self.lamb),self.Hy_consolidated) self.subr = (self.r, sp.sqrt(self.x**2+self.y**2)) self.Hy_xy = sp.lambdify((self.x, self.y, self.NA, self.lamb), self.Hy_consolidated.subs([self.subr]))
[docs] def full_consolidation(self, NA=None, mfd=None, wl_mfd=None): """ Completes the consolidation of the lambda function with the application parameters: **Parameters:** NA : The numerical aperture mfd : The mode field diameter in [m] wl_mfd : The wavelength at which mfd is measured in [m] """ if NA is not None: thesubs = [(self.NA, NA), self.subr] else : thesubs = [(self.w_0, mfd/2), (self.lamb0, wl_mfd), self.subr] self.Hy_xylambda = self.Hy_consolidated.subs(thesubs) self.Hy_xy_full = sp.lambdify((self.x, self.y,self.lamb),self.Hy_xylambda)
[docs] def numerical_evaluation(self, half_range, nsamples, lambs): xx, yy = np.meshgrid(np.linspace(-half_range, half_range, nsamples), np.linspace(-half_range, half_range, nsamples)) amap = self.Hy_xy_full(xx[None,:,:], yy[None,:,:], lambs[:,None,None]) map_total = np.sqrt(np.sum(np.abs(amap)**2, axis=(1,2))) self.map = amap / map_total[:,None,None] #from pdb import set_trace #set_trace() return self.map
[docs]class fiber_head(object):
[docs] def __init__(self, ): """ A class that constructs helps compute the LP01 mode of a fiber. **ufuncs are stored in:** - ``Hy_r`` - ``Hy_xy`` """ self.nclad, self.ncore, self.eps0, self.mu = sp.symbols("n_clad n_core epsilon_0, mu", real=True) self.lamb, self.r, self.a = sp.symbols("lambda r a", real=True) self.U, self.V, self.W = sp.symbols("U V W", real=True) self.c_H = sp.symbols("c_H", real=True) self.NA = sp.symbols("N.A.", real=True) # ? taken from Shaklan and Roddier? self.d = sp.symbols("d", real=True) # pupil diameter self.intensity_E0 = 8/(sp.pi*self.d**2)*sp.sqrt(self.mu/self.eps0)*1/self.nclad self.define_substitutions() self.build_equation() self.thesubs = [self.subU_VW, self.subru_neu, self.subv, self.subch, self.subcore_na, (self.mu, mu_0), (self.eps0, epsilon_0)] self.consolidate_equation(self.thesubs)
[docs] def define_substitutions(self): # Defining some basic substitutions from the paper self.subv = (self.V, ((2*sp.pi)/self.lamb)*self.a*self.NA) #Expr. of V from N.A. self.subna = (self.NA, sp.sqrt(self.ncore**2 -self.nclad**2)) #Expr of NA from nclad & ncore self.subcore_na = (self.nclad, sp.sqrt(self.ncore**2-self.NA**2)) #Expr nclad from N.A. self.subch = (self.c_H, sp.sqrt(2*self.nclad/sp.pi)*(self.eps0/self.mu)**(1/4)) #c_H from Shaklan&Roddier # From Rudolph and Neumann 1976 (cited in Coudé du Forest 1994) self.subU_VW = (self.U, sp.sqrt(self.V**2 - self.W**2)) # U from V and W self.subru_neu = (self.W, 1.1428*self.V - 0.9960) # The approximation of the transcendental eq.
[docs] def build_equation(self): self.piece1 = J0(self.U*self.r/self.a)/J0(self.U) self.piece2 = K0(self.W*self.r/self.a)/K0(self.W) self.common = self.c_H/sp.sqrt(2)*self.W*J0(self.U)/(self.a*self.V*J1(self.U)) self.Hy = Piecewise((self.common*self.piece1, self.r<self.a), (self.common*self.piece2, self.r>self.a))
#fprint(Hy,"H_y = ")
[docs] def consolidate_equation(self, thesubs): self.Hy_consolidated = self.Hy.subs(thesubs) # Apply the subsitutions self.Hy_r = sp.lambdify((self.r, self.NA, self.a, self.lamb, self.ncore),self.Hy_consolidated) self.x, self.y = sp.symbols("x, y") self.subr = (self.r, sp.sqrt(self.x**2+self.y**2)) self.Hy_xy = sp.lambdify((self.x, self.y, self.NA, self.a, self.lamb, self.ncore), self.Hy_consolidated.subs([self.subr]))
[docs] def full_consolidation(self, NA, a, ncore): """ Completes the consolidation of the lambda function with the application parameters: - NA : The numerical aperture - a : The core radius in meters - ncore : the refractive index of the core """ thesubs = [(self.NA, NA), (self.a, a), (self.ncore, ncore), self.subr] self.Hy_xylambda = self.Hy_consolidated.subs(thesubs) self.Hy_xy_full = sp.lambdify((self.x, self.y,self.lamb),self.Hy_xylambda)
[docs] def numerical_evaluation(self, half_range, nsamples, lambs): xx, yy = np.meshgrid(np.linspace(-half_range, half_range, nsamples), np.linspace(-half_range, half_range, nsamples)) amap = self.Hy_xy_full(xx[None,:,:], yy[None,:,:], lambs[:,None,None]) map_total = np.sqrt(np.sum(np.abs(amap)**2, axis=(1,2))) self.map = amap / map_total[:,None,None] #from pdb import set_trace #set_trace() return self.map
import astropy.units as units
[docs]class injection_vigneting(object): """ A shortcut to injection simulation in the case of diffuse sources injector : an injector object to emulate res : """
[docs] def __init__(self, injector, res, crop=0.8): """ A shortcut to injection simulation in the case of diffuse sources **Parameters:** - injector : an injector object to emulate - res : The resolution of the map - crop : A factor that scales the FoV of the map """ # First build a grid of coordinates lambond = (np.mean(injector.lambda_range) / injector.pdiam)*units.rad.to(units.mas) self.mas2lambond = 1/lambond hskyextent = (injector.focal_hrange/injector.focal_length)*units.rad.to(units.mas) hskyextent = hskyextent*crop self.resol = res #injector.focal_res xx, yy = np.meshgrid( np.linspace(-hskyextent, hskyextent, self.resol), np.linspace(-hskyextent, hskyextent, self.resol)) self.collecting = np.pi/4*(injector.pdiam**2 - injector.odiam**2) self.ds = np.mean(np.gradient(xx)[1]) * np.mean(np.gradient(yy)[0]) #In mas^2 self.ds_sr = (self.ds*units.mas**2).to(units.sr).value # In sr self.xx = xx.flatten() self.yy = yy.flatten() self.rr = np.sqrt(self.xx**2 + self.yy**2) self.rr_lambdaond = self.rr*self.mas2lambond print(self.mas2lambond) if not hasattr(injector, "injection_rate"): injector.compute_injection_function("linear", tilt_range=1.) mygrid = np.meshgrid(injector.lambda_range, self.rr_lambdaond) self.vig = injector.injection_rate(*mygrid) del mygrid # self.vig = injector.injection_rate(injector.lambda_range, self.rr_lambdaond) self.vig_func = injector.injection_rate self.norm = 1/np.max(self.vig)
[docs] def vigneted_spectrum(self, spectrum, lambda_range, exptime, transmission=None): """ Includes collecting area, throughput (in transmission is not None) and pixel solid angle transmission: The first object of the transmission-emission chain to be used to compute the instrument transmission. This allows to account for throughtput, including airmass. spectrum : Flux density in ph/s/sr/m^2 """ if transmission is not None: throughput = transmission.get_downstream_transmission(lambda_range) else: throughput = 1. factor = self.collecting * self.ds_sr * exptime mygrid = np.meshgrid(lambda_range, self.rr_lambdaond) vigneted_spectrum = self.vig_func(*mygrid) *\ (spectrum * factor * throughput)[None,:] del mygrid # vigneted_spectrum = self.vig_func(lambda_range, self.rr_lambdaond) *\ # (spectrum * factor * throughput)[None,:] return vigneted_spectrum
[docs]class injection_cloud(object): """ A shortcut to injection simulation in the case of diffuse sources injector : an injector object to emulate res : """
[docs] def __init__(self, injector, res, crop=1.): """ A shortcut to injection simulation in the case of diffuse sources **Parameters:** - injector : an injector object to emulate - res : The resolution of the map - crop : A factor that scales the FoV of the map """ # First build a grid of coordinates lambond = (np.mean(injector.lambda_range) / injector.pdiam)*units.rad.to(units.mas) self.mas2lambond = 1/lambond hskyextent = (injector.focal_hrange/injector.focal_length)*units.rad.to(units.mas) hskyextent = hskyextent*crop self.resol = res #injector.focal_res xys_mas = np.random.uniform(low=-hskyextent, high=+hskyextent, size=(2,res**2)) xx = xys_mas[0,:] yy = xys_mas[1,:] self.collecting = np.pi/4*(injector.pdiam**2 - injector.odiam**2) self.ds = hskyextent**2 / res**2#In mas^2 self.ds_sr = (self.ds*units.mas**2).to(units.sr).value # In sr self.xx = xx.flatten() self.yy = yy.flatten() self.rr = np.sqrt(self.xx**2 + self.yy**2) self.rr_lambdaond = self.rr*self.mas2lambond print(self.mas2lambond) if not hasattr(injector, "injection_rate"): injector.compute_injection_function("linear", tilt_range=1.) mygrid = np.meshgrid(injector.lambda_range, np.linspace(0, np.max(self.rr_lambdaond), res)) self.vig = injector.injection_rate(*mygrid) del mygrid # self.vig = injector.injection_rate(injector.lambda_range, self.rr_lambdaond) self.vig_func = injector.injection_rate self.norm = 1/np.max(self.vig)
[docs] def vigneted_spectrum(self, spectrum, lambda_range, exptime, transmission=None): """ transmission: The first object of the transmission-emission chain to be used to compute the instrument transmission. This allows to account for throughtput, including airmass. spectrum : Flux density in ph/s/sr/m^2 """ if transmission is not None: throughput = transmission.get_downstream_transmission(lambda_range) else: throughput = 1. factor = self.collecting * self.ds_sr * exptime mygrid = np.meshgrid(lambda_range, self.rr_lambdaond) vigneted_spectrum = self.vig_func(*mygrid) *\ (spectrum * factor * throughput)[None,:] del mygrid # vigneted_spectrum = self.vig_func(lambda_range, self.rr_lambdaond) *\ # (spectrum * factor * throughput)[None,:] return vigneted_spectrum
[docs]def test_fiber(): NA = 0.21 a = 3.0e-6 lamb0 = 3.0e-6 lambmax = 4.2e-6 focrange = 8.0e-6 lamb_range = np.linspace(lamb0, lambmax, 10) xx, yy = np.meshgrid(np.linspace(-focrange, focrange, 100), np.linspace(-focrange, focrange, 100)) myfiber = fiber_head() for asub in myfiber.thesubs: fprint(asub[1], sp.latex(asub[0])+" = ") fprint(myfiber.Hy, "H_y = ") print("Fiber for the following parameters") print(r"N.A. = %.2f, a = %.1f µm, \lambda = [%.1f..%.1f] (µm)"% (NA, a*1e6, lamb0*1e6, lambmax*1e6)) LPMAP = myfiber.Hy_xy(xx[None,:,:], yy[None,:,:], 0.21, 3.0e-6, lamb_range[:,None, None], 2.7) LPMAP = LPMAP/LPMAP.sum() myfiber.full_consolidation(0.21, 3.0e-6, 2.7) LPMAP2 = myfiber.numerical_evaluation(10.0e-6, 50, lamb_range) plt.figure() plt.imshow(np.abs(LPMAP2[0,:,:])) plt.colorbar() plt.show() plt.figure() plt.imshow(np.abs(LPMAP2[9,:,:])) plt.colorbar() plt.show() rn = np.linspace(0., 8.0e-6, 1000) H1D = myfiber.Hy_r(rn, 0.21, 3.0e-6, 3.6e-6, 2.7) plt.figure() plt.plot(rn, H1D) plt.show() return myfiber
[docs]def test_injection(phscreensz=200, r0=8., interpolation=None, seed=20127): """ Remember to pass seed=None if you want a random initialization """ import xaosim # Construct a pupil using xaosim apup = xaosim.pupil.VLT(phscreensz, phscreensz,phscreensz/2) myinst = injector(pupil=apup, r0=r0, interpolation=interpolation, seed=seed) import matplotlib.pyplot as plt injected = next(myinst.it) #contourlabels = ["0.","0.25","0.5", "O.75"] #tweaking the colormap showing the pupil cutout current_cmap = plt.matplotlib.cm.get_cmap("coolwarm") current_cmap.set_bad(color='black') plt.figure(figsize=(8,4),dpi=100) for i in range(myinst.ntelescopes): plt.subplot(1,myinst.ntelescopes,i+1) plt.imshow((myinst.focal_plane[i][0]._phs/myinst.focal_plane[i][0].pupil), cmap=current_cmap) plt.show() plt.figure(figsize=(8,4),dpi=100) for i in range(myinst.ntelescopes): plt.subplot(2,myinst.ntelescopes,i+1) plt.imshow(np.abs(myinst.focal_planes[i,0]), cmap="Blues") CS = plt.contour(myinst.lpmap[0], levels=myinst.map_quartiles[0], colors="black") plt.clabel(CS, inline=1, fontsize=6) plt.subplot(2,myinst.ntelescopes,i+1+myinst.ntelescopes) plt.imshow(np.abs(myinst.focal_planes[i,-1]), cmap="Reds") CS = plt.contour(myinst.lpmap[-1], levels=myinst.map_quartiles[-1], colors="black") plt.clabel(CS, inline=1, fontsize=6) plt.suptitle("Injection focal plane (contours: LP01 mode quartiles)") plt.show() tindexes = ["Telescope %d"%(i) for i in range(injected.shape[0])] plt.figure() width = 0.1 for i in range(injected.shape[1]): plt.bar(np.arange(4)+i*width,np.abs(injected[:,i]), width, label="%.1f µm"%(myinst.lambda_range[i]*1e6)) plt.legend(loc="lower right",fontsize=7, title_fontsize=8) plt.ylabel("Injection amplitude") plt.title("Injection amplitude for each telescope by WL") plt.show() plt.figure() width = 0.1 for i in range(injected.shape[1]): plt.bar(np.arange(4)+i*width,np.angle(injected[:,i]), width, label="%.1f µm"%(myinst.lambda_range[i]*1e6)) plt.legend(loc="lower right",fontsize=7, title_fontsize=8) plt.ylabel("Injection phase (radians)") plt.title("Injection phase for each telescope by WL") plt.show() return myinst
[docs]def test_injection_fromfile(phscreensz=200, fpath="/home/rlaugier/Documents/hi5/SCIFYsim/scifysim/config/default_new_4T.ini", seed=20127): """ **Remember** to pass ``seed=None`` if you want a **random initialization** """ import xaosim # Construct a pupil using xaosim apup = xaosim.pupil.VLT(phscreensz, phscreensz,phscreensz/2) myinst = injector.from_config_file(fpath=fpath, pupil=apup, seed=seed) import matplotlib.pyplot as plt injected = next(myinst.it) #contourlabels = ["0.","0.25","0.5", "O.75"] #tweaking the colormap showing the pupil cutout current_cmap = plt.matplotlib.cm.get_cmap("coolwarm") current_cmap.set_bad(color='black') plt.figure(figsize=(8,4),dpi=100) for i in range(myinst.ntelescopes): plt.subplot(1,myinst.ntelescopes,i+1) plt.imshow((myinst.focal_plane[i][0]._phs/myinst.focal_plane[i][0].pupil), cmap=current_cmap) plt.show() plt.figure(figsize=(8,4),dpi=100) for i in range(myinst.ntelescopes): plt.subplot(2,myinst.ntelescopes,i+1) plt.imshow(np.abs(myinst.focal_planes[i,0]), cmap="Blues") CS = plt.contour(myinst.lpmap[0], levels=myinst.map_quartiles[0], colors="black") plt.clabel(CS, inline=1, fontsize=6) plt.subplot(2,myinst.ntelescopes,i+1+myinst.ntelescopes) plt.imshow(np.abs(myinst.focal_planes[i,-1]), cmap="Reds") CS = plt.contour(myinst.lpmap[-1], levels=myinst.map_quartiles[-1], colors="black") plt.clabel(CS, inline=1, fontsize=6) plt.suptitle("Injection focal plane (contours: LP01 mode quartiles)") plt.show() tindexes = ["Telescope %d"%(i) for i in range(injected.shape[0])] plt.figure() width = 0.1 for i in range(injected.shape[1]): plt.bar(np.arange(4)+i*width,np.abs(injected[:,i]), width, label="%.1f µm"%(myinst.lambda_range[i]*1e6)) plt.legend(loc="lower right",fontsize=7, title_fontsize=8) plt.ylabel("Injection amplitude") plt.title("Injection amplitude for each telescope by WL") plt.show() plt.figure() width = 0.1 for i in range(injected.shape[1]): plt.bar(np.arange(4)+i*width,np.angle(injected[:,i]), width, label="%.1f µm"%(myinst.lambda_range[i]*1e6)) plt.legend(loc="lower right",fontsize=7, title_fontsize=8) plt.ylabel("Injection phase (radians)") plt.title("Injection phase for each telescope by WL") plt.show() return myinst
# An implementation from Ruiliier 2005 (thèse) # WIP
[docs]def optimize_a(): import sympy as sp rho, beta, alpha = sp.symbols("rho, beta, alpha", real=True, positive=True) D, omega0, lamb, f = sp.symbols("D, omega_0, lambda, f", real=True, positive=True) rhoexpr = 2*((sp.exp(-beta**2) - sp.exp(-beta**2*alpha**2) ) / (beta*sp.sqrt(1-alpha**2) ) )**2 betaexpr = sp.pi*D*omega0/(2*lamb*f) rhoexpr = rhoexpr.subs([(beta, betaexpr)]) sp.solve(rhoexpr.diff(omega0), omega0) return
[docs]def tel_pupil(n,m, radius, file=None, pdiam=None, odiam=None, spiders=True, between_pix=True, tel_index=0): ''' returns an array that draws the pupil of the VLT at the center of an array of size (n,m) with radius "radius". This is an approximation to reduce number of parameters: offset an angle are approximated. See xaosim.pupil.VLT for original Parameters describing the pupil were deduced from a pupil mask description of the APLC coronograph of SPHERE, by Guerri et al, 2011. `<http://cdsads.u-strasbg.fr/abs/2011ExA....30...59G>`_ ''' import xaosim if file is not None: if pdiam is None: pdiam = file.getarray("configuration","diam")[tel_index] if odiam is None: odiam = file.getarray("configuration","cen_obs")[tel_index] else: if (odiam is None) or (pdiam is None): raise ValueError("Provide either a file or kw values") # Those are fill values, and should not be very important thick = 0.04 # adopted spider thickness (meters) offset = odiam #1.11 # spider intersection offset (meters) beta = 50. #50.5 # spider angle beta apupil = xaosim.pupil.four_spider_mask(m, n, radius, pdiam, odiam, beta, thick, offset, spiders=spiders, between_pix=between_pix) return apupil
[docs]def test_injection_function(asim): import matplotlib.pyplot as plt asim.injector.compute_injection_function("linear", tilt_range=1.) os = np.linspace(0,1., 500) plt.figure() wl = np.linspace(3e-6, 4e-6, 8) rates = asim.injector.injection_rate(wl, os) args = asim.injector.injection_arg(wl, os) for i, awl in enumerate(wl) : plt.plot(os, rates[:,i], label=r"%.1f $\mu m$"%(awl*1e6)) plt.legend() plt.show() plt.figure() for i, awl in enumerate(wl) : plt.plot(os, args[:,i], label=r"%.1f $\mu m$"%(awl*1e6)) plt.legend() plt.show()
[docs]def seeing_to_r0(seeing, wl): """ seeing : seeing in arcseconds wl : wl in m """ r0 = wl / (seeing/3600*np.pi/180) return r0
# =========================================================== # ===========================================================