Source code for pyuvsim.telescope

# -*- mode: python; coding: utf-8 -*
# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 3-clause BSD License
"""Definition of Telescope objects, for metadata common to all antennas in an array."""
from __future__ import annotations

import warnings

import numpy as np
from pyuvdata import UVBeam, parameter

try:
    from . import mpi
except ImportError:
    mpi = None

from .analyticbeam import AnalyticBeam

weird_beamfits_extension_warning = (
    "This beamfits file does not have a '.fits' or '.beamfits' extension, so "
    "UVBeam does not recognize it as a beamfits file. Either change the file "
    "extension or specify the beam_type. This is currently handled with a "
    "try/except clause in pyuvsim, but this will become an error in version 1.4"
)


class BeamConsistencyError(Exception):
    """An Exception class to mark inconsistencies among beams in a BeamList."""

    pass


[docs]class BeamList: """ A container for the set of beam models and related parameters. Each rank in the simulation gets a copy of the set of beam objects used for calculating Jones matrices. Rather than broadcasting the objects themselves, the list is composed of strings which provide either a complete description of an analytic beam or the path to a UVBeam readable file. After the broadcast, the beams are initialized from these strings. This class provides methods to transform between the string and object representations, and also carries parameters used globally by all UVBeams, including their frequency / angular interpolation options. This behaves just as a normal Python list in terms of indexing, appending, etc. Attributes ---------- string_mode : bool Is True if the beams are represented as strings, False if they're objects spline_interp_opts : dict Degrees of the bivariate spline for the angular interpolation (passed directly to numpy.RectBivariateSpline, e.g. use {'kx' : 2, 'ky' : 2} for 2nd degree splines in each direction.) Default is 3rd order in each direction. freq_interp_kind : str The type of frequency interpolation, anything accepted by scipy.interpolate.interp1d. Default is "cubic". Parameters ---------- beam_list : list (optional) A list of UVBeam/AnalyticBeam objects or strings describing beams. If beam_list consists of objects, then the BeamList will be initialized in object mode with string_mode == False. If beam_list consists of strings, the BeamList will be initialized in string mode. Passing in a mixture of strings and objects will error. uvb_read_kwargs : dict (optional) A nested dictionary that can contain parameters to pass to the UVBeam read method for each UVBeam in the beam dict. The top-level keys are beam_id integers, the value for each beam id is a dict with the kwargs specified for each beam. This can include ``file_type`` or any other parameter accepted by the UVBeam.read method. Note that this can in principal overlap with entries in select_params. Entries in uvb_read_kwargs will supercede anything that is also in select_params. select_params : dict (optional) A dictionary that can contain parameters for selecting parts of the beam to read. Example keys include ``freq_range`` and ``za_range``. Note that these will only be used for UVBeam readable files and they apply to all such beams unless they are superceded by uvb_read_kwargs. check : bool Whether to perform a consistency check on the beams (i.e. asserting that several of their defining parameters are the same for all beams in the list). force_check : bool The consistency check is only possible for object-beams (not string mode). If `force_check` is True, it will convert beams to object-mode to force the check to run (then convert back to string mode). Raises ------ ValueError For an invalid beam_list (mix of strings and objects). Notes ----- If an object beam is added while in string mode, it will be converted to a string before being inserted, and vice versa. ***In order to be converted to a string, a UVBeam needs to have the entry 'beam_path' in its extra_keywords, giving the path to the beam file it came from.*** """ _float_params = { "sig": "sigma", "diam": "diameter", "reff": "ref_freq", "ind": "spectral_index", } string_mode = True def __init__( self, beam_list=None, uvb_read_kwargs: dict[str : tuple[float, float]] | None = None, select_params: dict[str : tuple[float, float]] | None = None, spline_interp_opts: dict[str:int] | None = None, freq_interp_kind: str = "cubic", check: bool = True, force_check: bool = False, ): self.spline_interp_opts = spline_interp_opts self.freq_interp_kind = freq_interp_kind self._str_beam_list = [] self._obj_beam_list = [] if beam_list is not None: if all(isinstance(b, str) for b in beam_list): self._str_beam_list[:] = beam_list[:] self.string_mode = True elif all(not isinstance(b, str) for b in beam_list): self._obj_beam_list[:] = beam_list[:] for beam in self._obj_beam_list: # always use future shapes if isinstance(beam, UVBeam) and not beam.future_array_shapes: beam.use_future_array_shapes() self.string_mode = False else: raise ValueError("Invalid beam list: " + str(beam_list)) self.uvb_read_kwargs = uvb_read_kwargs or {} self.select_params = select_params or {} self.is_consistent = False if check: self.check_consistency(force=force_check) def _get_beam_basis_type(self): if self.string_mode: raise ValueError("Cannot get beam basis type from a string-mode BeamList.") beam_basis = {} for index, bm in enumerate(self): if isinstance(bm, UVBeam): if bm.pixel_coordinate_system not in ["az_za", "healpix"]: raise ValueError( "pyuvsim currently only supports UVBeams with 'az_za' or " "'healpix' pixel coordinate systems." ) if ( np.all(bm.basis_vector_array[0, 0] == 1) and np.all(bm.basis_vector_array[0, 1] == 0) and np.all(bm.basis_vector_array[1, 0] == 0) and np.all(bm.basis_vector_array[1, 1] == 1) ): beam_basis[index] = "az_za" else: raise ValueError( "pyuvsim currently only supports beams with basis vectors that" "are aligned with the azimuth and zenith angle in each pixel." "Work is in progress to add other basis vector systems." ) else: # AnalyticBeams all have the az_za basis by construction beam_basis[index] = "az_za" return beam_basis
[docs] def check_consistency(self, force: bool = False): """ Check the consistency of all beams in the list. This checks basic parameters of the objects for consistency, eg. the ``beam_type``. It is meant to be manually called by the user. Parameters ---------- force : bool Whether to force the consistency check even if the object is in string mode. This wil convert to to object mode, perform the check, then convert back. Raises ------ BeamConsistencyError If any beam is inconsistent with the rest of the beams. """ if self.string_mode and len(self._str_beam_list) > 0: if not force: warnings.warn( "Cannot check consistency of a string-mode BeamList! Set force=True" " to force consistency checking." ) return else: self.set_obj_mode(check=True) self.set_str_mode() return if len(self) == 0: return def check_thing(item): if item == "basis": items = self._get_beam_basis_type() else: items = { i: getattr(b, item) for i, b in enumerate(self) if hasattr(b, item) } if not items: # If none of the beams has this item, move on. return min_index = min(items.keys()) for indx, thing in items.items(): if np.any(thing != items[min_index]): raise BeamConsistencyError( f"{item} of beam {indx + 1} is not consistent with beam " f"{min_index + 1}: {thing} vs. {items[min_index]}" ) check_thing("beam_type") check_thing("x_orientation") if self[0].beam_type == "efield": check_thing("Nfeeds") check_thing("feed_array") check_thing("basis") elif self[0].beam_type == "power": check_thing("Npols") check_thing("polarization_array") self.is_consistent = True
[docs] def check_all_azza_beams_full_sky(self): """ Check if all az_za UVBeams cover the full sky. Used to decide whether to turn off checking that the beam covers the interpolation location. """ all_azza_full_sky = True for beam in self: if isinstance(beam, UVBeam) and beam.pixel_coordinate_system == "az_za": # regular gridding is enforced in UVBeam checking, so can use first diff axis1_diff = np.diff(beam.axis1_array)[0] axis2_diff = np.diff(beam.axis2_array)[0] max_axis_diff = np.max([axis1_diff, axis2_diff]) if not ( np.max(beam.axis2_array) >= np.pi / 2.0 - max_axis_diff * 2.0 and np.min(beam.axis1_array) <= max_axis_diff * 2.0 and np.max(beam.axis1_array) >= 2.0 * np.pi - max_axis_diff * 2.0 ): all_azza_full_sky = False break return all_azza_full_sky
@property def x_orientation(self): """ Return the x_orientation of all beams in list (if consistent). Raises ------ BeamConsistencyError If the beams in the list are not consistent with each other. """ if not self.is_consistent: self.check_consistency(force=True) if len(self) == 0: return None if self.string_mode: # Can't get xorientation from beams in string mode. # Convert to obj mode, get xorient, and then convert back. self.set_obj_mode() xorient = self.x_orientation self.set_str_mode() return xorient for b in self: if hasattr(b, "x_orientation"): xorient = b.x_orientation break else: # None of the constituent beams has defined x_orientation. # Here we return None, instead of raising an attribute error, to maintain # backwards compatibility (pyuvsim access the beamlist.x_orientation without # checking for its existence). return None if xorient is None: warnings.warn( "All polarized beams have x_orientation set to None. This will make it " "hard to interpret the polarizations of the simulated visibilities." ) return xorient @property def beam_type(self): """ Return the beam_type of all beams in list (if consistent). Raises ------ BeamConsistencyError If the beams in the list are not consistent with each other. """ if not self.is_consistent: self.check_consistency(force=True) if len(self) == 0: return None return getattr(self._obj_beam_list[0], "beam_type", None) def __len__(self): """Define the length of a BeamList.""" # Note that only one of these lists has nonzero length at a given time. return len(self._obj_beam_list) + len(self._str_beam_list) def __iter__(self): """Get the list as an iterable.""" lst = self._str_beam_list if self.string_mode else self._obj_beam_list return iter(lst) def __getitem__(self, ind): """Get a particular beam from the list.""" if self.string_mode: return self._str_beam_list[ind] return self._obj_beam_list[ind] def __setitem__(self, ind, value, uvb_read_kwargs=None): """ Insert into the beam list. Converts objects to strings if in object mode, or vice versa if in string mode. """ if self.string_mode: self._str_beam_list[ind] = self._obj_to_str(value) else: if uvb_read_kwargs is not None: self.uvb_read_kwargs[ind] = uvb_read_kwargs try: value = self._str_to_obj(ind, value) except FileNotFoundError as err: raise ValueError(f"Invalid file path: {value}") from err self._obj_beam_list[ind] = value def __eq__(self, other): """Define BeamList equality.""" if self.string_mode: return self._str_beam_list == other._str_beam_list return self._obj_beam_list == other._obj_beam_list
[docs] def append(self, value, uvb_read_kwargs=None): """ Append to the beam list. Converts objects to strings if in object mode, or vice versa if in string mode. """ if self.string_mode: self._str_beam_list.append("") else: self._obj_beam_list.append("") try: self.__setitem__(-1, value, uvb_read_kwargs=uvb_read_kwargs) except ValueError as err: if self.string_mode: # This line is hard to cover with a test. I've verified that I # can get here, but I when I wrap it in pytest.raises it stops # on the earlier error. self._str_beam_list.pop() # pragma: no cover else: self._obj_beam_list.pop() raise err
def _str_to_obj(self, beam_id, beam_model, use_shared_mem=False): """Convert beam strings to objects.""" if isinstance(beam_model, (AnalyticBeam, UVBeam)): return beam_model if beam_model.startswith("analytic"): bspl = beam_model.split("_") model = bspl[1] to_set = {} for extra in bspl[2:]: par, val = extra.split("=") full = self._float_params[par] to_set[full] = float(val) return AnalyticBeam(model, **to_set) if use_shared_mem and mpi is None: raise ImportError( "You need mpi4py to use shared memory. Either call this method with " "use_shared_mem=False or install mpi4py. You can install it by running " "pip install pyuvsim[sim] or pip install pyuvsim[all] if you also want " "the line_profiler installed." ) path = beam_model # beam_model = path to UVBeam readable file uvb = UVBeam() read_kwargs = {**self.select_params, **self.uvb_read_kwargs.get(beam_id, {})} # always use future shapes read_kwargs["use_future_array_shapes"] = True if ( (use_shared_mem and mpi.world_comm is not None and mpi.rank == 0) or not use_shared_mem or mpi is None or mpi.world_comm is None ): try: uvb.read(path, **read_kwargs) except ValueError: # If file type is not recognized, assume beamfits, # which was originally the only option. uvb.read_beamfits(path, **read_kwargs) warnings.warn(weird_beamfits_extension_warning, DeprecationWarning) uvb.peak_normalize() if use_shared_mem and (mpi.world_comm is not None): for key, attr in uvb.__dict__.items(): if not isinstance(attr, parameter.UVParameter): continue if key == "_data_array": uvb.__dict__[key].value = mpi.shared_mem_bcast(attr.value, root=0) else: uvb.__dict__[key].value = mpi.world_comm.bcast(attr.value, root=0) mpi.world_comm.Barrier() uvb.extra_keywords["beam_path"] = path return uvb def _obj_to_str(self, beam_model): """Convert beam objects to strings that may generate them.""" if isinstance(beam_model, str): return beam_model if isinstance(beam_model, AnalyticBeam): btype = beam_model.type bm_str = "analytic_" + btype for abbrv, full in self._float_params.items(): val = getattr(beam_model, full) if val is not None: bm_str += "_" + abbrv + "=" + str(val) return bm_str # If not AnalyticBeam, it's UVBeam. try: path = beam_model.extra_keywords["beam_path"] except KeyError: raise ValueError( "Need to set 'beam_path' key in extra_keywords for UVBeam objects." ) return path
[docs] def set_str_mode(self): """ Convert object beams to their strings representations. Convert AnalyticBeam objects to a string representation that can be used to generate them. Replaces UVBeam objects with the path to the UVBeam readable file they originate from. Any UVBeams in the list must have the path to their beam files stored as 'beam_path' in the `extra_keywords` dictionary. Sets :attr:`~.string_mode` to True. Raises ------ ValueError: If UVBeam objects in the list have inconsistent UVBeam parameters. ValueError: If any UVBeam objects lack a "beam_path" keyword in the "extra_keywords" attribute, specifying the path to the UVBeam readable file that generates it. """ if self._obj_beam_list != []: # Convert object beams to string definitions self._str_beam_list = [ self._obj_to_str(bobj) for bobj in self._obj_beam_list ] self._obj_beam_list = [] self.string_mode = True
[docs] def set_obj_mode(self, use_shared_mem: bool = False, check: bool = False): """ Initialize AnalyticBeam and UVBeam objects from string representations. Sets :attr:`~.string_mode` to False. Parameters ---------- use_shared_mem : bool Whether to use shared memory for the beam data. check : bool Whether to perform consistency checks on all the beams in the list after conversion to object mode. """ if self._str_beam_list != []: beam_list = [] for bid, bstr in enumerate(self._str_beam_list): beam_list.append( self._str_to_obj(bid, bstr, use_shared_mem=use_shared_mem) ) self._obj_beam_list = beam_list self._str_beam_list = [] self.string_mode = False if check: self.check_consistency()
[docs]class Telescope: """ Container for data common to all antennas in the array. Defines the location and name of the observing site, and holds the list of beam objects used by the array. Parameters ---------- telescope_name : str Name of the telescope. telescope_location : :class:`astropy.coordinates.EarthLocation`, :class:`lunarsky.MoonLocation` Location of the telescope. beam_list : :class:`pyuvsim.BeamList` BeamList carrying beam models. """ def __init__(self, telescope_name, telescope_location, beam_list): # telescope location (EarthLocation object) self.location = telescope_location self.name = telescope_name # list of UVBeam objects, length of number of unique beams self.beam_list = beam_list def __eq__(self, other): """Define Telescope equality.""" this_vector_loc = np.array( [ self.location.x.to("m").value, self.location.y.to("m").value, self.location.z.to("m").value, ] ) other_vector_loc = np.array( [ other.location.x.to("m").value, other.location.y.to("m").value, other.location.z.to("m").value, ] ) Nbeams_self = len(self.beam_list) Nbeams_other = len(other.beam_list) if Nbeams_self == Nbeams_other: return ( np.allclose(this_vector_loc, other_vector_loc, atol=1e-3) and np.all( [ self.beam_list[bi] == other.beam_list[bi] for bi in range(Nbeams_self) ] ) and self.name == other.name ) return False