Source code for scifysim.observatory


import numpy as np
import sympy as sp
from scipy.interpolate import interp1d

import kernuller

from scifysim import utilities


from astropy.time import Time
import astropy.units as u

import astroplan
from astroplan import plots
from astropy.coordinates import SkyCoord, EarthLocation, AltAz, get_sun

from itertools import combinations

import logging

logit = logging.getLogger(__name__)


"""
Basic usage:

.. code-block::

    import kernuller.observatory
    myobs = kernuller.observatory.observatory(kernuller.VLTI)
    tarnames = "Spica"
    targets = [kernuller.observatory.astroplan.FixedTarget.from_name(tar) for tar in tarnames]
    obstimes = myobs.build_observing_sequence()
    target_positions = myobs.get_positions(targets[0], obstimes)
    newarray = myobs.get_projected_array(myobs.get_positions(targets, obstimes)[0,0])
"""

[docs] class observatory(object): """ This class help define the properties of the observatory infrastructure, especially the uv coverage. """
[docs] def __init__(self, statlocs=None, location=None, verbose=False, multi_dish=True, config=None): """ Parameters: * statlocs : The station locations (optional) (east, north) for each aperture shape is (Na, 2) * location : An astropy.coordinatEarthLocation (default = Paranal) example: myloc = astroplan.Observer.at_site("Paranal", timezone="UTC") * multi_dish : When True, the geometry of the pupil varies depending on the relative position of the target, expecially in terms of projection of the pupil on the plane orthogonal to the line of sight. When False, (not implemented yet) the array is expected to be always facing the line of sight, as is the case for example with a systme like GLINT. * config : A parsed config object. * verbose : Activate verbosity in the log """ self.verbose = verbose self.config = config if location is None: location = self.config.get("configuration", "location") self.observatory_location = astroplan.Observer.at_site(location, timezone="UTC") else : self.observatory_location = astroplan.Observer.at_site(location, timezone="UTC") # self.array_config = self.config.get("configuration", "config") raw_array, array_config = utilities.get_raw_array(self.config) self.array_config = array_config; del array_config # raw_array = eval("kernuller.%s"%(self.array_config)) self.order = self.config.getarray("configuration", "order").astype(np.int16) if statlocs is None: self.statlocs = raw_array[self.order] else: self.statlocs = statlocs self.pdiams = self.config.getarray("configuration","diam") self.theta = sp.symbols("self.theta") #R handles the azimuthal rotation self.Rs = sp.Matrix([[sp.cos(self.theta), sp.sin(self.theta)], [-sp.sin(self.theta), sp.cos(self.theta)]]) self.R = sp.lambdify(self.theta, self.Rs, modules="numpy") #P handles the projection due to elevation rotation self.Ps = sp.Matrix([[1, 0], [0, sp.sin(self.theta)]]) self.P = sp.lambdify(self.theta, self.Ps, modules="numpy") #C handles the piston due to elevation. self.Cs = sp.Matrix([[0, sp.cos(self.theta)]]) self.C = sp.lambdify(self.theta, self.Cs, modules="numpy") n_tel = self.statlocs.shape[0] bl_list = list(combinations(np.arange(n_tel), 2)) bl_mat = [] for abl in bl_list: avec = np.zeros(n_tel) avec[abl[0]] = -1 avec[abl[1]] = 1 bl_mat.append(avec) bl_mat = np.array(bl_mat) self.bl_mat = bl_mat del bl_mat del bl_list
[docs] def point(self, obstime, target): """ Points the array towards the target, updating its position angle (PA) and altaz (used for airmass). These are later used by other methods to compute the projection of the array. **Parameters:** * obstime : The astropy.time.Time object corresponding to the moment of observation (usually picked from the sequence provided by self.build_observing_sequence()) * target : The astroplan.FixedTarget object of interest. Usually resolved in the self.build_observign_sequence() routine with astroplan.FixedTarget.from_name() """ self.time = obstime self.altaz = self.observatory_location.altaz(target=target, time=obstime) self.PA = self.observatory_location.parallactic_angle(obstime, target=target) current_proj_array = self.get_projected_array(self.altaz, self.PA) self.uv = self.bl_mat.dot(current_proj_array)
[docs] def build_observing_sequence(self, times=None, npoints=None, remove_daytime=False): """ **Parameters:** * times : a list of UTC time strings ("2020-04-13T00:00:00") that define an interval (if npoints is not None), or the complete list of times (if npoints is None) * npoints : The number of samples to take on the interval None means that the times is the whole list * remove_daytime : Whether to remove the points that fall during the day **Returns** the series of obstimes needed to compute the altaz positions """ if times is None: times = ["2020-04-13T00:00:00","2020-04-13T10:30:00"] #npoints is defined which means we work from define the sampling from an interval if npoints is not None: obs2502 = Time(times) dt = obs2502[1] - obs2502[0] obstimes = obs2502[0] + dt * np.linspace(0.,1., npoints) #npoints is None means the times represent a list of times else: obstimes = np.array([Time(times[i]) for i in range(len(times))]) totaltime = (obstimes[-1]-obstimes[0]).to(u.s).value if remove_daytime: logit.info("Removing daytime observations") totaltime = (obstimes[-1]-obstimes[0]).to(u.s).value halfhourpoints = int(npoints / (totaltime / 900)) totaltime = (obstimes[-1]-obstimes[0]).to(u.s) mask = self.observatory_location.sun_altaz(obstimes).alt<0 sunelev = self.observatory_location.sun_altaz(obstimes).alt rawsunelev = np.array([el.value for el in sunelev]) midnight = np.argmin(rawsunelev) sunrise = np.argmin(np.abs(rawsunelev)) logit.error("Removing daytime obs is not finallized") raise NotImplementedError #Record parallactic angles return obstimes
[docs] def get_positions(self, targets, obstimes): """ Deprecated **Parameters:** * targets: A list of SkyCoord objects * obstimes: A list of astropy.Times to make the observations **Returns** the astropy.coordinates.AltAz for a given target """ taraltaz = self.observatory_location.altaz(target=targets, time=obstimes, grid_times_targets=True) tarPA = self.observatory_location.parallactic_angle(obstimes, target=targets) return taraltaz#, tarPA
[docs] def get_position(self, target, time, grid_times_targets=False): """ **Parameters:** * target: one or a list of of targets * obstimes: one or a list of astropy.Times to make the observations **Returns** the ``astropy.coordinates.AltAz`` for a given target """ taraltaz = self.observatory_location.altaz(time, target=target, grid_times_targets=grid_times_targets) tarPA = self.observatory_location.parallactic_angle(time, target=target, grid_times_targets=grid_times_targets) logit.debug("target altaz") logit.debug(str(taraltaz.alt)+str(taraltaz.az)) logit.debug("target PA") logit.debug(tarPA) return taraltaz, tarPA
[docs] def get_projected_array(self, taraltaz=None, PA=True, loc_array=None): """ **Parameters:** * taraltaz : the astropy.coordinates.AltAz of the target * PA : parallactic angle to derotate * loc_array: the array of points to use (None: use self.statlocs) **Returns** the new coordinates for the projected array """ if taraltaz is None: taraltaz = self.altaz if loc_array is None: loc_array = self.statlocs else: print(f"Using loc_array {loc_array}") arrayaz = self.R((180 - taraltaz.az.value)*np.pi/180).dot(loc_array.T).T altazarray = self.P(taraltaz.alt.value * np.pi/180).dot(arrayaz.T).T # if PA is True: # PA = self.PA # radecarray = self.R(PA.rad).dot(altazarray.T).T # newarray = radecarray if PA is False: newarray = altazarray elif PA is True: radecarray = self.R(self.PA.rad).dot(altazarray.T).T newarray = radecarray elif isinstance(PA, u.quantity.Quantity): radecarray = self.R(PA.rad).dot(altazarray.T).T newarray = radecarray elif isinstance(PA, float): # raise TypeError("Provide PA as a quantity in [rad]") radecarray = self.R(u.rad * PA).dot(altazarray.T).T newarray = radecarray if PA is None: raise AttributeError("This path is deprecated: ") newarray = altazarray if self.verbose: logit.debug("=== AltAz position:") logit.debug("az "+ str(taraltaz.az.value -180)) logit.debug("alt "+ str(taraltaz.alt.value)) logit.debug("old array "+ str(loc_array)) logit.debug("new array "+ str(newarray)) return newarray
[docs] def get_projected_geometric_pistons(self, taraltaz=None): """ **Parameters:** * taraltaz : the astropy.coordinates.AltAz of the target **Returns** the geomtric piston resutling from the pointing of the array. """ if taraltaz is None: taraltaz = self.altaz arrayaz = self.R((180 - taraltaz.az.value)*np.pi/180).dot(self.statlocs.T).T pistons = self.C(taraltaz.alt.value * np.pi/180).dot(arrayaz.T).T if self.verbose: logit.debug("=== pistons:") logit.debug("az "+ str(taraltaz.az.value -180)) logit.debug("alt "+ str(taraltaz.alt.value)) logit.debug("old array "+ str(self.statlocs)) logit.debug("new array "+ str(pistons)) return pistons
[docs] class ObservatoryAltAz(observatory): """ For observatories that are mounted on a single alt-az mount. """
[docs] def get_projected_array(self, taraltaz=None, PA=True, loc_array=None): """ **Parameters:** * taraltaz : the astropy.coordinates.AltAz of the target * PA : parallactic angle to derotate * loc_array: the array of points to use (None: use self.statlocs) **Returns** the new coordinates for the projected array """ if taraltaz is None: taraltaz = self.altaz if loc_array is None: loc_array = self.statlocs else: print(f"Using loc_array {loc_array}") # For an AltAz mount, no need to project or rotate on Azimuth if PA is False: newarray = loc_array elif PA is True: radecarray = self.R(self.PA.rad).dot(loc_array.T).T newarray = radecarray elif isinstance(PA, u.quantity.Quantity): radecarray = self.R(PA.rad).dot(loc_array.T).T newarray = radecarray elif isinstance(PA, float): # raise TypeError("Provide PA as a quantity in [rad]") radecarray = self.R(u.rad * PA).dot(loc_array.T).T newarray = radecarray if PA is None: raise AttributeError("This path is deprecated: ") newarray = loc_array if self.verbose: logit.debug("=== AltAz position:") logit.debug("az "+ str(taraltaz.az.value -180)) logit.debug("alt "+ str(taraltaz.alt.value)) logit.debug("old array "+ str(loc_array)) logit.debug("new array "+ str(newarray)) return newarray
[docs] def get_projected_geometric_pistons(self, taraltaz=None): """ **Parameters:** * taraltaz : the astropy.coordinates.AltAz of the target **Returns** the geomtric piston resutling from the pointing of the array. """ if taraltaz is None: taraltaz = self.altaz pistons = self.C(taraltaz.alt.value * np.pi/180).dot(self.statlocs.T).T if self.verbose: logit.debug("=== pistons:") logit.debug("az "+ str(taraltaz.az.value -180)) logit.debug("alt "+ str(taraltaz.alt.value)) logit.debug("old array "+ str(self.statlocs)) logit.debug("new array "+ str(pistons)) return pistons
[docs] class SpaceObservatory(observatory):
[docs] def __init__(self, statlocs=None, location=None, verbose=False, multi_dish=True, config=None): """ Parameters: * statlocs : The station locations (optional) (east, north) for each aperture shape is (Na, 2) * location : An astropy.coordinatEarthLocation (default = Paranal) example: myloc = astroplan.Observer.at_site("Paranal", timezone="UTC") * multi_dish : When True, the geometry of the pupil varies depending on the relative position of the target, expecially in terms of projection of the pupil on the plane orthogonal to the line of sight. When False, (not implemented yet) the array is expected to be always facing the line of sight, as is the case for example with a systme like GLINT. * config : A parsed config object. * verbose : Activate verbosity in the log """ self.verbose = verbose self.config = config if location is None: location = self.config.get("configuration", "location") self.observatory_location = location self.dummy_location = astroplan.Observer.at_site("paranal", timezone="UTC") self.n_tel = self.config.getint("configuration", "n_dish") self.pdiams = self.config.getarray("configuration","diam") # self.array_config = self.config.get("configuration", "config") raw_array, array_config = utilities.get_raw_array(self.config) self.array_config = array_config; del array_config # raw_array = eval("kernuller.%s"%(self.array_config)) self.order = self.config.getarray("configuration", "order").astype(np.int16) if statlocs is None: self.statlocs = raw_array[0][self.order] else: self.statlocs = statlocs self.theta = sp.symbols("self.theta") #R handles the azimuthal rotation self.Rs = sp.Matrix([[sp.cos(self.theta), sp.sin(self.theta)], [-sp.sin(self.theta), sp.cos(self.theta)]]) self.R = sp.lambdify(self.theta, self.Rs, modules="numpy") #P handles the projection due to elevation rotation self.Ps = sp.Matrix([[1, 0], [0, sp.sin(self.theta)]]) self.P = sp.lambdify(self.theta, self.Ps, modules="numpy") #C handles the piston due to elevation. self.Cs = sp.Matrix([[0, sp.cos(self.theta)]]) self.C = sp.lambdify(self.theta, self.Cs, modules="numpy") self.x_M = np.zeros(3) self.P_M = np.diag([1, 1, 0]) self.rotation_rate = self.config.getfloat("configuration", "rotation_rate") self.time_0 = Time(self.config.get("target", "seq_start")) self.t_0 = self.time_0.to_value("unix") self.motion_type = self.config.get("configuration", "motion_type") if self.motion_type == "rotation": self.motion = self.rotation elif self.motion_type == "interpolation": self.motion = self.interpolation # TODO: switch on "array_time_type" : array or file. self.interp_t = self.config.getarray("configuration", "array_time_steps") self.interp_y = raw_array self.interpolation_function = interp1d(self.interp_t, self.interp_y, axis=0, bounds_error=False, fill_value="extrapolate") self.motion = self.interpolation # Creating the baseline matrix bl_list = list(combinations(np.arange(self.n_tel), 2)) bl_mat = [] for abl in bl_list: avec = np.zeros(self.n_tel) avec[abl[0]] = -1 avec[abl[1]] = 1 bl_mat.append(avec) bl_mat = np.array(bl_mat) self.bl_mat = bl_mat # Initializing by pointing to instant 0 self.point(self.time_0)
[docs] def time2t(self, time): """ Convenience conversion from astropy.Time object to seconds since the start of observations used internally """ return time.to_value("unix") - self.t_0
[docs] def interpolation(self, t, loc_array=None, full_output=False): """ **Arguments:** * t : time [s] * loc_array : irrelevant hereo * full_output : irrelevant here **returns:** * x_A_t [m] The 3D location of the array of apertures **Computes:** * x_A_t [m] The 3D location of the array of apertures """ if loc_array is not None: logit.warning("Passed loc_array irrelevant argument") if full_output is not None: logit.warning("Passed full_output: irrelevant argument") return self.interpolation_function(t)
[docs] def rotation(self, t, loc_array=None, full_output=False): """ Einsum: * i : Input space * o : Output space * a : Aperture """ if loc_array is None: loc_array = self.statlocs theta = self.rotation_rate * t R_rotation = basic_z_rotation(theta) assert R_rotation.shape == (3,3), f"Got shape R_rotation {R_rotation.shape}" assert loc_array.shape == (self.n_tel, 3,), f"Got shape x_M {loc_array.shape}" x_A_t = np.einsum("o i, a i -> a o", R_rotation, loc_array) if full_output: return x_A_t, theta, R_rotation else: return x_A_t
[docs] def point(self, time, target=None): """ Refreshes the parameters related to pointing and motion in particular: * `self.x_A_t` * `self.R_rotation` when relevant """ self.time = time t = self.time2t(time) motion_result = self.motion(t, full_output=True) if self.motion_type == "rotation": self.x_A_t = motion_result[0] self.theta = motion_result[1] self.R_rotation = motion_result[2] else: self.x_A_t = motion_result current_proj_array = self.get_projected_array() self.uv = self.bl_mat.dot(current_proj_array[:,:2])
# def build_observing_sequence(self): # pass
[docs] def get_position(self, target, time, grid_times_targets=False): """ Computes the position of the collectors as a function of time **Arguments** : * target : Ignored * time : The time of the observations (astropy.Time) such as found in `simulator.sequence` * grid_times_target : Ignored """ dummy_taraltaz = self.dummy_location.altaz(time, target=target, grid_times_targets=grid_times_targets) t = time.to_value("unix") - self.t_0 x_A_t = self.motion(t) aPA = np.arctan2(x_A_t[0,1], x_A_t[0,0]) return dummy_taraltaz, aPA
[docs] def get_projected_array(self, taraltaz=None, time=None, PA=None, loc_array=None, c3d=False,): """ Returns the projected collector array **Arguments : ** * tarltaz : Ignored * time : (optional) A given time at which to point * PA : (Optional) Ignored * loc_array : (Optional) Overrides the current array configuration from pointing to use this one instead. * c3d : (optional) When True, returns the array as a set of 3D vector. When False (default) 2D vectors are given """ if time is not None: self.point(time) if loc_array is None: loc_array = self.x_A_t x_P = np.einsum("o i , a i -> a o", self.P_M, loc_array) if c3d: return x_P else: return x_P[:,:2]
[docs] def get_projected_geometric_pistons(self): """ Computes the distance traveled past the reference plane: $P_A - A + AM$ """ P_A = self.get_projected_array(c3d=True) P_A_A = self.x_A_t - P_A norm_PAA = np.linalg.norm(P_A_A, axis=1) M_A = self.x_M - self.x_A_t norm_M_A = np.linalg.norm(M_A, axis=1) optical_path = (norm_M_A + norm_PAA)[:,None] assert optical_path.shape == (self.n_tel, 1), f"shape of optical path {norm_M_A.shape}" return optical_path
[docs] def basic_interpolation(t,): pass
[docs] def basic_z_rotation(theta): """ basic rotation matrix around z **Returns**: The rotation matrix. """ M_R = np.array([[np.cos(theta), np.sin(theta), 0,], [-np.sin(theta), np.cos(theta),0,], [0, 0, 1]]) return M_R
[docs] def test_observatory(tarname="Tau cet", startend=["2020-10-20T00:00:00", "2020-10-20T10:00:00"], nblocs=20): """ Runs some test for the core functions of observatory **Parameters:** * tarname : The name of the target to use * startend : A list or tuple of two time strings inspressed in UTC matching the format: "2020-10-20T00:00:00" * nblocs : The number of observing blocs to compute """ from kernuller import VLTI from . import plot_tools as pt testobs = observatory(VLTI) tarnames = [tarname] targets = [astroplan.FixedTarget.from_name(tar) for tar in tarnames] mytimes = startend obstimes = testobs.build_observing_sequence(times=mytimes, remove_daytime=False, npoints=nblocs) target_positions, PAs = testobs.get_position(targets[0], obstimes, grid_times_targets=True) print(target_positions) testobs.verbose=True print("PAs",PAs) #thearrays = [testobs.get_projected_array(aposition) for aposition in target_positions[0,:]] thearrays = [] for aposition, aPA in zip(target_positions[0,:], PAs[0,:]): logit.warning("aPA") logit.warning(aPA) projarray = testobs.get_projected_array(aposition, PA=aPA) thearrays.append(projarray) thearrays = np.array(thearrays) thepistons = [testobs.get_projected_geometric_pistons(aposition) for aposition in target_positions[0,:]] thepistons = np.array(thepistons) perspective = True for seqindex in range(20): fig = pt.plot_pupil(thearrays[seqindex], thepistons=thepistons[seqindex])