# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 3-clause BSD License
"""
Functions for setting up simulations based on input config files.
This provides a configuration interface of human-readable yaml and csv files to specify
all the required simulation parameters. This configuration interface is used by other
simulators as well.
This module contains methods to create configuration files from :class:`pyuvdata.UVData`
objects and empty :class:`pyuvdata.UVData` objects from configuration files.
"""
import ast
import contextlib
import copy
import logging
import os
import shutil
import warnings
import astropy.units as units
import numpy as np
import numpy.typing as npt
import yaml
from astropy.coordinates import (
ICRS,
AltAz,
Angle,
EarthLocation,
Latitude,
Longitude,
SkyCoord,
)
from astropy.time import Time
from astropy.utils.data import cache_contents, is_url_in_cache
from pyradiosky import SkyModel
from pyuvdata import (
AiryBeam,
GaussianBeam,
ShortDipoleBeam,
Telescope,
UniformBeam,
UVBeam,
UVData,
utils as uvutils,
)
from pyuvdata.analytic_beam import AnalyticBeam
from pyuvsim.data import DATA_PATH as SIM_DATA_PATH
from .telescope import BeamList
try:
import astropy_healpix
except ImportError:
astropy_healpix = None
try:
import analytic_diffuse
except ImportError:
analytic_diffuse = None
try:
from . import mpi
from .mpi import get_rank
except ImportError:
def get_rank():
"""Mock to prevent import errors."""
return 0
mpi = None
from .utils import check_file_exists_and_increment
__all__ = [
"create_mock_catalog",
"SkyModelData",
"initialize_catalog_from_params",
"initialize_uvdata_from_params",
"initialize_uvdata_from_keywords",
"uvdata_to_telescope_config",
"uvdata_to_config_file",
]
logger = logging.getLogger(__name__)
# this dict can go away in version 1.5 when we require the !AnalyticBeam
# tag in yaml files to specify analytic beams
analytic_beams = {
"airy": AiryBeam,
"gaussian": GaussianBeam,
"short_dipole": ShortDipoleBeam,
"uniform": UniformBeam,
}
def _parse_layout_csv(layout_csv):
"""
Interpret the layout csv file.
Columns in the file provide, in order from left to right, the antenna name, antenna number,
a beam ID number, and the antenna positions relative to the telescope location in
east, north, up (ENU) in meters.
See https://pyuvsim.readthedocs.io/en/latest/parameter_files.html for more details.
Parameters
----------
layout_csv : str
Filename of a layout csv.
"""
with open(layout_csv) as fhandle:
header = fhandle.readline()
header = [h.strip() for h in header.split()]
str_format_code = "U"
# get data types for each column
dtypes = {
"name": str_format_code + "10",
"number": "i4",
"beamid": "i4",
"e": "f8",
"n": "f8",
"u": "f8",
}
# check columns in file
lower_header = [col.lower() for col in header]
columns = ["name", "number", "beamid", "e", "n", "u"]
col_exist = [col for col in columns if col in lower_header]
dt = np.rec.format_parser([dtypes[col] for col in col_exist], col_exist, header)
return np.genfromtxt(layout_csv, autostrip=True, skip_header=1, dtype=dt.dtype)
def _write_layout_csv(
filepath, antpos_enu, antenna_names, antenna_numbers, beam_ids=None
):
"""
Write a layout csv file.
Columns in the file provide, in order from left to right, the antenna name, antenna number,
a beam ID number, and the antenna positions relative to the telescope location in
east, north, up (ENU) in meters.
See https://pyuvsim.readthedocs.io/en/latest/parameter_files.html for more details.
Parameters
----------
filepath : str
Filename to save the file to.
antpos_enu : array_like of float
East, North, Up positions of antennas in meters relative to the telescope location.
antenna_names : array_like of str
Names of antennas.
antenna_numbers : array_like of int
Antenna numbers.
beam_ids : array_like of int
Beam ID numbers to associate each antenna with a beam from a BeamList object.
Defaults to all zeros (all antennas have the same beam) if nothing is passed in.
"""
col_width = max(len(name) for name in antenna_names)
header = ("{:" + str(col_width) + "} {:8} {:8} {:10} {:10} {:10}\n").format(
"Name", "Number", "BeamID", "E", "N", "U"
)
if beam_ids is None:
beam_ids = np.zeros(len(antenna_names), dtype=int)
with open(filepath, "w") as lfile:
lfile.write(header + "\n")
for i, (e, n, u) in enumerate(antpos_enu):
beam_id = beam_ids[i]
name = antenna_names[i]
num = antenna_numbers[i]
line = ("{:{}} {:8d} {:8d} {:10.4f} {:10.4f} {:10.4f}\n").format(
name, col_width, num, beam_id, e, n, u
)
lfile.write(line)
def _config_str_to_dict(config_str):
"""
Read a yaml file into a dictionary, also add the file path info to the dictionary.
Parameters
----------
config_str : str
Filename of a configuration yaml file to read.
"""
with open(config_str) as pfile:
param_dict = yaml.safe_load(pfile)
config_str = os.path.abspath(config_str)
param_dict["config_path"] = os.path.dirname(config_str)
param_dict["obs_param_file"] = os.path.basename(config_str)
return param_dict
def _create_catalog_diffuse(
map_nside, diffuse_model, diffuse_params, time, array_location
):
"""
Make a :class:`pyradiosky.SkyModel` object from an analog diffuse map function.
Only used for internal testing, should not be called by users.
Parameters
----------
map_nside : int
HEALPix nside of desired map.
diffuse_model : str
Name of diffuse model.
diffuse_params : dict
Dictionary of parameters corresponding to the `diffuse_model`.
time : :class:`astropy.time.Time` object
Time to use in creating map (to assign RA/Decs based on Alt/Az).
array_location : :class:`astropy.coordinates.EarthLocation` or :class:`lunarsky.MoonLocation`
Location to use in creating map (to assign RA/Decs based on Alt/Az).
Returns
-------
catalog : class:`pyradiosky.SkyModel`
SkyModel object containing the diffuse map.
icrs_coord : :class:`astropy.coordinates.SkyCoord`
Astropy SkyCoord object containing the coordinates for the pixels in ICRS.
alts : array_like of float
Altitudes of the pixels in radians.
azs : array_like of float
Azimuths of the pixels in radians.
fluxes : array_like of float
Brightness temperature of the pixels in K.
"""
# Make the map, calculate AltAz positions of pixels, evaluate the model.
npix = 12 * map_nside**2
pixinds = np.arange(npix)
hpix = astropy_healpix.HEALPix(nside=map_nside, frame=ICRS())
icrs_coord = hpix.healpix_to_skycoord(pixinds)
use_altaz = True
if not isinstance(array_location, EarthLocation):
with contextlib.suppress(ImportError):
from lunarsky import LunarTopo, MoonLocation, SkyCoord as LunarSkyCoord
if isinstance(array_location, MoonLocation):
localframe = LunarTopo(location=array_location, obstime=time)
icrs_coord = LunarSkyCoord(icrs_coord)
use_altaz = False
if use_altaz:
localframe = AltAz(location=array_location, obstime=time)
source_coord = icrs_coord.transform_to(localframe)
alts = source_coord.alt.rad
azs = source_coord.az.rad
modfunc = analytic_diffuse.get_model(diffuse_model)
fluxes = modfunc(source_coord.az.rad, source_coord.zen.rad, **diffuse_params)
stokes = np.zeros((4, 1, npix))
stokes[0, :] = fluxes
stokes *= units.K
catalog = SkyModel(
stokes=stokes,
nside=map_nside,
hpx_inds=pixinds,
spectral_type="flat",
frame="icrs",
)
return catalog, icrs_coord, alts, azs, fluxes
def _create_catalog_discrete(Nsrcs, alts, azs, fluxes, time, array_location):
"""
Make a :class:`pyradiosky.SkyModel` object from a set of point sources.
Only used for internal testing, should not be called by users.
Parameters
----------
Nsrcs : int
Number of sources
alts : array_like of float
Altitudes or Latitudes (depending on array_location) of the sources in radians.
azs : array_like of float
Azimuths or Longitues (depending on array_location) of the sources in radians.
fluxes : array_like of float
Brightnesses of the sources in Jy.
time : :class:`astropy.time.Time` object
Time to use in defining the positions.
array_location : :class:`astropy.coordinates.EarthLocation` or :class:`lunarsky.MoonLocation`
Location to use in defining the positions.
Returns
-------
catalog : class:`pyradiosky.SkyModel`
SkyModel object containing the diffuse map.
icrs_coord : :class:`astropy.coordinates.SkyCoord`
Astropy SkyCoord object containing the coordinates for the sources in ICRS.
"""
localframe = AltAz
coord_class = SkyCoord
if not isinstance(array_location, EarthLocation):
with contextlib.suppress(ImportError):
from lunarsky import LunarTopo, MoonLocation, SkyCoord as LunarSkyCoord
if isinstance(array_location, MoonLocation):
localframe = LunarTopo
coord_class = LunarSkyCoord
source_coord = coord_class(
alt=Angle(alts, unit=units.deg),
az=Angle(azs, unit=units.deg),
obstime=time,
frame=localframe,
location=array_location,
)
icrs_coord = source_coord.transform_to("icrs")
names = np.array(["src" + str(si) for si in range(Nsrcs)])
stokes = np.zeros((4, 1, Nsrcs))
stokes[0, :] = fluxes
stokes *= units.Jy
catalog = SkyModel(
name=names, skycoord=icrs_coord, stokes=stokes, spectral_type="flat"
)
return catalog, icrs_coord
[docs]def create_mock_catalog(
time,
arrangement="zenith",
array_location=None,
Nsrcs=None,
alt=None,
save=False,
min_alt=None,
rseed=None,
return_data=False,
diffuse_model=None,
diffuse_params=None,
map_nside=None,
):
"""
Create a mock catalog.
SkyModels are defined in an AltAz frame at the given time, then returned in
ICRS ra/dec coordinates. They always have a 'flat' spectral type.
Parameters
----------
time: float or :class:`astropy.time.Time` object
Julian date, if a float it is interpreted as a JD.
arrangement: str
Point source pattern (default = 1 source at zenith).
Accepted arrangements:
* `triangle`: Three point sources forming a triangle around the zenith
* `cross`: An asymmetric cross
* `zenith`: Some number of sources placed at the zenith.
* `off-zenith`: A single source off zenith
* `long-line`: Horizon to horizon line of point sources
* `hera_text`: Spell out HERA around the zenith
* `random`: Randomly distributed point sources near zenith
* `diffuse`: Choice of solved models in the analytic_diffuse module.
Nsrcs: int
Number of sources to make (used for zenith, off-zenith, long-line, and random arrangements).
array_location: :class:`astropy.coordinates.EarthLocation`
Location of observer. Source positions are defined with respect to a particular zenith.
Can also be a :class:`lunarsky.MoonLocation` object to make catalogs for
observers on the Moon.
Default = HERA site
alt: float
For off-zenith and triangle arrangements, altitude to place sources.
In degrees.
min_alt: float
For random and long-line arrangements, minimum altitude at which to place sources.
In degrees.
save: bool
Save mock catalog as npz file.
rseed: int
If using the random configuration, pass in a RandomState seed.
return_data: bool
If True, return a :class:`~.SkyModelData` object instead of
:class:`pyradiosky.SkyModel`.
diffuse_model: str
If arrangement is 'diffuse', name of the diffuse model to generate.
See documentation in `analytic_diffuse.models` for details.
diffuse_params: dict
If arrangement is 'diffuse', extra parameters accepted by the chosen model.
See documentation in `analytic_diffuse.models` for details.
map_nside: int
If arrangement is 'diffuse', nside parameter for the healpix map to make.
Returns
-------
:class:`pyradiosky.SkyModel` or :class:`~.SkyModelData`
The catalog, as either a SkyModel or a SkyModelData (if `return_data` is True)
dict
A dictionary of keywords used to define the catalog.
Notes
-----
Generating diffuse models requires the `analytic_diffuse` and `astropy_healpix` modules.
"""
if isinstance(time, float):
time = Time(time, scale="utc", format="jd")
if array_location is None:
array_location = EarthLocation(
lat="-30d43m17.5s", lon="21d25m41.9s", height=1073.0
)
if arrangement not in [
"off-zenith",
"zenith",
"cross",
"triangle",
"long-line",
"hera_text",
"random",
"diffuse",
]:
raise KeyError("Invalid mock catalog arrangement: " + str(arrangement))
mock_keywords = {
"time": time.jd,
"arrangement": arrangement,
"array_location": repr(
(
float(array_location.lat.deg),
float(array_location.lon.deg),
float(array_location.height.value),
)
),
}
mock_keywords["world"] = "earth"
if not isinstance(array_location, EarthLocation):
with contextlib.suppress(ImportError):
from lunarsky import MoonLocation
if isinstance(array_location, MoonLocation):
mock_keywords["world"] = "moon"
if arrangement == "off-zenith":
if alt is None:
alt = 85.0 # Degrees
mock_keywords["alt"] = alt
Nsrcs = 1
alts = [alt]
azs = [90.0] # 0 = North pole, 90. = East pole
fluxes = [1.0]
if arrangement == "triangle":
Nsrcs = 3
if alt is None:
alt = 87.0 # Degrees
mock_keywords["alt"] = alt
alts = [alt, alt, alt]
azs = [0.0, 120.0, 240.0]
fluxes = [1.0, 1.0, 1.0]
if arrangement == "cross":
Nsrcs = 4
alts = [88.0, 90.0, 86.0, 82.0]
azs = [270.0, 0.0, 90.0, 135.0]
fluxes = [5.0, 4.0, 1.0, 2.0]
if arrangement == "zenith":
if Nsrcs is None:
Nsrcs = 1
mock_keywords["Nsrcs"] = Nsrcs
alts = np.ones(Nsrcs) * 90.0
azs = np.zeros(Nsrcs, dtype=float)
fluxes = np.ones(Nsrcs) * 1 / Nsrcs
# Divide total Stokes I intensity among all sources
# Test file has Stokes I = 1 Jy
if arrangement == "random":
if Nsrcs is None:
Nsrcs = 1
if min_alt is None:
min_alt = 30 # Degrees
mock_keywords["Nsrcs"] = Nsrcs
np.random.seed(seed=rseed)
mock_keywords["rseed"] = np.random.get_state()[1][0]
# Necessary to get uniform distribution per solid angle.
min_alt_rad = np.radians(min_alt)
rv = np.random.uniform(-1, np.cos(min_alt_rad + np.pi / 2), Nsrcs)
alts = np.degrees(np.arccos(rv) - np.pi / 2)
azs = np.degrees(np.random.uniform(0, 2 * np.pi, Nsrcs))
fluxes = np.ones(Nsrcs, dtype=float)
if arrangement == "long-line":
if Nsrcs is None:
Nsrcs = 10
if min_alt is None:
min_alt = 5
mock_keywords["Nsrcs"] = Nsrcs
mock_keywords["alt"] = alt
fluxes = np.ones(Nsrcs, dtype=float)
if Nsrcs % 2 == 0:
length = 180 - min_alt * 2
spacing = length / (Nsrcs - 1)
max_alt = 90.0 - spacing / 2
alts = np.linspace(min_alt, max_alt, Nsrcs // 2)
alts = np.append(alts, np.flip(alts, axis=0))
azs = np.append(
np.zeros(Nsrcs // 2, dtype=float) + 180.0,
np.zeros(Nsrcs // 2, dtype=float),
)
else:
alts = np.linspace(min_alt, 90, (Nsrcs + 1) // 2)
alts = np.append(alts, np.flip(alts[1:], axis=0))
azs = np.append(
np.zeros((Nsrcs + 1) // 2, dtype=float) + 180.0,
np.zeros((Nsrcs - 1) // 2, dtype=float),
)
if arrangement == "hera_text":
# fmt: off
azs = np.array([-254.055, -248.199, -236.310, -225.000, -206.565,
-153.435, -123.690, -111.801, -105.945, -261.870,
-258.690, -251.565, -135.000, -116.565, -101.310,
-98.130, 90.000, 90.000, 90.000, 90.000, 90.000,
-90.000, -90.000, -90.000, -90.000, -90.000,
-90.000, 81.870, 78.690, 71.565, -45.000, -71.565,
-78.690, -81.870, 74.055, 68.199, 56.310, 45.000,
26.565, -26.565, -45.000, -56.310, -71.565])
zas = np.array([7.280, 5.385, 3.606, 2.828, 2.236, 2.236, 3.606,
5.385, 7.280, 7.071, 5.099, 3.162, 1.414, 2.236,
5.099, 7.071, 7.000, 6.000, 5.000, 3.000, 2.000,
1.000, 2.000, 3.000, 5.000, 6.000, 7.000, 7.071,
5.099, 3.162, 1.414, 3.162, 5.099, 7.071, 7.280,
5.385, 3.606, 2.828, 2.236, 2.236, 2.828, 3.606, 6.325])
# fmt: on
alts = 90.0 - zas
Nsrcs = alts.size
fluxes = np.ones_like(alts)
if arrangement == "diffuse":
if analytic_diffuse is None or astropy_healpix is None:
raise ValueError(
"analytic_diffuse and astropy_healpix "
"modules are required to use diffuse mock catalog."
)
if diffuse_model is None:
raise ValueError(
"Diffuse arrangement selected, but diffuse model not chosen."
)
if diffuse_params is None:
diffuse_params = {}
if map_nside is None:
warnings.warn("No nside chosen. Defaulting to 32")
map_nside = 32
mock_keywords["diffuse_model"] = diffuse_model
mock_keywords["map_nside"] = map_nside
mock_keywords["diffuse_params"] = repr(diffuse_params)
catalog, icrs_coord, alts, azs, fluxes = _create_catalog_diffuse(
map_nside, diffuse_model, diffuse_params, time, array_location
)
else:
catalog, icrs_coord = _create_catalog_discrete(
Nsrcs, alts, azs, fluxes, time, array_location
)
if return_data:
catalog = SkyModelData(catalog)
if get_rank() == 0 and save:
np.savez(
"mock_catalog_" + arrangement,
ra=icrs_coord.ra.rad,
dec=icrs_coord.dec.rad,
alts=alts,
azs=azs,
fluxes=fluxes,
)
return catalog, mock_keywords
[docs]class SkyModelData:
"""
Carries immutable SkyModel data in simple ndarrays.
This is to facilitate sharing SkyModel objects in MPI, without
excessive copying and memory bloat.
When used with MPI, this can be initialized simultaneously on all
processes such that sky_in is provided only on the root process.
The data can then be shared across all processes by running the `share`
method.
Parameters
----------
sky_in: :class:`pyradiosky.SkyModel`
A valid SkyModel object.
filename : str or list of str, optional
The filename (or other string identifier) of the input catalog. This overrides
the filename set on the sky_in object (if it has one). If not set, this defaults
to the filename set on the sky_in object (if it has one)
"""
filename = None
pyradiosky_version_str = None
Ncomponents = None
component_type = None
spectral_type = None
ra = None
dec = None
name = None
Nfreqs = None
stokes_I = None # noqa should be all lower case
stokes_Q = None # noqa should be all lower case
stokes_U = None # noqa should be all lower case
stokes_V = None # noqa should be all lower case
freq_array = None
freq_edge_array = None
reference_frequency = None
spectral_index = None
polarized = None
nside = None
hpx_inds = None
flux_unit = None
put_in_shared = [
"stokes_I",
"stokes_Q",
"stokes_U",
"stokes_V",
"polarized",
"ra",
"dec",
"reference_frequency",
"spectral_index",
"hpx_inds",
]
def __init__(self, sky_in=None, filename=None):
# Collect relevant attributes.
if sky_in is not None:
if sky_in.name is not None:
self.name = np.asarray(sky_in.name)
else:
self.name = None
self.nside = sky_in.nside
self.hpx_inds = sky_in.hpx_inds
self.component_type = sky_in.component_type
self.spectral_type = sky_in.spectral_type
self.Ncomponents = sky_in.Ncomponents
if sky_in.frame != "icrs":
sky_in.transform_to(ICRS)
sky_ra, sky_dec = sky_in.get_lon_lat()
self.ra = sky_ra.deg
self.dec = sky_dec.deg
self.Nfreqs = sky_in.Nfreqs
stokes_in = sky_in.stokes
if isinstance(stokes_in, units.Quantity):
self.flux_unit = stokes_in.unit.to_string()
stokes_in = stokes_in.value
self.stokes_I = stokes_in[0, ...]
if sky_in._n_polarized > 0:
self.polarized = sky_in._polarized
Q, U, V = stokes_in[1:, :, sky_in._polarized]
self.stokes_Q = Q
self.stokes_U = U
self.stokes_V = V
if sky_in._freq_array.required:
self.freq_array = sky_in.freq_array.to("Hz").value
if sky_in.spectral_type == "subband" and sky_in.freq_edge_array is not None:
self.freq_edge_array = sky_in.freq_edge_array.to("Hz").value
if sky_in._reference_frequency.required:
self.reference_frequency = sky_in.reference_frequency.to("Hz").value
if sky_in._spectral_index.required:
self.spectral_index = sky_in.spectral_index
self.pyradiosky_version_str = sky_in.pyradiosky_version_str
if filename is not None:
if isinstance(filename, str):
filename_use = [filename]
else:
filename_use = filename
self.filename = filename_use
elif sky_in is not None and sky_in.filename is not None:
self.filename = sky_in.filename
[docs] def subselect(self, inds):
"""
Subselect, returning a new SkyModelData object.
Parameters
----------
inds: range or index array
Indices to select along the component axis.
Returns
-------
SkyModelData
A new SkyModelData with Ncomp axes downselected.
Notes
-----
If inds is a range object, this method will avoid copying data in numpy arrays,
such that the returned SkyModelData object carries views into the current object's arrays.
"""
new_sky = SkyModelData()
new_sky.Ncomponents = len(inds)
new_sky.nside = self.nside
new_sky.component_type = self.component_type
if self.name is not None:
new_sky.name = self.name[inds]
if isinstance(inds, range):
new_sky.stokes_I = self.stokes_I[:, slice(inds.start, inds.stop, inds.step)]
else:
new_sky.stokes_I = self.stokes_I[:, inds]
new_sky.ra = self.ra[inds]
new_sky.dec = self.dec[inds]
new_sky.Nfreqs = self.Nfreqs
new_sky.spectral_type = self.spectral_type
new_sky.flux_unit = self.flux_unit
if self.reference_frequency is not None:
new_sky.reference_frequency = self.reference_frequency[inds]
if self.spectral_index is not None:
new_sky.spectral_index = self.spectral_index[inds]
if self.freq_array is not None:
new_sky.freq_array = self.freq_array
if self.freq_edge_array is not None:
new_sky.freq_edge_array = self.freq_edge_array
if self.hpx_inds is not None:
new_sky.hpx_inds = self.hpx_inds[inds]
if self.polarized is not None:
sub_inds = np.isin(self.polarized, inds)
new_sky.stokes_Q = self.stokes_Q[..., sub_inds]
new_sky.stokes_U = self.stokes_U[..., sub_inds]
new_sky.stokes_V = self.stokes_V[..., sub_inds]
new_sky.polarized = np.where(np.isin(inds, self.polarized))[0]
return new_sky
[docs] def share(self, root=0):
"""
Share across MPI processes (requires mpi4py to use).
All attributes are put in shared memory.
Parameters
----------
root : int
Root rank on COMM_WORLD, from which data will be broadcast.
"""
if mpi is None:
raise ImportError(
"You need mpi4py to use this method. "
"Install it by running pip install pyuvsim[sim] "
"or pip install pyuvsim[all] if you also want the "
"line_profiler installed."
)
mpi.start_mpi()
self.Ncomponents = mpi.world_comm.bcast(self.Ncomponents, root=root)
# Get list of attributes that are set.
isset = None
if mpi.rank == root:
isset = [key for key, value in self.__dict__.items() if value is not None]
isset = mpi.world_comm.bcast(isset, root=root)
for key in isset:
attr = getattr(self, key)
if key in self.put_in_shared:
val = mpi.shared_mem_bcast(attr, root=root)
else:
val = mpi.world_comm.bcast(attr, root=root)
if val is not None:
setattr(self, key, val)
mpi.world_comm.Barrier()
[docs] def get_skymodel(self, inds=None):
"""
Initialize :class:`pyradiosky.SkyModel` from current settings.
Parameters
----------
inds : range or index array
Indices to select along the component axis.
"""
if inds is not None:
obj = self.subselect(inds)
return obj.get_skymodel()
stokes_use = np.zeros((4, self.Nfreqs, self.Ncomponents), dtype=float)
stokes_use[0, ...] = self.stokes_I
if self.polarized is not None:
stokes_use[1, :, self.polarized] = self.stokes_Q.T
stokes_use[2, :, self.polarized] = self.stokes_U.T
stokes_use[3, :, self.polarized] = self.stokes_V.T
if self.flux_unit is not None:
stokes_use *= units.Unit(self.flux_unit)
other = {}
if self.spectral_type in ["full", "subband"]:
other["freq_array"] = self.freq_array * units.Hz
if self.freq_edge_array is not None:
other["freq_edge_array"] = self.freq_edge_array * units.Hz
if self.spectral_type == "spectral_index":
other["reference_frequency"] = self.reference_frequency * units.Hz
other["spectral_index"] = self.spectral_index
if self.component_type == "healpix":
other["nside"] = self.nside
other["hpx_inds"] = self.hpx_inds
other["frame"] = "icrs"
else:
other["name"] = self.name
other["skycoord"] = SkyCoord(
ra=Longitude(self.ra, unit="deg"),
dec=Latitude(self.dec, unit="deg"),
frame="icrs",
)
other["filename"] = self.filename
return SkyModel(stokes=stokes_use, spectral_type=self.spectral_type, **other)
def _sky_select_calc_rise_set(sky, source_params, telescope_lat_deg=None):
"""
Apply flux and non-rising source cuts, calculate rise and set lsts.
Parameters
----------
sky : pyradiosky.Skymodel
SkyModel object to apply cuts to.
source_params : dict
Dict specifying flux cut and horizon buffer parameters.
telescope_lat_deg : float
Latitude of telescope in degrees, used for horizon calculations.
"""
# do other selections before non-rising to avoid warnings about nans and
# negatives that are filtered in this step in the cut_nonrising step
select_params = {}
if "min_flux" in source_params:
min_brightness = float(source_params["min_flux"]) * units.Jy
select_params["min_brightness"] = min_brightness
if "max_flux" in source_params:
max_brightness = float(source_params["max_flux"]) * units.Jy
select_params["max_brightness"] = max_brightness
for par in ["non_nan", "non_negative"]:
if par in source_params:
select_params[par] = source_params[par]
sky.select(**select_params)
if telescope_lat_deg is not None:
telescope_lat = Latitude(telescope_lat_deg * units.deg)
sky.cut_nonrising(telescope_lat)
calc_kwargs = {}
if "horizon_buffer" in source_params:
calc_kwargs["horizon_buffer"] = float(source_params["horizon_buffer"])
sky.calculate_rise_set_lsts(telescope_lat, **calc_kwargs)
return sky
[docs]def initialize_catalog_from_params(
obs_params, input_uv=None, filetype=None, return_catname=False
):
"""
Make catalog from parameter file specifications.
Default behavior is to do coarse horizon cuts.
Parameters
----------
obs_params: str or dict
Either an obsparam file name or a dictionary of parameters.
input_uv: :class:`pyuvdata.UVData`
Used to set location for mock catalogs and for horizon cuts.
filetype : str
One of ['skyh5', 'gleam', 'vot', 'text', 'fhd'] or None.
If None, the code attempts to guess what the file type is.
return_catname: bool
Return the catalog name. Default is False.
Returns
-------
sky: :class:`pyradiosky.SkyModel`
Source catalog filled with data.
source_list_name: str
Catalog identifier for metadata. Only returned if return_catname is True.
"""
if input_uv is not None and not isinstance(input_uv, UVData):
raise TypeError("input_uv must be UVData object")
if isinstance(obs_params, str):
with open(obs_params) as pfile:
param_dict = yaml.safe_load(pfile)
param_dict["config_path"] = os.path.dirname(obs_params)
else:
param_dict = obs_params
source_params = param_dict["sources"]
if "catalog" in source_params:
catalog = source_params["catalog"]
else:
raise KeyError("No catalog defined.")
if catalog == "mock":
mock_keywords = {"arrangement": source_params["mock_arrangement"]}
extra_mock_kwds = [
"time",
"Nsrcs",
"zen_ang",
"save",
"min_alt",
"array_location",
"diffuse_model",
"diffuse_params",
"map_nside",
]
for k in extra_mock_kwds:
if k in source_params:
if k == "array_location":
# String -- lat, lon, alt in degrees
latlonalt = [float(s.strip()) for s in source_params[k].split(",")]
lat, lon, alt = latlonalt
mock_keywords[k] = EarthLocation.from_geodetic(lon, lat, alt)
else:
mock_keywords[k] = source_params[k]
# time, arrangement, array_location, save, Nsrcs, min_alt
if "array_location" not in mock_keywords:
if input_uv is not None:
mock_keywords["array_location"] = input_uv.telescope.location
else:
warnings.warn(
"No array_location specified. Defaulting to the HERA site."
)
if "time" not in mock_keywords:
if input_uv is not None:
mock_keywords["time"] = input_uv.time_array[0]
warnings.warn(
"No julian date given for mock catalog. Defaulting to first time step."
)
else:
raise ValueError(
"input_uv must be supplied if using mock catalog without specified julian date"
)
time = mock_keywords.pop("time")
sky, mock_keywords = create_mock_catalog(time, **mock_keywords)
mock_keyvals = [str(key) + str(val) for key, val in mock_keywords.items()]
source_list_name = "mock_" + "_".join(mock_keyvals)
elif isinstance(catalog, str):
# if catalog file is not found, first check relative to config_path,
# then check astropy cache treating catalog input as url
if not os.path.isfile(catalog):
# create boolean to determine if cache should be checked
check_cache = True
if "config_path" in param_dict:
relative_path = os.path.join(param_dict["config_path"], catalog)
if os.path.isfile(relative_path):
catalog = relative_path
# if file is found no need to check cache
check_cache = False
if check_cache and is_url_in_cache(catalog, pkgname="pyuvsim"):
catalog = cache_contents("pyuvsim")[catalog]
allowed_read_params = [
"spectral_type",
"table_name",
"id_column",
"ra_column",
"dec_column",
"lon_column",
"lat_column",
"frame",
"flux_columns",
"reference_frequency",
"freq_array",
"spectral_index_column",
]
read_params = {}
if filetype is not None:
read_params["filetype"] = filetype
elif "filetype" in source_params:
read_params["filetype"] = source_params["filetype"]
for param, value in source_params.items():
if param in allowed_read_params:
read_params[param] = value
detect_gleam = filetype == "gleam" or (
os.path.splitext(catalog)[1] == ".vot" and "gleam" in catalog.casefold()
)
if detect_gleam and "spectral_type" not in source_params:
read_params["spectral_type"] = "subband"
source_list_name = os.path.basename(catalog)
# defer the acceptibility check until after selections are done.
sky = SkyModel.from_file(catalog, **read_params, run_check_acceptability=False)
if input_uv is not None:
telescope_lat_deg = input_uv.telescope.location.lat.to("deg").value
else:
telescope_lat_deg = None
# Do source selections, if any.
sky = _sky_select_calc_rise_set(
sky, source_params, telescope_lat_deg=telescope_lat_deg
)
# do the full check after selections to avoid warnings about nans and
# negatives that are removed in the selection
sky.check()
# If the filename parameter is None (e.g. for mock skies)
# add the source_list_name to the object so it can be put in the UVData history later
if sky.filename is None:
sky.filename = [source_list_name]
if return_catname:
return sky, source_list_name
else:
return sky
def _check_uvbeam_file(beam_file):
if not os.path.exists(beam_file):
testdata_beam_file = os.path.join(SIM_DATA_PATH, beam_file)
if os.path.exists(testdata_beam_file):
beam_file = testdata_beam_file
else:
raise ValueError(f"Unrecognized beam file or model: {beam_file}")
return beam_file
def _construct_beam_list(
beam_ids: npt.NDArray[int] | list[float] | tuple[float],
telconfig: dict,
freq_array: npt.NDArray[float],
freq_range: (npt.NDArray[float] | list[float] | tuple[float] | None) = None,
):
"""
Construct the BeamList from the telescope parameter dict.
Parameters
----------
beam_ids : list of int
List of beam ID integers to include from the telconfig. All integers in
this list must be included in the telconfig.
telconfig : dict
Telescope parameters dict, typically parsed from the telescope yaml file.
See pyuvsim documentation for allowable keys.
https://pyuvsim.readthedocs.io/en/latest/parameter_files.html#telescope-configuration
freq_array : array-like of float
Simulation frequency array.
freq_range : array-like of float
Range of frequencies to keep in the beam object, shape (2,). Should
include all frequencies in the simulation with a buffer to support
interpolation.
Returns
-------
BeamList
The constructed beam list object.
"""
beam_list = []
assert isinstance(beam_ids, np.ndarray | list | tuple), (
"beam_ids must be a list, tuple or numpy array."
)
beam_ids = np.asarray(beam_ids, dtype=int)
assert isinstance(telconfig, dict), "telconfig must be a dict."
if freq_range is not None:
assert isinstance(freq_range, np.ndarray | list | tuple), (
"If passed, freq_range must be a list, tuple or numpy array"
)
freq_range = np.asarray(freq_range, dtype=float).squeeze()
if freq_range.size != 2:
raise ValueError("If passed, freq_range have 2 elements")
# possible global shape options
if "diameter" in telconfig or "sigma" in telconfig:
raise ValueError(
"Beam shape options diameter and sigma should be specified per beamID "
"in the 'beam_paths' section not as globals. For examples see the "
"parameter_files documentation."
)
select = telconfig.pop("select", {})
freq_buffer = select.pop("freq_buffer", None)
if freq_range is not None and "freq_range" not in select and freq_buffer is None:
select["freq_range"] = freq_range
if freq_buffer is not None and "freq_range" not in select:
freq_arr_val = freq_array
freq_range = (
freq_arr_val.min() - freq_buffer,
freq_arr_val.max() + freq_buffer,
)
select["freq_range"] = freq_range
for beam_id in beam_ids:
if beam_id not in telconfig["beam_paths"]:
raise ValueError(f"beam_id {beam_id} is not listed in the telconfig.")
beam_model = telconfig["beam_paths"][beam_id]
if not isinstance(beam_model, dict | AnalyticBeam | UVBeam):
raise ValueError(
"Beam model is not properly specified in telescope config file. "
f"It is specified as {beam_model}."
)
if not isinstance(beam_model, AnalyticBeam | UVBeam) and not (
isinstance(beam_model, dict) and "filename" in beam_model
):
warnings.warn(
"Entries in 'beam_paths' should be specified using either the "
"UVBeam or AnalyticBeam constructors or using a dict syntax for "
"UVBeams. For examples see the parameter_files documentation. "
"Specifying analytic beam without the AnalyticBeam constructors "
"will cause an error in version 1.6",
DeprecationWarning,
)
if isinstance(beam_model, AnalyticBeam | UVBeam):
if len(select) > 0 and isinstance(beam_model, UVBeam):
if "freq_range" in select:
freq_range = select.pop("freq_range")
freq_chans = np.nonzero(
(beam_model.freq_array >= freq_range[0])
& (beam_model.freq_array <= freq_range[1])
)[0]
select["freq_chans"] = freq_chans
beam_model.select(**select)
beam_list.append(beam_model)
elif isinstance(beam_model, dict) and "filename" in beam_model:
# this a UVBeam readable file with (possibly) some read kwargs
beam_file = beam_model.pop("filename")
read_kwargs = beam_model
beam_file = _check_uvbeam_file(beam_file)
uvb = UVBeam()
if "freq_range" in select:
read_kwargs["freq_range"] = select["freq_range"]
uvb.read(beam_file, **read_kwargs)
beam_list.append(uvb)
else:
if "type" in beam_model:
beam_type = beam_model["type"]
else:
raise ValueError(
"Beam model must have either a 'filename' field for UVBeam "
"files or a 'type' field for analytic beams."
)
if beam_type not in analytic_beams:
raise ValueError(f"Undefined beam model type: {beam_type}")
this_beam_opts = {}
if isinstance(beam_model, dict):
for key in beam_model:
if key != "type":
this_beam_opts[key] = beam_model[key]
# Gaussian beam requires either diameter or sigma
# Airy beam requires diameter
# Default to None for diameter and sigma.
# Values in the "beam_paths" override globally-defined options.
if beam_type == "gaussian":
shape_opts = {"diameter": None, "sigma": None}
elif beam_type == "airy":
shape_opts = {"diameter": None}
else:
shape_opts = {}
for opt in shape_opts:
shape_opts[opt] = this_beam_opts.get(opt)
ab = analytic_beams[beam_type](**shape_opts)
beam_list.append(ab)
bl_options = {}
if "spline_interp_opts" in telconfig:
bl_options["spline_interp_opts"] = telconfig["spline_interp_opts"]
if "freq_interp_kind" in telconfig:
bl_options["freq_interp_kind"] = telconfig["freq_interp_kind"]
beam_list_obj = BeamList(beam_list, **bl_options)
return beam_list_obj
def _get_tel_loc(tel_dict: dict):
"""Get the telescope location info out of a dict."""
telescope_location_latlonalt = tel_dict["telescope_location"]
if isinstance(telescope_location_latlonalt, str):
telescope_location_latlonalt = ast.literal_eval(telescope_location_latlonalt)
world = tel_dict.pop("world", None)
# get the lunar ellipsoid. Default to None for earth or SPHERE for moon
if world == "moon":
ellipsoid = tel_dict.pop("ellipsoid", "SPHERE")
else:
ellipsoid = tel_dict.pop("ellipsoid", None)
return telescope_location_latlonalt, world, ellipsoid
def _get_array_layout(tele_params, config_path):
# get array layout
if "array_layout" not in tele_params:
raise KeyError("array_layout must be provided.")
array_layout = tele_params.pop("array_layout")
if not isinstance(array_layout, str | dict):
raise ValueError(
"array_layout must be a string or have options that parse as a dict."
)
if isinstance(array_layout, str):
# Interpet as file path to layout csv file.
layout_csv = array_layout
# if array layout is a str, parse it as .csv filepath
if isinstance(layout_csv, str):
if not os.path.exists(layout_csv) and isinstance(config_path, str):
layout_csv = os.path.join(config_path, layout_csv)
if not os.path.exists(layout_csv):
raise ValueError(
f"layout_csv file {layout_csv} from yaml does not exist"
)
ant_layout = _parse_layout_csv(layout_csv)
east, north, up = ant_layout["e"], ant_layout["n"], ant_layout["u"]
antnames = ant_layout["name"]
antnums = np.array(ant_layout["number"])
beam_ids = ant_layout["beamid"]
elif isinstance(array_layout, dict):
# Receiving antenna positions directly
antnums = tele_params.pop("antenna_numbers")
antnames = tele_params.pop("antenna_names")
east, north, up = np.array([array_layout[an] for an in antnums]).T
layout_csv = "user-fed dict"
beam_ids = None
return antnames, antnums, east, north, up, layout_csv, beam_ids
def parse_telescope_params(
tele_params: dict,
*,
config_path: str = "",
freq_array: npt.NDArray[float],
freq_range: (npt.NDArray[float] | list[float] | tuple[float] | None) = None,
return_beams: bool = True,
):
"""
Parse the "telescope" section of obsparam.
Parameters
----------
tele_params : dict
Telescope parameters
See pyuvsim documentation for allowable keys.
https://pyuvsim.readthedocs.io/en/latest/parameter_files.html#telescope-configuration
config_path : str
Path to directory holding configuration and layout files.
freq_array : array-like of float
Simulation frequency array.
freq_range : float
If given, select frequencies on reading the beam.
return_beams : bool
Option to return the beam_list and beam_dict.
Returns
-------
param_dict : dict
Parameters related to the telescope and antenna layout, to be included in the
:class:`pyuvdata.UVData` object.
* `Nants_data`: Number of antennas
* `Nants_telescope`: Number of antennas
* `antenna_names`: list of antenna names
* `antenna_numbers`: corresponding list of antenna numbers
* `antenna_positions`: Array of ECEF antenna positions
* `telescope_location`: ECEF array center location
* `telescope_location_lat_lon_alt`: Lat Lon Alt array center location
* `telescope_config_file`: Path to configuration yaml file
* `antenna_location_file`: Path to csv layout file
* `telescope_name`: observatory name
* `x_orientation`: old, prefer feed_array + feed_angle + mount_type.
physical orientation of the dipole labelled as "x"
* `feed_array`: array of feeds (e.g. ['x', 'y']). Must be
* `feed_angle`: array of feed angles. Interpretation depends on mount_type
see pyuvdata docs.
* `mount_type`: antenna mount type, see pyuvdata docs. Defaults to "fixed"
if feed_array and feed_angle are passed.
* `world`: Either "earth" or "moon" (requires lunarsky).
beam_list : :class:`pyuvsim.BeamList`
This is a BeamList object, only returned if return_beams is True.
beam_dict : dict
Antenna numbers to beam indices, only returned if return_beams is True.
"""
tele_params = copy.deepcopy(tele_params)
# check for telescope config
tele_config = "telescope_config_name" in tele_params
if tele_config:
# parse telescope config
if not os.path.isdir(config_path):
config_path = os.path.dirname(config_path)
if not os.path.isdir(config_path):
raise ValueError(f"config_path {config_path} is not a directory")
telescope_config_name = tele_params["telescope_config_name"]
if not os.path.exists(telescope_config_name):
telescope_config_name = os.path.join(config_path, telescope_config_name)
if not os.path.exists(telescope_config_name):
raise ValueError(
f"telescope_config_name file from yaml does not exist: {telescope_config_name}"
)
with open(telescope_config_name) as yf:
telconfig = yaml.safe_load(yf)
telescope_location_latlonalt, world, ellipsoid = _get_tel_loc(telconfig)
tele_params["telescope_name"] = telconfig["telescope_name"]
else:
# if not provided, get bare-minumum keys from tele_params
if "telescope_location" not in tele_params:
raise KeyError(
"If telescope_config_name not provided in `telescope` obsparam section, "
"you must provide telescope_location"
)
if "telescope_name" not in tele_params:
raise KeyError(
"If telescope_config_name not provided in `telescope` obsparam section, "
"you must provide telescope_name"
)
telescope_location_latlonalt, world, ellipsoid = _get_tel_loc(tele_params)
lat_rad = telescope_location_latlonalt[0] * np.pi / 180.0
long_rad = telescope_location_latlonalt[1] * np.pi / 180.0
alt = telescope_location_latlonalt[2]
if world == "moon":
frame = "mcmf"
elif world == "earth" or world is None:
frame = "itrs"
else:
raise ValueError(f"Invalid world {world}")
tele_params["telescope_location"] = uvutils.XYZ_from_LatLonAlt(
latitude=lat_rad,
longitude=long_rad,
altitude=alt,
frame=frame,
ellipsoid=ellipsoid,
)
telescope_name = tele_params["telescope_name"]
antnames, antnums, east, north, up, layout_csv, beam_ids = _get_array_layout(
tele_params=tele_params, config_path=config_path
)
# fill in outputs with just array info
return_dict = {}
return_dict["Nants_data"] = antnames.size
return_dict["Nants_telescope"] = antnames.size
return_dict["antenna_names"] = np.array(antnames.tolist())
return_dict["antenna_numbers"] = np.array(antnums)
antpos_enu = np.vstack((east, north, up)).T
return_dict["antenna_positions"] = (
uvutils.ECEF_from_ENU(
antpos_enu,
latitude=lat_rad,
longitude=long_rad,
altitude=alt,
frame=frame,
ellipsoid=ellipsoid,
)
- tele_params["telescope_location"]
)
if world is not None:
return_dict["world"] = world
return_dict["telescope_frame"] = frame
return_dict["ellipsoid"] = ellipsoid
return_dict["array_layout"] = layout_csv
return_dict["telescope_location"] = np.asarray(tele_params["telescope_location"])
return_dict["telescope_location_lat_lon_alt"] = np.asarray(
telescope_location_latlonalt
)
return_dict["telescope_name"] = telescope_name
if not tele_config:
beam_info = False
for key in ["feed_array", "feed_angle", "mount_type"]:
if key in tele_params:
return_dict[key] = tele_params[key]
beam_info = True
if not beam_info:
# if no info on beams, just return what we have
# if telescope has feed angle, warn
msg = (
"No beam information, so cannot determine telescope mount_type, "
"feed_array or feed_angle. Specify a telescope config file in "
"the obs param file to get beam information or, if calling "
"from `initialize_uvdata_from_keywords`, specify mount_type, "
"feed_array and feed_angle."
)
warnings.warn(msg)
if not return_beams:
return return_dict
else:
return return_dict, BeamList([]), {}
# if provided, parse sections related to beam files and types
return_dict["telescope_config_name"] = telescope_config_name
beam_ids_inc = np.unique(beam_ids)
beam_dict = {}
beam_list = _construct_beam_list(
beam_ids_inc, telconfig, freq_array=freq_array, freq_range=freq_range
)
# construct feed_angles if appropriate
# use dtype=object so strings don't get truncated.
mount_type = np.full((antnames.size,), None, dtype=object)
feed_angle = np.zeros((antnames.size, beam_list[0].beam.Nfeeds), dtype=float)
# feed_arrays have to be the same -- checked in consistency checker
feed_array = np.repeat(
beam_list[0].beam.feed_array[np.newaxis, :], antnames.size, axis=0
)
for beam_ind, beam_id in enumerate(beam_ids_inc):
wh_this_beam = np.nonzero(beam_ids == beam_id)
which_ants = antnames[wh_this_beam]
for ant in which_ants:
beam_dict[str(ant)] = beam_ind
mount_type[wh_this_beam] = beam_list[beam_ind].beam.mount_type
feed_angle[wh_this_beam, :] = beam_list[beam_ind].beam.feed_angle
if np.any(mount_type):
if not np.all(mount_type):
mount_type[np.nonzero(mount_type == np.array(None))] = "other"
return_dict["mount_type"] = mount_type
return_dict["feed_angle"] = feed_angle
return_dict["feed_array"] = feed_array
_ = return_dict.pop("x_orientation", None)
if not return_beams:
return return_dict
else:
return return_dict, beam_list, beam_dict
def _setup_coord_arrays(coord_params, param_map, tols):
kws_given = ", ".join(sorted([param_map[coord_key] for coord_key in coord_params]))
if "coord_array" in coord_params:
# If the coordinate array is provided just use it.
kwds_used = ["coord_array"]
coord_params["coord_array"] = np.asarray(coord_params["coord_array"])
coord_params["coord_n"] = coord_params["coord_array"].size
coord_params["coord_start"] = coord_params["coord_array"][0]
coord_params["coord_end"] = coord_params["coord_array"][-1]
if "coord_delta" in coord_params:
# If the deltas are provided use them
coord_params["coord_delta"] = np.atleast_1d(coord_params["coord_delta"])
if coord_params["coord_delta"].size == 1:
coord_params["coord_delta"] = np.full(
(coord_params["coord_n"],),
coord_params["coord_delta"][0],
dtype=float,
)
elif coord_params["coord_delta"].size != coord_params["coord_n"]:
raise ValueError(
f"If {param_map['coord_delta']} has multiple elements, the "
f"{param_map['coord_delta']} must be the same length as "
f"{param_map['coord_array']}."
)
kwds_used.append("coord_delta")
else:
# Can only calculate the delta if coords are evenly spaced
if coord_params["coord_n"] > 1:
if not uvutils.tools._test_array_constant_spacing(
coord_params["coord_array"], tols=tols
):
raise ValueError(
f"{param_map['coord_delta']} must be specified if spacing "
f"in {param_map['coord_array']} is uneven."
)
coord_params["coord_delta"] = np.full(
(coord_params["coord_n"],),
np.mean(np.diff(coord_params["coord_array"])),
dtype=float,
)
else:
raise ValueError(
f"{param_map['coord_delta']} must be specified if "
f"{param_map['coord_array']} has length 1"
)
# length is the *full length including widths* so delta is needed to
# calculate the lengths
coord_params["coord_length"] = (
coord_params["coord_array"][-1] + coord_params["coord_delta"][-1] * 0.5
) - (coord_params["coord_array"][0] - coord_params["coord_delta"][0] * 0.5)
else:
# The final linspace call needs coord_start, coord_end and coord_n.
# Calculate any of these we need from the other available parameters
# Also calculate the other parameters so we can test for consistency
# on any extra parameters that are provided
if "coord_start" not in coord_params and "coord_end" not in coord_params:
raise ValueError(
f"Either {param_map['coord_start']} or {param_map['coord_end']} "
"must be specified. The parameters that were specified were: "
+ kws_given
)
if (
"coord_delta" in coord_params
and np.asarray(coord_params["coord_delta"]).size > 1
):
raise ValueError(
f"{param_map['coord_delta']} must be a scalar if "
f"{param_map['coord_array']} is not specified"
)
if "coord_start" in coord_params and "coord_end" in coord_params:
# Both start and end are specified
kwds_used = ["coord_start", "coord_end"]
if "coord_n" not in coord_params and "coord_delta" not in coord_params:
raise ValueError(
f"If both {param_map['coord_start']} and {param_map['coord_end']} "
f"are specified, either {param_map['coord_n']} or "
f"{param_map['coord_delta']} must be specified as well. The "
"parameters that were specified were: " + kws_given
)
if "coord_n" in coord_params:
kwds_used.append("coord_n")
extent = coord_params["coord_end"] - coord_params["coord_start"]
if coord_params["coord_n"] > 1:
coord_delta = extent / (coord_params["coord_n"] - 1)
coord_length = extent + coord_delta
coord_params["coord_delta"] = coord_delta
elif (
"coord_delta" not in coord_params
and "coord_length" not in coord_params
):
raise ValueError(
f"If {param_map['coord_n']} is 1 then either "
f"{param_map['coord_delta']} or {param_map['coord_length']} "
"must be specified."
)
elif "coord_delta" in coord_params:
kwds_used.remove("coord_end")
kwds_used.append("coord_delta")
coord_params["coord_delta"] = np.atleast_1d(
coord_params["coord_delta"]
)
coord_length = coord_params["coord_delta"][0]
else:
kwds_used.remove("coord_end")
kwds_used.append("coord_length")
coord_length = coord_params["coord_length"]
coord_params["coord_delta"] = np.atleast_1d(
coord_params["coord_length"]
)
else:
kwds_used.append("coord_delta")
coord_length = (
coord_params["coord_end"]
- coord_params["coord_start"]
+ coord_params["coord_delta"]
)
coord_params["coord_n"] = float(
coord_length / coord_params["coord_delta"]
)
coord_params["coord_length"] = coord_length
else:
# Only one of start & end are specified
other_specs = [
"coord_n" in coord_params,
"coord_delta" in coord_params,
"coord_length" in coord_params,
]
if np.sum(other_specs) < 2:
raise ValueError(
f"If only one of {param_map['coord_start']} and "
f"{param_map['coord_end']} is specified, two more of "
f"{param_map['coord_n']}, {param_map['coord_delta']} and "
f"{param_map['coord_length']} must be specified as well. "
"The parameters that were specified were: " + kws_given
)
if "coord_n" in coord_params:
kwds_used = ["coord_n"]
if "coord_delta" in coord_params:
kwds_used.append("coord_delta")
coord_length = coord_params["coord_n"] * coord_params["coord_delta"]
coord_params["coord_length"] = coord_length
else:
kwds_used.append("coord_length")
coord_params["coord_delta"] = (
coord_params["coord_length"] / coord_params["coord_n"]
)
else:
kwds_used = ["coord_delta", "coord_length"]
coord_params["coord_n"] = (
coord_params["coord_length"] / coord_params["coord_delta"]
)
if "coord_start" in coord_params:
kwds_used.append("coord_start")
coord_params["coord_end"] = (
coord_params["coord_start"]
+ coord_params["coord_length"]
- coord_params["coord_delta"]
)
else:
kwds_used.append("coord_end")
coord_params["coord_start"] = (
coord_params["coord_end"]
- coord_params["coord_length"]
+ coord_params["coord_delta"]
)
if not np.isclose(coord_params["coord_n"], np.round(coord_params["coord_n"])):
raise ValueError(
f"{param_map['coord_end']} - {param_map['coord_start']} must be "
f"evenly divisible by {param_map['coord_delta']}"
)
coord_params["coord_n"] = int(np.round(coord_params["coord_n"]))
coord_params["coord_array"], coord_delta = np.linspace(
coord_params["coord_start"],
coord_params["coord_end"],
coord_params["coord_n"],
retstep=True,
endpoint=True,
)
if coord_params["coord_n"] > 1:
# Set the delta to be the linspace step size unless there's only
# one value, in which case use the value that was passed in or
# calculated earlier
coord_params["coord_delta"] = np.full(
(coord_params["coord_n"],), coord_delta, dtype=float
)
else:
# ensure changes in start/end are detected
coord_params["coord_start"] = coord_params["coord_array"][0]
coord_params["coord_end"] = coord_params["coord_array"][-1]
# ensure delta is an array
coord_params["coord_delta"] = np.atleast_1d(coord_params["coord_delta"])
coord_params["params_used"] = kwds_used
return coord_params
def parse_frequency_params(freq_params):
"""
Parse the "freq" section of obsparam.
Parameters
----------
freq_params : dict
Dictionary of frequency parameters.
See pyuvsim documentation for examples of allowable key combinations.
https://pyuvsim.readthedocs.io/en/latest/parameter_files.html#frequency
Returns
-------
dict
Dictionary of :class:`pyuvdata.UVData` parameters related to frequency:
* `channel_width`: (dtype float, ndarray, shape=(Nfreqs)) Frequency channel
widths in Hz
* `Nfreqs`: (int) Number of frequencies
* `freq_array`: (dtype float, ndarray, shape=(Nfreqs)) Frequency channel
centers in Hz
"""
freq_params_orig = copy.deepcopy(freq_params)
# remove Nspws because we don't use it we set it to 1 it at the end anyway
if "Nspws" in freq_params_orig:
del freq_params_orig["Nspws"]
freq_params = copy.deepcopy(freq_params_orig)
ftols = UVData()._freq_array.tols
freq_param_map = {
"coord_array": "freq_array",
"coord_delta": "channel_width",
"coord_start": "start_freq",
"coord_end": "end_freq",
"coord_n": "Nfreqs",
"coord_length": "bandwidth",
}
freq_param_map_rev = {
freq_key: coord_key for coord_key, freq_key in freq_param_map.items()
}
coord_params = {
freq_param_map_rev[freq_key]: value for freq_key, value in freq_params.items()
}
coord_params = _setup_coord_arrays(
coord_params, param_map=freq_param_map, tols=ftols
)
freq_params = {
freq_key: coord_params[coord_key]
for coord_key, freq_key in freq_param_map.items()
}
kwds_used = [freq_param_map[coord_key] for coord_key in coord_params["params_used"]]
# Warn about inconsistencies in supplied parameters
inconsistent_params = []
for kwd in freq_params_orig:
if kwd == "channel_width" or kwd == "freq_array":
orig_arr = np.atleast_1d(freq_params_orig[kwd])
if orig_arr.size != freq_params[kwd].size:
orig_arr = np.full(
(freq_params["Nfreqs"],), freq_params_orig[kwd], dtype=float
)
if not np.allclose(
orig_arr, freq_params[kwd], rtol=ftols[0], atol=ftols[1]
):
inconsistent_params.append(kwd)
else:
if not np.isclose(
freq_params_orig[kwd], freq_params[kwd], rtol=ftols[0], atol=ftols[1]
):
inconsistent_params.append(kwd)
if len(inconsistent_params) > 0:
warnings.warn(
f"The {', '.join(inconsistent_params)} is not consistent with "
f"the {', '.join(kwds_used)} specified. Using the values calculated "
f"from the {', '.join(kwds_used)} parameters. Input values were: "
f"{[freq_params_orig[kwd] for kwd in inconsistent_params]}, calculated "
f"values were: {[freq_params[kwd] for kwd in inconsistent_params]}."
)
# return the things needed for UVData objects
return_dict = {}
return_dict["Nfreqs"] = freq_params["Nfreqs"]
return_dict["freq_array"] = freq_params["freq_array"]
return_dict["channel_width"] = freq_params["channel_width"]
return_dict["Nspws"] = 1
return return_dict
def parse_time_params(time_params):
"""
Parse the "time" section of obsparam.
Parameters
----------
time_params : dict
Dictionary of time parameters
See pyuvsim documentation for examples of allowable key combinations.
https://pyuvsim.readthedocs.io/en/latest/parameter_files.html#time
Returns
-------
dict
Dictionary of :class:`pyuvdata.UVData` parameters related to time:
* `integration_time`: (float) Time array spacing in seconds.
* `Ntimes`: (int) Number of times
* `start_time`: (float) Starting time in Julian Date
* `time_array`: (dtype float, ndarray, shape=(Ntimes,)) Time step centers in JD.
"""
init_time_params = copy.deepcopy(time_params)
# Handle the time_offset: add it to all absolute time parameters unless
# it looks like they already have it, in which case warn!
if "time_offset" in init_time_params:
for param in ["start_time", "end_time", "time_array"]:
if param in init_time_params:
if np.max(init_time_params[param]) > init_time_params["time_offset"]:
warnings.warn(
f"time_offset is present, but {param} is larger than "
f"time_offset, so not adding time_offset to {param}."
)
else:
if isinstance(init_time_params[param], list):
init_time_params[param] = (
np.asarray(init_time_params[param])
+ init_time_params["time_offset"]
).tolist()
else:
init_time_params[param] += init_time_params["time_offset"]
del init_time_params["time_offset"]
time_params = copy.deepcopy(init_time_params)
dtols = UVData()._time_array.tols
stols = UVData()._integration_time.tols
# deal with the different units for different time parameters
daysperhour = 1 / 24.0
hourspersec = 1 / 60.0**2
dayspersec = daysperhour * hourspersec
if "integration_time" in time_params:
time_params["int_time_days"] = (
np.asarray(time_params.pop("integration_time")) * dayspersec
)
if "duration_hours" in time_params:
if "duration_days" in time_params:
warnings.warn(
"Both duration_hours and duration_days are specified, using duration_days."
)
time_params.pop("duration_hours")
else:
time_params["duration_days"] = (
time_params.pop("duration_hours") * daysperhour
)
time_param_map = {
"coord_array": "time_array",
"coord_delta": "int_time_days",
"coord_start": "start_time",
"coord_end": "end_time",
"coord_n": "Ntimes",
"coord_length": "duration_days",
}
time_param_map_rev = {
freq_key: coord_key for coord_key, freq_key in time_param_map.items()
}
coord_params = {
time_param_map_rev[freq_key]: value for freq_key, value in time_params.items()
}
# fix the names in errors/warnings to be the original names
time_param_map_for_func = copy.deepcopy(time_param_map)
time_param_map_for_func["coord_delta"] = "integration_time"
time_param_map_for_func["coord_length"] = "duration"
coord_params = _setup_coord_arrays(
coord_params, param_map=time_param_map_for_func, tols=dtols
)
time_params = {
freq_key: coord_params[coord_key]
for coord_key, freq_key in time_param_map.items()
}
kwds_used = [time_param_map[coord_key] for coord_key in coord_params["params_used"]]
# Go back to the right units
time_params["integration_time"] = time_params.pop("int_time_days") / dayspersec
kwds_used = [
"integration_time" if kwd == "int_time_days" else kwd for kwd in kwds_used
]
if "duration_days" in kwds_used and "duration_days" not in init_time_params:
kwds_used = [
"duration_hours" if kwd == "duration_days" else kwd for kwd in kwds_used
]
# Warn about inconsistencies in supplied parameters
inconsistent_params = []
for kwd in init_time_params:
if kwd == "integration_time" or kwd == "time_array":
orig_arr = np.atleast_1d(init_time_params[kwd])
if orig_arr.size != time_params[kwd].size:
orig_arr = np.full(
(time_params["Ntimes"],), init_time_params[kwd], dtype=float
)
if not np.allclose(
orig_arr, time_params[kwd], rtol=stols[0], atol=stols[1]
):
inconsistent_params.append(kwd)
elif kwd == "duration_hours":
orig_duration_days = init_time_params[kwd] * daysperhour
if not np.isclose(
orig_duration_days,
time_params["duration_days"],
rtol=dtols[0],
atol=dtols[1],
):
inconsistent_params.append(kwd)
time_params[kwd] = time_params["duration_days"] / daysperhour
else:
if not np.isclose(
init_time_params[kwd], time_params[kwd], rtol=dtols[0], atol=dtols[1]
):
inconsistent_params.append(kwd)
if len(inconsistent_params) > 0:
warnings.warn(
f"The {', '.join(inconsistent_params)} is not consistent with "
f"the {', '.join(kwds_used)} specified. Using the values calculated "
f"from the {', '.join(kwds_used)} parameters. Input values were: "
f"{[init_time_params[kwd] for kwd in inconsistent_params]}, calculated "
f"values were: {[time_params[kwd] for kwd in inconsistent_params]}."
)
# return the things needed for UVData objects
return_dict = {}
return_dict["integration_time"] = time_params["integration_time"]
return_dict["time_array"] = time_params["time_array"]
return_dict["Ntimes"] = time_params["Ntimes"]
return_dict["start_time"] = time_params["start_time"]
return return_dict
def _coord_arrays_to_params(coord_array, coord_delta, tols):
if coord_array.size > 1 and (np.asarray(coord_delta)).size == 1:
coord_delta = np.full_like(coord_array, coord_delta)
if (
not uvutils.tools._test_array_constant_spacing(coord_array, tols=tols)
or not uvutils.tools._test_array_constant(coord_delta, tols=tols)
or not uvutils.tools._test_array_consistent(coord_array, coord_delta, tols=tols)
or coord_array.size == 1
):
# arrays are not evenly spaced or deltas do not match diffs or only one
# value in the array. Just use the arrays and deltas.
return {
"coord_array": coord_array.tolist(),
"coord_delta": coord_delta.tolist(),
}
else:
# Evenly spaced arrays and delta matches array diffs
# Use start/end and number because these are easier for people to
# read and understand
return {
"coord_start": float(coord_array[0]),
"coord_end": float(coord_array[-1]),
"coord_n": coord_array.size,
}
def freq_array_to_params(freq_array, channel_width):
"""
Get a set of parameters that can be used to generate a given frequency array.
Parameters
----------
freq_array : array of float
Frequencies in Hz.
channel_width : array of float
Channel widths in Hz.
Returns
-------
dict
Dictionary of frequency parameters consistent with freq_array.
(channel_width, Nfreqs, bandwidth, start_freq, end_freq)
See pyuvsim documentation for details:
https://pyuvsim.readthedocs.io/en/latest/parameter_files.html#frequency
"""
freq_param_map = {
"coord_array": "freq_array",
"coord_delta": "channel_width",
"coord_start": "start_freq",
"coord_end": "end_freq",
"coord_n": "Nfreqs",
"coord_length": "bandwidth",
}
coord_params = _coord_arrays_to_params(
coord_array=freq_array,
coord_delta=channel_width,
tols=UVData()._freq_array.tols,
)
return {
freq_param_map[coord_key]: value for coord_key, value in coord_params.items()
}
def time_array_to_params(time_array, integration_time):
"""
Get a set of parameters that can be used to generate a given time array.
Returns a dictionary of parameters that can be used by parse_time_params
to obtain the time_array passed in.
Parameters
----------
time_array : array of float
Julian dates.
integration_time : array of float
Integration times in seconds.
Returns
-------
dict
Dictionary of time parameters consistent with time_array.
(integration_time, Ntimes, duration_days, start_time, end_times)
See pyuvsim documentation for details:
https://pyuvsim.readthedocs.io/en/latest/parameter_files.html#time
"""
# times maybe supplied as length Nblts with lots of duplicates.
# Find unique times.
time_array, unique_inverse = np.unique(np.asarray(time_array), return_inverse=True)
# If there are duplicate times, we need to downselect the integration time array
# to match. Easy if integration times are all the same, hard otherwise.
if time_array.size < unique_inverse.size:
# there were some repeated times
int_time_use = np.full_like(time_array, np.nan)
if not uvutils.tools._test_array_constant(
integration_time, tols=UVData()._integration_time.tols
):
int_time_varies_per_time = False
for tind in range(time_array.size):
int_times_this_time = integration_time[
np.nonzero(unique_inverse == tind)
]
if not uvutils.tools._test_array_constant(
int_times_this_time, tols=UVData()._integration_time.tols
):
int_time_varies_per_time = True
int_time_use[tind] = np.min(int_times_this_time)
else:
int_time_use[tind] = np.mean(int_times_this_time)
if int_time_varies_per_time:
warnings.warn(
"integration time varies for unique times, using the shortest "
"integration time for each unique time."
)
else:
int_time_use = np.full_like(time_array, np.mean(integration_time))
integration_time = int_time_use
# deal with the different units for different time parameters
daysperhour = 1 / 24.0
hourspersec = 1 / 60.0**2
dayspersec = daysperhour * hourspersec
int_time_days = integration_time * dayspersec
time_param_map = {
"coord_array": "time_array",
"coord_delta": "int_time_days",
"coord_start": "start_time",
"coord_end": "end_time",
"coord_n": "Ntimes",
"coord_length": "duration_days",
}
coord_params = _coord_arrays_to_params(
coord_array=time_array,
coord_delta=int_time_days,
tols=UVData()._time_array.tols,
)
time_params = {
time_param_map[coord_key]: value for coord_key, value in coord_params.items()
}
# go back to the right units
if "int_time_days" in time_params:
time_params["integration_time"] = (
np.asarray(time_params.pop("int_time_days")) / dayspersec
).tolist()
# use time_offset because if these dicts are dumped to yamls we lose too
# much precision on the absolute time parameters
if "time_array" in time_params:
# Set the offset as the JD at midnight of the first time
time_params["time_offset"] = float(
np.floor(time_params["time_array"][0] - 0.5) + 0.5
)
time_params["time_array"] = (
np.asarray(time_params["time_array"]) - time_params["time_offset"]
).tolist()
else:
# Set the offset as the JD at midnight of the first time
time_params["time_offset"] = float(
np.floor(time_params["start_time"] - 0.5) + 0.5
)
time_params["start_time"] = (
time_params["start_time"] - time_params["time_offset"]
)
time_params["end_time"] = time_params["end_time"] - time_params["time_offset"]
return time_params
def subselect(uv_obj, param_dict):
"""
Do selection on a UVData object.
Parameters
----------
uv_obj : UVData
UVData object to do the selection on.
param_dict : dict
Dict of parameters that can include a `select` section.
"""
# downselect baselines (or anything that can be passed to pyuvdata's select method)
# Note: polarization selection is allowed here, but will cause an error if the incorrect pols
# are passed to pyuvsim.
if "select" not in param_dict:
return
valid_select_keys = [
"antenna_nums",
"antenna_names",
"ant_str",
"frequencies",
"freq_chans",
"times",
"blt_inds",
]
select_params = param_dict["select"]
no_autos = bool(select_params.pop("no_autos", False))
select_params = {k: v for k, v in select_params.items() if k in valid_select_keys}
if "antenna_nums" in select_params:
select_params["antenna_nums"] = list(map(int, select_params["antenna_nums"]))
redundant_threshold = param_dict["select"].get("redundant_threshold", None)
if len(select_params) > 0:
uv_obj.select(**select_params)
if no_autos:
uv_obj.select(ant_str="cross")
if redundant_threshold is not None:
uv_obj.compress_by_redundancy(tol=redundant_threshold, use_grid_alg=True)
def set_ordering(uv_obj, param_dict, reorder_blt_kw):
"""
Do conjugation/reordering on a UVData object.
Parameters
----------
uv_obj : UVData
UVData object to reorder.
param_dict : dict
Dict of parameters that can include an `order` section.
reorder_blt_kw : dict
Dict giving order & minor order for blt reordering.
"""
bl_conjugation_convention = None
# this is the order required by the simulator. Might as well default to it.
default_blt_order = ["time", "baseline"]
if "ordering" in param_dict:
ordering_dict = param_dict["ordering"]
if "blt_order" not in ordering_dict:
ordering_dict["blt_order"] = default_blt_order
else:
ordering_dict = {"blt_order": default_blt_order}
bl_conjugation_convention = ordering_dict.pop("conjugation_convention", None)
if reorder_blt_kw is None:
blt_order = ordering_dict["blt_order"]
if isinstance(blt_order, str):
reorder_blt_kw = {"order": blt_order}
if isinstance(blt_order, tuple | list | np.ndarray):
reorder_blt_kw = {"order": blt_order[0]}
if len(blt_order) > 1:
reorder_blt_kw["minor_order"] = blt_order[1]
if bl_conjugation_convention is None:
warnings.warn(
"The default baseline conjugation convention has changed. In the past "
"it was 'ant2<ant1', it now defaults to 'ant1<ant2'. You can specify "
"the baseline conjugation convention in `obs_param` by setting the "
"obs_param['ordering']['conjugation_convention'] field. This warning will go "
"away in version 1.5."
)
elif bl_conjugation_convention != "ant1<ant2":
uv_obj.conjugate_bls(convention=bl_conjugation_convention)
obj_blt_order = uv_obj.blt_order
if obj_blt_order is not None:
obj_blt_order = {"order": uv_obj.blt_order[0]}
if len(uv_obj.blt_order) > 1:
obj_blt_order["minor_order"] = uv_obj.blt_order[1]
if reorder_blt_kw and reorder_blt_kw != obj_blt_order:
uv_obj.reorder_blts(**reorder_blt_kw)
def _initialize_polarization_helper(param_dict, uvparam_dict, beam_list=None):
# There does not seem to be any way to get polarization_array into uvparam_dict, so
# let's add it explicitly.
if param_dict.get("polarization_array", None) is not None:
if beam_list is not None:
polstr_list = uvutils.polnum2str(param_dict["polarization_array"])
feed_set = set()
for polstr in polstr_list:
feed_set = feed_set.union(uvutils.pol.POL_TO_FEED_DICT[polstr])
for feed in feed_set:
beam_feeds = beam_list[0].feed_array
if feed not in beam_feeds:
raise ValueError(
f"Specified polarization array {param_dict['polarization_array']} "
f"requires feeds {feed_set} but the beams have feeds {beam_feeds}."
)
uvparam_dict["polarization_array"] = np.array(param_dict["polarization_array"])
# Parse polarizations
if uvparam_dict.get("polarization_array") is None:
if beam_list is not None:
# default polarization array based on the beam feeds
uvparam_dict["polarization_array"] = uvutils.pol.convert_feeds_to_pols(
feed_array=beam_list[0].feed_array, include_cross_pols=True
)
else:
uvparam_dict["polarization_array"] = np.array([-5, -6, -7, -8])
if "Npols" not in uvparam_dict:
uvparam_dict["Npols"] = len(uvparam_dict["polarization_array"])
return uvparam_dict
[docs]def initialize_uvdata_from_params(
obs_params, return_beams=False, reorder_blt_kw=None, check_kw=None
):
"""
Construct a :class:`pyuvdata.UVData` object from parameters in a valid yaml file.
Sufficient information must be provided by the parameters to define time and frequency arrays
and verify the channel widths and time steps. This will error if insufficient or incompatible
parameters are defined.
The parameter dictionary may contain any valid :class:`pyuvdata.UVData` attributes as well.
If the polarization array is not specified, it defaults to (XX, XY, YX, YY).
Parameters
----------
obs_params : dict or str
Either an obs_param file name or a dictionary of parameters read in.
Additional :class:`pyuvdata.UVData` parameters may be passed in through here.
return_beams : bool
Option to return the beam_list and beam_dict. Default is False.
reorder_blt_kw : dict (optional)
Deprecated. Use obs_param['ordering']['blt_order'] instead.
Keyword arguments to send to the ``uvdata.reorder_blts`` method at the end of
the setup process. Typical parameters include "order" and "minor_order". Default
values are ``order='time'`` and ``minor_order='baseline'``.
check_kw : dict (optional)
A dictionary of keyword arguments to pass to the :func:`uvdata.UVData.check`
method after object creation. Caution: turning off critical checks can
result in a UVData object that cannot be written to a file.
Returns
-------
uv_obj : :class:`pyuvdata.UVData`
Initialized UVData object.
beam_list : :class:`pyuvsim.BeamList`
List of beam specifiers as strings, only returned if return_beams is True.
beam_dict : dict
Map of antenna numbers to index in beam_list, only returned if return_beams is
True.
"""
logger.info("Starting SimSetup")
if reorder_blt_kw is not None:
warnings.warn(
"The reorder_blt_kw parameter is deprecated in favor of setting "
"obs_param['ordering']['blt_order']. This will become an error in "
"version 1.5",
DeprecationWarning,
)
uvparam_dict = {} # Parameters that will go into UVData
if isinstance(obs_params, str):
param_dict = _config_str_to_dict(obs_params) # Container for received settings.
else:
param_dict = copy.deepcopy(obs_params)
logger.info("Finished reading obsparams")
# Parse frequency structure
freq_dict = param_dict["freq"]
parsed_freq_dict = parse_frequency_params(freq_dict)
uvparam_dict["freq_array"] = parsed_freq_dict["freq_array"]
uvparam_dict["channel_width"] = parsed_freq_dict["channel_width"]
freq_array = parsed_freq_dict["freq_array"]
logger.info("Finished reading frequency info")
# Parse telescope parameters
tele_dict = param_dict["telescope"]
beamselect = tele_dict.pop("select", {})
if len(beamselect) > 0:
warnings.warn(
"Beam selections should be specified in the telescope "
"configuration, not in the obsparam. This will become an error in "
"version 1.5",
DeprecationWarning,
)
freq_buffer = beamselect.pop("freq_buffer", None)
if freq_buffer:
freq_range = (freq_array.min() - freq_buffer, freq_array.max() + freq_buffer)
else:
freq_range = None
ptp_ret = parse_telescope_params(
tele_dict,
config_path=param_dict["config_path"],
freq_array=freq_array,
freq_range=freq_range,
return_beams=return_beams,
)
if return_beams:
tele_params, beam_list, beam_dict = ptp_ret
else:
tele_params = ptp_ret
logger.info("Finished Setup of BeamList")
# Use extra_keywords to pass along required paths for file history.
extra_keywords = {}
if "obs_param_file" in param_dict:
extra_keywords["obsparam"] = param_dict["obs_param_file"]
if "telescope_config_name" in tele_dict and isinstance(
tele_dict["telescope_config_name"], str
):
extra_keywords["telecfg"] = tele_dict["telescope_config_name"]
if "array_layout" in tele_dict and isinstance(tele_dict["array_layout"], str):
extra_keywords["layout"] = tele_dict["array_layout"]
if "world" in tele_params:
extra_keywords["world"] = tele_params.get("world")
uvparam_dict["extra_keywords"] = extra_keywords
# Parse time structure
time_dict = param_dict["time"]
parsed_time_dict = parse_time_params(time_dict)
uvparam_dict["times"] = parsed_time_dict["time_array"]
uvparam_dict["integration_time"] = parsed_time_dict["integration_time"]
logger.info("Finished Setup of Time Dict")
# figure out polarization
if return_beams:
bl_pass = beam_list
else:
bl_pass = None
uvparam_dict = _initialize_polarization_helper(
param_dict, uvparam_dict, beam_list=bl_pass
)
# telescope frame is set from world in parse_telescope_params.
# Can only be itrs or (if lunarsky is installed) mcmf
if tele_params["telescope_frame"] == "itrs":
telescope_location = EarthLocation.from_geocentric(
*tele_params["telescope_location"], unit="m"
)
elif tele_params["telescope_frame"] == "mcmf":
# to get here, lunarsky has to be installed, so don't need to test for it
# It's checked in the earlier call to parse_telescope_params, which sets
# tele_params["telescope_frame"] to either "itrs" or "mcmf"
from lunarsky import MoonLocation
telescope_location = MoonLocation.from_selenocentric(
*tele_params["telescope_location"], unit="m"
)
telescope_location.ellipsoid = tele_params["ellipsoid"]
if "cat_name" not in param_dict and "object_name" not in param_dict:
cat_name = "unprojected"
elif "object_name" in param_dict:
cat_name = param_dict["object_name"]
else:
cat_name = param_dict["cat_name"]
phase_center_catalog = {0: {"cat_name": cat_name, "cat_type": "unprojected"}}
# Set the antpairs _before_ creating the uvdata object to conserve memory.
antpairs = param_dict.get("select", {}).pop("bls", None)
if isinstance(antpairs, str):
antpairs = ast.literal_eval(antpairs)
tel_init_params = {"location": telescope_location}
telescope_param_map = {
"telescope_name": "name",
"antenna_names": "antenna_names",
"antenna_numbers": "antenna_numbers",
"antenna_positions": "antenna_positions",
"antenna_diameters": "antenna_diameters",
"x_orientation": "x_orientation",
"feed_angle": "feed_angle",
"feed_array": "feed_array",
"mount_type": "mount_type",
"instrument": "instrument",
}
for key, tel_key in telescope_param_map.items():
if key in param_dict["telescope"]:
tel_init_params[tel_key] = param_dict["telescope"][key]
if key in tele_params:
tel_init_params[tel_key] = tele_params[key]
if "instrument" not in tel_init_params:
tel_init_params["instrument"] = tel_init_params["name"]
uv_obj = UVData.new(
telescope=Telescope.new(**tel_init_params),
phase_center_catalog=phase_center_catalog,
vis_units="Jy",
history="",
do_blt_outer=True,
time_axis_faster_than_bls=False,
antpairs=antpairs,
check_kw=check_kw,
**uvparam_dict,
)
logger.info(f" Baseline Array: {uv_obj.baseline_array.nbytes / 1024**3:.2f} GB")
logger.info(f" Time Array: {uv_obj.time_array.nbytes / 1024**3:.2f} GB")
logger.info(f"Integration Time: {uv_obj.integration_time.nbytes / 1024**3:.2f} GB")
logger.info(f" Ant1Array: {uv_obj.ant_1_array.nbytes / 1024**3:.2f} GB")
logger.info(f"BLT-ORDER: {uv_obj.blt_order}")
logger.info("Initialized UVData object")
subselect(uv_obj, param_dict)
logger.info("After Select")
set_ordering(uv_obj, param_dict, reorder_blt_kw)
logger.info(f"BLT-ORDER: {uv_obj.blt_order}")
logger.info("After Re-order BLTS")
logger.info("After Check")
logger.info(f"BLT-ORDER: {uv_obj.blt_order}")
if return_beams:
return uv_obj, beam_list, beam_dict
else:
return uv_obj
def _complete_uvdata(uv_in, inplace=False):
"""
Initialize data-like arrays on a a :class:`pyuvdata.UVData` object.
This will overwrite any existing data in `uv_in` unless inplace=True.
Parameters
----------
uv_in : :class:`pyuvdata.UVData` instance
Usually an incomplete object, containing only metadata.
inplace : bool, optional
Whether to perform the filling on the passed object, or a copy.
Returns
-------
:class:`pyuvdata.UVData` : object with initialized data-like arrays
(if `inplace` is `True`, it is on a copy of the input). With zeroed
data_array, no flags and nsample_array of all ones.
"""
if not inplace:
uv_obj = copy.deepcopy(uv_in)
else:
uv_obj = uv_in
# moved to have meta-data compatible objects from the init
# but retain this functionality for now.
# in most cases should be a no-op anyway.
if uv_obj.lst_array is None:
uv_obj.set_lsts_from_time_array()
# Clear existing data, if any.
_shape = (uv_obj.Nblts, uv_obj.Nfreqs, uv_obj.Npols)
uv_obj.data_array = np.zeros(_shape, dtype=complex)
uv_obj.flag_array = np.zeros(_shape, dtype=bool)
uv_obj.nsample_array = np.ones(_shape, dtype=float)
return uv_obj
[docs]def initialize_uvdata_from_keywords(
output_yaml_filename=None,
antenna_layout_filepath=None,
output_layout_filename=None,
array_layout=None,
telescope_location=None,
telescope_name=None,
feed_array=None,
feed_angle=None,
mount_type=None,
Nfreqs=None,
start_freq=None,
bandwidth=None,
freq_array=None,
channel_width=None,
Ntimes=None,
integration_time=None,
start_time=None,
time_array=None,
bls=None,
antenna_nums=None,
antenna_names=None,
polarization_array=None,
no_autos=False,
redundant_threshold=None,
conjugation_convention=None,
blt_order=None,
write_files=True,
path_out=None,
complete=False,
check_kw: dict | None = None,
**kwargs,
):
"""
Initialize a :class:`pyuvdata.UVData` object from keyword arguments.
Optionally, write out the configuration to YAML and CSV files such that
:func:`~initialize_uvdata_from_params` will produce the same :class:`pyuvdata.UVData` object.
Parameters
----------
output_yaml_filename : str (optional)
Specify filename for yaml file to write out.
Defaults to obsparam.yaml
antenna_layout_filepath : str (optional)
Path to csv file of antenna positions (see documentation for details).
output_layout_filename : str (optional)
File name for antenna layout if writing files out.
If unspecified, defaults to the basename of the antenna_layout_filepath.
If that is not given, it defaults to "antenna_layout.csv"
Will not overwrite existing files.
array_layout : dictionary (required if antenna_layout_filepath not given)
keys are integer antenna numbers, values are len-3 antenna positions
in ENU coordinates [meters].
antenna_names : list of str (optional)
If unset, antenna names are assigned as "%s" % antnum.
telescope_location : array of float, shape (3,)
Telescope location on Earth in LatLonAlt coordinates [deg, deg, meters]
telescope_name : str
Name of telescope
feed_array : array-like of str or None
List of feeds for each antenna in the telescope, must be one of
"x", "y", "l", "r". Shape (Nants, Nfeeds), dtype str. Can also be shape
(Nfeeds,), in which case the same values for feed_array are used for
all antennas in the object.
feed_angle : array-like of float or None
Orientation of the feed with respect to zenith (or with respect to north if
pointed at zenith). Units is in rads, vertical polarization is nominally 0,
and horizontal polarization is nominally pi / 2. Shape (Nants, Nfeeds),
dtype float. Can also be shape (Nfeeds,), in which case the same values for
feed_angle are used for all antennas in the object.
mount_type : str or array-like of str
Antenna mount type, which describes the optics of the antenna in question.
Supported options include: "alt-az" (primary rotates in azimuth and
elevation), "equatorial" (primary rotates in hour angle and declination)
"orbiting" (antenna is in motion, and its orientation depends on orbital
parameters), "x-y" (primary rotates first in the plane connecting east,
west, and zenith, and then perpendicular to that plane),
"alt-az+nasmyth-r" ("alt-az" mount with a right-handed 90-degree tertiary
mirror), "alt-az+nasmyth-l" ("alt-az" mount with a left-handed 90-degree
tertiary mirror), "phased" (antenna is "electronically steered" by
summing the voltages of multiple elements, e.g. MWA), "fixed" (antenna
beam pattern is fixed in azimuth and elevation, e.g., HERA), and "other"
(also referred to in some formats as "bizarre"). See the "Conventions"
page of the documentation for further details. Shape (Nants,), dtype str.
Can also provide a single string, in which case the same mount_type is
used for all antennas in the object.
Nfreqs : int
Number of frequency channels
start_freq : float
Starting frequency [Hz]
bandwidth : float
Total frequency bandwidth of spectral window [Hz]
channel_width: float
Frequency channel spacing [Hz]
freq_array : ndarray
frequency array [Hz], if this is specified, it supersedes Nfreqs,
start_freq, bandwidth
Ntimes : int
Number of integration bins
integration_time : float
Width of time bins [seconds]
start_time : float
Time of the first integration bin [Julian Date]
time_array : ndarray
time array [Julian Date]. If this is specified it supersedes values of Ntimes,
start_time and integration_time
bls : list
List of antenna-pair tuples for baseline selection
redundant_threshold: float
Redundant baseline selection tolerance for selection [meters]
antenna_nums : list
List of antenna numbers to keep in array
polarization_array : list
List of polarization strings (or ints) to insert into object
no_autos : bool
If True, eliminate all auto correlations
conjugation_convention : str, optional
A convention for the directions of the baselines, options are:
'ant1<ant2', 'ant2<ant1', 'u<0', 'u>0', 'v<0', 'v>0'
blt_order : str or array-like of str, optional
Specifies the ordering along the blt axis. Defaults to ['time', 'baseline'].
If set to anything else, it is passed to :meth:`pyuvdata.UVData.reorder_blts`.
A single string specifies the order, a list of two strings specifies the
[order, minor_order]. See the docs on the UVData method for more information.
write_files : bool
If True, write out the parameter information to yaml files.
path_out : str (optional)
Path in which to place generated configuration files, if write_files is True.
Defaults to current directory.
complete : bool (optional)
Whether to fill out the :class:`pyuvdata.UVData` object with its requisite
data arrays, and check if it's all consistent.
check_kw : dict (optional)
A dictionary of keyword arguments to pass to the :func:`uvdata.UVData.check`
method after object creation. Caution: turning off critical checks can
result in a UVData object that cannot be written to a file.
kwargs : dictionary
Any additional valid :class:`pyuvdata.UVData` attribute to assign to object.
Returns
-------
:class:`pyuvdata.UVData`
Initialized based on keywords, with a zeroed data_array, no flags and
nsample_array of all ones.
"""
arrfile = antenna_layout_filepath is not None
outfile = output_layout_filename is not None
if array_layout is None and antenna_layout_filepath is None:
raise ValueError(
"Either array_layout or antenna_layout_filepath must be passed."
)
if (
array_layout is None or isinstance(array_layout, str) or write_files
) and path_out is None:
path_out = "."
if write_files:
if not outfile:
if not arrfile:
output_layout_filename = "antenna_layout.csv"
else:
output_layout_filename = os.path.basename(antenna_layout_filepath)
outfile = True
# Increment name appropriately:
output_layout_filepath = os.path.join(path_out, output_layout_filename)
output_layout_filename = os.path.basename(
check_file_exists_and_increment(output_layout_filepath)
)
if output_yaml_filename is None:
output_yaml_filename = "obsparam.yaml"
output_yaml_filename = check_file_exists_and_increment(
os.path.join(path_out, output_yaml_filename)
)
# Copying original file to new place, if it exists
if antenna_layout_filepath is not None and os.path.exists(
antenna_layout_filepath
):
shutil.copyfile(
antenna_layout_filepath, os.path.join(path_out, output_layout_filename)
)
antenna_numbers = None
if isinstance(array_layout, dict):
antenna_numbers = np.fromiter(array_layout.keys(), dtype=int)
antpos_enu = array_layout.values()
if antenna_names is None:
antenna_names = antenna_numbers.astype("str")
if write_files:
_write_layout_csv(
output_layout_filepath, antpos_enu, antenna_names, antenna_numbers
)
if array_layout is None:
if write_files and output_layout_filename is not None:
array_layout = output_layout_filename
else:
array_layout = antenna_layout_filepath
freq_params = {
"Nfreqs": Nfreqs,
"start_freq": start_freq,
"bandwidth": bandwidth,
"freq_array": freq_array,
"channel_width": channel_width,
}
time_params = {
"Ntimes": Ntimes,
"start_time": start_time,
"integration_time": integration_time,
"time_array": time_array,
}
selection_params = {
"bls": bls,
"redundant_threshold": redundant_threshold,
"antenna_nums": antenna_nums,
"no_autos": no_autos,
}
tele_params = {
"telescope_location": repr(tuple(telescope_location)),
"telescope_name": telescope_name,
"feed_array": feed_array,
"feed_angle": feed_angle,
"mount_type": mount_type,
}
layout_params = {
"antenna_names": antenna_names,
"antenna_numbers": antenna_numbers,
"array_layout": array_layout,
}
ordering_params = {
"conjugation_convention": conjugation_convention,
"blt_order": blt_order,
}
freq_params = {k: v for k, v in freq_params.items() if v is not None}
time_params = {k: v for k, v in time_params.items() if v is not None}
selection_params = {k: v for k, v in selection_params.items() if v is not None}
tele_params = {k: v for k, v in tele_params.items() if v is not None}
layout_params = {k: v for k, v in layout_params.items() if v is not None}
ordering_params = {k: v for k, v in ordering_params.items() if v is not None}
valid_param_names = [getattr(UVData(), param).name for param in UVData()]
extra_kwds = {k: v for k, v in kwargs.items() if k in valid_param_names}
# Convert str polarization array to int.
if polarization_array is not None and type(polarization_array[0]) is not int:
polarization_array = np.array(uvutils.polstr2num(polarization_array))
if output_yaml_filename is None:
output_yaml_filename = ""
param_dict = {
"time": time_params,
"freq": freq_params,
"select": selection_params,
"ordering": ordering_params,
"telescope": tele_params,
"config_path": path_out,
"polarization_array": polarization_array,
}
param_dict.update(**extra_kwds)
if write_files:
tele_params["array_layout"] = antenna_layout_filepath
param_dict["telescope"] = tele_params
writeable_param_dict = copy.deepcopy(param_dict)
if polarization_array is not None:
writeable_param_dict["polarization_array"] = polarization_array.tolist()
with open(output_yaml_filename, "w") as yfile:
yaml.dump(writeable_param_dict, yfile, default_flow_style=False)
param_dict["obs_param_file"] = os.path.basename(output_yaml_filename)
param_dict["telescope"].update(layout_params)
uv_obj = initialize_uvdata_from_params(
param_dict, return_beams=False, check_kw=check_kw
)
if complete:
_complete_uvdata(uv_obj, inplace=True)
return uv_obj
[docs]def uvdata_to_telescope_config(
uvdata_in,
beam_filepath,
layout_csv_name=None,
telescope_config_name=None,
return_names=False,
path_out=".",
):
"""
Make telescope parameter files from a :class:`pyuvdata.UVData` object.
Makes both a telescope_config file and a layout_csv file:
* telescope_config: YAML file with telescope_location and telescope_name
The beam list is spoofed, since that information cannot be found in a UVData object.
* layout_csv: tab separated value file giving ENU antenna positions.
Beam ID is spoofed as well.
See https://pyuvsim.readthedocs.io/en/latest/parameter_files.html for details.
Parameters
----------
uvdata_in : pyuvdata.UVData
UVData object for which to make config files.
path_out : str
Target directory for the config files.
beam_filepath : str
Path to a beamfits file.
layout_csv_name : str
The name for the antenna positions.
Default <telescope_name>_layout.csv, where <telescope_name> is uvdata_in.telescope_name
telescope_config_name : str
The name for the telescope config file
Default telescope_config_<telescope_name>.yaml
return_names : bool
Return the file names. Default is False. Used in tests.
Returns
-------
tuple
if return_names, returns (path, telescope_config_name, layout_csv_name)
"""
tel_name = uvdata_in.telescope.name
antpos_enu = uvdata_in.telescope.get_enu_antpos()
ant_names = uvdata_in.telescope.antenna_names
ant_numbers = uvdata_in.telescope.antenna_numbers
tel_lla_deg = uvdata_in.telescope.location_lat_lon_alt_degrees
# fix formating issue for numpy types in numpy>2.0
tel_lla_deg = tuple(np.asarray(tel_lla_deg).tolist())
if telescope_config_name is None:
telescope_config_path = check_file_exists_and_increment(
os.path.join(path_out, f"telescope_config_{tel_name}.yaml")
)
telescope_config_name = os.path.basename(telescope_config_path)
if layout_csv_name is None:
layout_csv_path = check_file_exists_and_increment(
os.path.join(path_out, tel_name + "_layout.csv")
)
layout_csv_name = os.path.basename(layout_csv_path)
_write_layout_csv(
os.path.join(path_out, layout_csv_name), antpos_enu, ant_names, ant_numbers
)
# create a nearly empty beam object, only defining the filepath and
# mount_type for the yaml dumper
beam = UVBeam()
beam.filename = [beam_filepath]
beam.mount_type = uvdata_in.telescope.mount_type[0]
# Write the rest to a yaml file.
yaml_dict = {
"telescope_name": tel_name,
"telescope_location": repr(tel_lla_deg),
"beam_paths": {0: beam},
}
with open(os.path.join(path_out, telescope_config_name), "w+") as yfile:
yaml.safe_dump(yaml_dict, yfile, default_flow_style=False)
logger.info(
f"Path: {path_out}, telescope_config: {telescope_config_name}, layout: {layout_csv_name}"
)
if return_names:
return path_out, telescope_config_name, layout_csv_name
[docs]def uvdata_to_config_file(
uvdata_in,
param_filename=None,
telescope_config_name="",
layout_csv_name="",
catalog="mock",
path_out=".",
):
"""
Extract simulation configuration settings from a UVData object.
When used with :func:`~uvdata_to_telescope_config`, this will produce all the necessary
configuration yaml and csv file to make an "empty" :class:`pyuvdata.UVData` object comparable to
the argument `uvdata_in`. The generated file will match `uvdata_in` in frequency, time,
antenna positions, and uvw coordinates. Note that it may be different in the
baseline conjugation convention and the ordering of the baseline-time axis,
but those can be modified using the :meth:`pyuvdata.UVData.conjugate_bls` and
:meth:`pyuvdata.UVData.reorder_blt` methods.
Parameters
----------
uvdata_in: :class:`pyuvdata.UVData`
UVData object for which to make config files.
param_filename: str
output param file name, defaults to obsparam_#.yaml.
telescope_config_name: str
Name of telescope configuration yaml file. Defaults to blank string.
layout_csv_name: str
Name of antenna layout csv file. Defaults to blank string.
catalog: str
Path to catalog file. Defaults to 'mock'.
path_out: str
Where to put config files.
"""
if param_filename is None:
param_filename = check_file_exists_and_increment(
os.path.join(path_out, "obsparam.yaml")
)
param_filename = os.path.basename(param_filename)
tdict = time_array_to_params(
time_array=uvdata_in.time_array, integration_time=uvdata_in.integration_time
)
fdict = freq_array_to_params(
freq_array=uvdata_in.freq_array, channel_width=uvdata_in.channel_width
)
param_dict = {
"time": tdict,
"freq": fdict,
"sources": {"catalog": catalog},
"telescope": {
"telescope_config_name": telescope_config_name,
"array_layout": layout_csv_name,
},
"filing": {"outdir": ".", "outfile_name": "", "outfile_prefix": ""},
# Add this in to prevent warnings and make it explicit, these are just
# the default values.
"ordering": {
"conjugation_convention": "ant1<ant2",
"blt_order": ["time", "baseline"],
},
}
if catalog == "mock":
param_dict["sources"]["mock_arrangement"] = "zenith"
with open(os.path.join(path_out, param_filename), "w") as yfile:
yaml.safe_dump(param_dict, yfile, default_flow_style=False)