# -*- mode: python; coding: utf-8 -*
# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 3-clause BSD License
"""
Functions for setting up simulations based on input config files.
This provides a configuration interface of human-readable yaml and csv files to specify
all the required simulation parameters. This configuration interface is used by other
simulators as well.
This module contains methods to create configuration files from :class:`pyuvdata.UVData`
objects and empty :class:`pyuvdata.UVData` objects from configuration files.
"""
import ast
import copy
import logging
import os
import shutil
import warnings
import astropy.units as units
import numpy as np
import pyradiosky
import yaml
from astropy.coordinates import (
ICRS,
AltAz,
Angle,
EarthLocation,
Latitude,
Longitude,
SkyCoord,
)
from astropy.time import Time
from packaging import version # packaging is installed with setuptools
from pyradiosky import SkyModel
from pyuvdata import UVData
from pyuvdata import utils as uvutils
from pyuvsim.data import DATA_PATH as SIM_DATA_PATH
from .analyticbeam import AnalyticBeam
from .telescope import BeamList
try:
import astropy_healpix
except ImportError:
astropy_healpix = None
try:
import analytic_diffuse
except ImportError:
analytic_diffuse = None
try:
from . import mpi
from .mpi import get_rank
except ImportError:
[docs] def get_rank():
"""Mock to prevent import errors."""
return 0
mpi = None
try:
from lunarsky import LunarTopo, MoonLocation
from lunarsky import SkyCoord as LunarSkyCoord
hasmoon = True
except ImportError:
hasmoon = False
from .utils import check_file_exists_and_increment
logger = logging.getLogger(__name__)
def _parse_layout_csv(layout_csv):
"""
Interpret the layout csv file.
Columns in the file provide, in order from left to right, the antenna name, antenna number,
a beam ID number, and the antenna positions relative to the telescope location in
east, north, up (ENU) in meters.
See https://pyuvsim.readthedocs.io/en/latest/parameter_files.html for more details.
Parameters
----------
layout_csv : str
Filename of a layout csv.
"""
with open(layout_csv, "r") as fhandle:
header = fhandle.readline()
header = [h.strip() for h in header.split()]
str_format_code = "U"
# get data types for each column
dtypes = {
"name": str_format_code + "10",
"number": "i4",
"beamid": "i4",
"e": "f8",
"n": "f8",
"u": "f8",
}
# check columns in file
lower_header = [col.lower() for col in header]
columns = ["name", "number", "beamid", "e", "n", "u"]
col_exist = [col for col in columns if col in lower_header]
dt = np.format_parser([dtypes[col] for col in col_exist], col_exist, header)
return np.genfromtxt(layout_csv, autostrip=True, skip_header=1, dtype=dt.dtype)
def _write_layout_csv(
filepath, antpos_enu, antenna_names, antenna_numbers, beam_ids=None
):
"""
Write a layout csv file.
Columns in the file provide, in order from left to right, the antenna name, antenna number,
a beam ID number, and the antenna positions relative to the telescope location in
east, north, up (ENU) in meters.
See https://pyuvsim.readthedocs.io/en/latest/parameter_files.html for more details.
Parameters
----------
filepath : str
Filename to save the file to.
antpos_enu : array_like of float
East, North, Up positions of antennas in meters relative to the telescope location.
antenna_names : array_like of str
Names of antennas.
antenna_numbers : array_like of int
Antenna numbers.
beam_ids : array_like of int
Beam ID numbers to associate each antenna with a beam from a BeamList object.
Defaults to all zeros (all antennas have the same beam) if nothing is passed in.
"""
col_width = max(len(name) for name in antenna_names)
header = ("{:" + str(col_width) + "} {:8} {:8} {:10} {:10} {:10}\n").format(
"Name", "Number", "BeamID", "E", "N", "U"
)
if beam_ids is None:
beam_ids = np.zeros(len(antenna_names), dtype=int)
with open(filepath, "w") as lfile:
lfile.write(header + "\n")
for i, (e, n, u) in enumerate(antpos_enu):
beam_id = beam_ids[i]
name = antenna_names[i]
num = antenna_numbers[i]
line = ("{:{}} {:8d} {:8d} {:10.4f} {:10.4f} {:10.4f}\n").format(
name, col_width, num, beam_id, e, n, u
)
lfile.write(line)
def _config_str_to_dict(config_str):
"""
Read a yaml file into a dictionary, also add the file path info to the dictionary.
Parameters
----------
config_str : str
Filename of a configuration yaml file to read.
"""
with open(config_str, "r") as pfile:
param_dict = yaml.safe_load(pfile)
config_str = os.path.abspath(config_str)
param_dict["config_path"] = os.path.dirname(config_str)
param_dict["obs_param_file"] = os.path.basename(config_str)
return param_dict
def _create_catalog_diffuse(
map_nside, diffuse_model, diffuse_params, time, array_location
):
"""
Make a :class:`pyradiosky.SkyModel` object from an analog diffuse map function.
Only used for internal testing, should not be called by users.
Parameters
----------
map_nside : int
HEALPix nside of desired map.
diffuse_model : str
Name of diffuse model.
diffuse_params : dict
Dictionary of parameters corresponding to the `diffuse_model`.
time : :class:`astropy.time.Time` object
Time to use in creating map (to assign RA/Decs based on Alt/Az).
array_location : :class:`astropy.coordinates.EarthLocation` or :class:`lunarsky.MoonLocation`
Location to use in creating map (to assign RA/Decs based on Alt/Az).
Returns
-------
catalog : class:`pyradiosky.SkyModel`
SkyModel object containing the diffuse map.
icrs_coord : :class:`astropy.coordinates.SkyCoord`
Astropy SkyCoord object containing the coordinates for the pixels in ICRS.
alts : array_like of float
Altitudes of the pixels in radians.
azs : array_like of float
Azimuths of the pixels in radians.
fluxes : array_like of float
Brightness temperature of the pixels in K.
"""
# Make the map, calculate AltAz positions of pixels, evaluate the model.
npix = 12 * map_nside**2
pixinds = np.arange(npix)
hpix = astropy_healpix.HEALPix(nside=map_nside, frame=ICRS())
icrs_coord = hpix.healpix_to_skycoord(pixinds)
if hasmoon and isinstance(array_location, MoonLocation):
localframe = LunarTopo(location=array_location, obstime=time)
icrs_coord = LunarSkyCoord(icrs_coord)
else:
localframe = AltAz(location=array_location, obstime=time)
source_coord = icrs_coord.transform_to(localframe)
alts = source_coord.alt.rad
azs = source_coord.az.rad
modfunc = analytic_diffuse.get_model(diffuse_model)
fluxes = modfunc(source_coord.az.rad, source_coord.zen.rad, **diffuse_params)
stokes = np.zeros((4, 1, npix))
stokes[0, :] = fluxes
stokes *= units.K
catalog = SkyModel(
stokes=stokes,
nside=map_nside,
hpx_inds=pixinds,
spectral_type="flat",
frame="icrs",
)
return catalog, icrs_coord, alts, azs, fluxes
def _create_catalog_discrete(Nsrcs, alts, azs, fluxes, time, array_location):
"""
Make a :class:`pyradiosky.SkyModel` object from a set of point sources.
Only used for internal testing, should not be called by users.
Parameters
----------
Nsrcs : int
Number of sources
alts : array_like of float
Altitudes or Latitudes (depending on array_location) of the sources in radians.
azs : array_like of float
Azimuths or Longitues (depending on array_location) of the sources in radians.
fluxes : array_like of float
Brightnesses of the sources in Jy.
time : :class:`astropy.time.Time` object
Time to use in defining the positions.
array_location : :class:`astropy.coordinates.EarthLocation` or :class:`lunarsky.MoonLocation`
Location to use in defining the positions.
Returns
-------
catalog : class:`pyradiosky.SkyModel`
SkyModel object containing the diffuse map.
icrs_coord : :class:`astropy.coordinates.SkyCoord`
Astropy SkyCoord object containing the coordinates for the sources in ICRS.
"""
if hasmoon and isinstance(array_location, MoonLocation):
localframe = LunarTopo
coord_class = LunarSkyCoord
else:
localframe = AltAz
coord_class = SkyCoord
source_coord = coord_class(
alt=Angle(alts, unit=units.deg),
az=Angle(azs, unit=units.deg),
obstime=time,
frame=localframe,
location=array_location,
)
icrs_coord = source_coord.transform_to("icrs")
names = np.array(["src" + str(si) for si in range(Nsrcs)])
stokes = np.zeros((4, 1, Nsrcs))
stokes[0, :] = fluxes
stokes *= units.Jy
catalog = SkyModel(
name=names, skycoord=icrs_coord, stokes=stokes, spectral_type="flat"
)
return catalog, icrs_coord
[docs]def create_mock_catalog(
time,
arrangement="zenith",
array_location=None,
Nsrcs=None,
alt=None,
save=False,
min_alt=None,
rseed=None,
return_data=False,
diffuse_model=None,
diffuse_params=None,
map_nside=None,
):
"""
Create a mock catalog.
SkyModels are defined in an AltAz frame at the given time, then returned in
ICRS ra/dec coordinates. They always have a 'flat' spectral type.
Parameters
----------
time: float or :class:`astropy.time.Time` object
Julian date, if a float it is interpreted as a JD.
arrangement: str
Point source pattern (default = 1 source at zenith).
Accepted arrangements:
* `triangle`: Three point sources forming a triangle around the zenith
* `cross`: An asymmetric cross
* `zenith`: Some number of sources placed at the zenith.
* `off-zenith`: A single source off zenith
* `long-line`: Horizon to horizon line of point sources
* `hera_text`: Spell out HERA around the zenith
* `random`: Randomly distributed point sources near zenith
* `diffuse`: Choice of solved models in the analytic_diffuse module.
Nsrcs: int
Number of sources to make (used for zenith, off-zenith, long-line, and random arrangements).
array_location: :class:`astropy.coordinates.EarthLocation`
Location of observer. Source positions are defined with respect to a particular zenith.
Can also be a :class:`lunarsky.MoonLocation` object to make catalogs for
observers on the Moon.
Default = HERA site
alt: float
For off-zenith and triangle arrangements, altitude to place sources.
In degrees.
min_alt: float
For random and long-line arrangements, minimum altitude at which to place sources.
In degrees.
save: bool
Save mock catalog as npz file.
rseed: int
If using the random configuration, pass in a RandomState seed.
return_data: bool
If True, return a :class:`~.SkyModelData` object instead of
:class:`pyradiosky.SkyModel`.
diffuse_model: str
If arrangement is 'diffuse', name of the diffuse model to generate.
See documentation in `analytic_diffuse.models` for details.
diffuse_params: dict
If arrangement is 'diffuse', extra parameters accepted by the chosen model.
See documentation in `analytic_diffuse.models` for details.
map_nside: int
If arrangement is 'diffuse', nside parameter for the healpix map to make.
Returns
-------
:class:`pyradiosky.SkyModel` or :class:`~.SkyModelData`
The catalog, as either a SkyModel or a SkyModelData (if `return_data` is True)
dict
A dictionary of keywords used to define the catalog.
Notes
-----
Generating diffuse models requires the `analytic_diffuse` and `astropy_healpix` modules.
"""
if isinstance(time, float):
time = Time(time, scale="utc", format="jd")
if array_location is None:
array_location = EarthLocation(
lat="-30d43m17.5s", lon="21d25m41.9s", height=1073.0
)
if arrangement not in [
"off-zenith",
"zenith",
"cross",
"triangle",
"long-line",
"hera_text",
"random",
"diffuse",
]:
raise KeyError("Invalid mock catalog arrangement: " + str(arrangement))
mock_keywords = {
"time": time.jd,
"arrangement": arrangement,
"array_location": repr(
(
array_location.lat.deg,
array_location.lon.deg,
array_location.height.value,
)
),
}
if hasmoon and isinstance(array_location, MoonLocation):
mock_keywords["world"] = "moon"
else:
mock_keywords["world"] = "earth"
if arrangement == "off-zenith":
if alt is None:
alt = 85.0 # Degrees
mock_keywords["alt"] = alt
Nsrcs = 1
alts = [alt]
azs = [90.0] # 0 = North pole, 90. = East pole
fluxes = [1.0]
if arrangement == "triangle":
Nsrcs = 3
if alt is None:
alt = 87.0 # Degrees
mock_keywords["alt"] = alt
alts = [alt, alt, alt]
azs = [0.0, 120.0, 240.0]
fluxes = [1.0, 1.0, 1.0]
if arrangement == "cross":
Nsrcs = 4
alts = [88.0, 90.0, 86.0, 82.0]
azs = [270.0, 0.0, 90.0, 135.0]
fluxes = [5.0, 4.0, 1.0, 2.0]
if arrangement == "zenith":
if Nsrcs is None:
Nsrcs = 1
mock_keywords["Nsrcs"] = Nsrcs
alts = np.ones(Nsrcs) * 90.0
azs = np.zeros(Nsrcs, dtype=float)
fluxes = np.ones(Nsrcs) * 1 / Nsrcs
# Divide total Stokes I intensity among all sources
# Test file has Stokes I = 1 Jy
if arrangement == "random":
if Nsrcs is None:
Nsrcs = 1
if min_alt is None:
min_alt = 30 # Degrees
mock_keywords["Nsrcs"] = Nsrcs
np.random.seed(seed=rseed)
mock_keywords["rseed"] = np.random.get_state()[1][0]
# Necessary to get uniform distribution per solid angle.
min_alt_rad = np.radians(min_alt)
rv = np.random.uniform(-1, np.cos(min_alt_rad + np.pi / 2), Nsrcs)
alts = np.degrees(np.arccos(rv) - np.pi / 2)
azs = np.degrees(np.random.uniform(0, 2 * np.pi, Nsrcs))
fluxes = np.ones(Nsrcs, dtype=float)
if arrangement == "long-line":
if Nsrcs is None:
Nsrcs = 10
if min_alt is None:
min_alt = 5
mock_keywords["Nsrcs"] = Nsrcs
mock_keywords["alt"] = alt
fluxes = np.ones(Nsrcs, dtype=float)
if Nsrcs % 2 == 0:
length = 180 - min_alt * 2
spacing = length / (Nsrcs - 1)
max_alt = 90.0 - spacing / 2
alts = np.linspace(min_alt, max_alt, Nsrcs // 2)
alts = np.append(alts, np.flip(alts, axis=0))
azs = np.append(
np.zeros(Nsrcs // 2, dtype=float) + 180.0,
np.zeros(Nsrcs // 2, dtype=float),
)
else:
alts = np.linspace(min_alt, 90, (Nsrcs + 1) // 2)
alts = np.append(alts, np.flip(alts[1:], axis=0))
azs = np.append(
np.zeros((Nsrcs + 1) // 2, dtype=float) + 180.0,
np.zeros((Nsrcs - 1) // 2, dtype=float),
)
if arrangement == "hera_text":
# fmt: off
azs = np.array([-254.055, -248.199, -236.310, -225.000, -206.565,
-153.435, -123.690, -111.801, -105.945, -261.870,
-258.690, -251.565, -135.000, -116.565, -101.310,
-98.130, 90.000, 90.000, 90.000, 90.000, 90.000,
-90.000, -90.000, -90.000, -90.000, -90.000,
-90.000, 81.870, 78.690, 71.565, -45.000, -71.565,
-78.690, -81.870, 74.055, 68.199, 56.310, 45.000,
26.565, -26.565, -45.000, -56.310, -71.565])
zas = np.array([7.280, 5.385, 3.606, 2.828, 2.236, 2.236, 3.606,
5.385, 7.280, 7.071, 5.099, 3.162, 1.414, 2.236,
5.099, 7.071, 7.000, 6.000, 5.000, 3.000, 2.000,
1.000, 2.000, 3.000, 5.000, 6.000, 7.000, 7.071,
5.099, 3.162, 1.414, 3.162, 5.099, 7.071, 7.280,
5.385, 3.606, 2.828, 2.236, 2.236, 2.828, 3.606, 6.325])
# fmt: on
alts = 90.0 - zas
Nsrcs = alts.size
fluxes = np.ones_like(alts)
if arrangement == "diffuse":
if analytic_diffuse is None or astropy_healpix is None:
raise ValueError(
"analytic_diffuse and astropy_healpix "
"modules are required to use diffuse mock catalog."
)
if diffuse_model is None:
raise ValueError(
"Diffuse arrangement selected, but diffuse model not chosen."
)
if diffuse_params is None:
diffuse_params = {}
if map_nside is None:
warnings.warn("No nside chosen. Defaulting to 32")
map_nside = 32
mock_keywords["diffuse_model"] = diffuse_model
mock_keywords["map_nside"] = map_nside
mock_keywords["diffuse_params"] = repr(diffuse_params)
catalog, icrs_coord, alts, azs, fluxes = _create_catalog_diffuse(
map_nside, diffuse_model, diffuse_params, time, array_location
)
else:
catalog, icrs_coord = _create_catalog_discrete(
Nsrcs, alts, azs, fluxes, time, array_location
)
if return_data:
catalog = SkyModelData(catalog)
if get_rank() == 0 and save:
np.savez(
"mock_catalog_" + arrangement,
ra=icrs_coord.ra.rad,
dec=icrs_coord.dec.rad,
alts=alts,
azs=azs,
fluxes=fluxes,
)
return catalog, mock_keywords
[docs]class SkyModelData:
"""
Carries immutable SkyModel data in simple ndarrays.
This is to facilitate sharing SkyModel objects in MPI, without
excessive copying and memory bloat.
When used with MPI, this can be initialized simultaneously on all
processes such that sky_in is provided only on the root process.
The data can then be shared across all processes by running the `share`
method.
Parameters
----------
sky_in: :class:`pyradiosky.SkyModel`
A valid SkyModel object.
filename : str or list of str, optional
The filename (or other string identifier) of the input catalog. This overrides
the filename set on the sky_in object (if it has one). If not set, this defaults
to the filename set on the sky_in object (if it has one)
"""
filename = None
pyradiosky_version_str = None
Ncomponents = None
component_type = None
spectral_type = None
ra = None
dec = None
name = None
Nfreqs = None
stokes_I = None # noqa should be all lower case
stokes_Q = None # noqa should be all lower case
stokes_U = None # noqa should be all lower case
stokes_V = None # noqa should be all lower case
freq_array = None
freq_edge_array = None
reference_frequency = None
spectral_index = None
polarized = None
nside = None
hpx_inds = None
flux_unit = None
put_in_shared = [
"stokes_I",
"stokes_Q",
"stokes_U",
"stokes_V",
"polarized",
"ra",
"dec",
"reference_frequency",
"spectral_index",
"hpx_inds",
]
def __init__(self, sky_in=None, filename=None):
# Collect relevant attributes.
if sky_in is not None:
if sky_in.name is not None:
self.name = np.asarray(sky_in.name)
else:
self.name = None
self.nside = sky_in.nside
self.hpx_inds = sky_in.hpx_inds
self.component_type = sky_in.component_type
self.spectral_type = sky_in.spectral_type
self.Ncomponents = sky_in.Ncomponents
if sky_in.frame != "icrs":
sky_in.transform_to(ICRS)
sky_ra, sky_dec = sky_in.get_lon_lat()
self.ra = sky_ra.deg
self.dec = sky_dec.deg
self.Nfreqs = sky_in.Nfreqs
stokes_in = sky_in.stokes
if isinstance(stokes_in, units.Quantity):
if stokes_in.unit.is_equivalent("Jy"):
stokes_in = stokes_in.to_value("Jy")
self.flux_unit = "Jy"
elif stokes_in.unit.is_equivalent("K"):
stokes_in = stokes_in.to_value("K")
self.flux_unit = "K"
self.stokes_I = stokes_in[0, ...]
if sky_in._n_polarized > 0:
self.polarized = sky_in._polarized
Q, U, V = stokes_in[1:, :, sky_in._polarized]
self.stokes_Q = Q
self.stokes_U = U
self.stokes_V = V
if sky_in._freq_array.required:
self.freq_array = sky_in.freq_array.to("Hz").value
if (
version.parse(pyradiosky.__version__) > version.parse("0.2")
and sky_in.spectral_type == "subband"
and sky_in.freq_edge_array is not None
):
self.freq_edge_array = sky_in.freq_edge_array.to("Hz").value
if sky_in._reference_frequency.required:
self.reference_frequency = sky_in.reference_frequency.to("Hz").value
if sky_in._spectral_index.required:
self.spectral_index = sky_in.spectral_index
self.pyradiosky_version_str = sky_in.pyradiosky_version_str
if filename is not None:
if isinstance(filename, str):
filename_use = [filename]
else:
filename_use = filename
self.filename = filename_use
elif sky_in is not None and sky_in.filename is not None:
self.filename = sky_in.filename
[docs] def subselect(self, inds):
"""
Subselect, returning a new SkyModelData object.
Parameters
----------
inds: range or index array
Indices to select along the component axis.
Returns
-------
SkyModelData
A new SkyModelData with Ncomp axes downselected.
Notes
-----
If inds is a range object, this method will avoid copying data in numpy arrays,
such that the returned SkyModelData object carries views into the current object's arrays.
"""
new_sky = SkyModelData()
new_sky.Ncomponents = len(inds)
new_sky.nside = self.nside
new_sky.component_type = self.component_type
if self.name is not None:
new_sky.name = self.name[inds]
if isinstance(inds, range):
new_sky.stokes_I = self.stokes_I[:, slice(inds.start, inds.stop, inds.step)]
else:
new_sky.stokes_I = self.stokes_I[:, inds]
new_sky.ra = self.ra[inds]
new_sky.dec = self.dec[inds]
new_sky.Nfreqs = self.Nfreqs
new_sky.spectral_type = self.spectral_type
new_sky.flux_unit = self.flux_unit
if self.reference_frequency is not None:
new_sky.reference_frequency = self.reference_frequency[inds]
if self.spectral_index is not None:
new_sky.spectral_index = self.spectral_index[inds]
if self.freq_array is not None:
new_sky.freq_array = self.freq_array
if self.freq_edge_array is not None:
new_sky.freq_edge_array = self.freq_edge_array
if self.hpx_inds is not None:
new_sky.hpx_inds = self.hpx_inds[inds]
if self.polarized is not None:
sub_inds = np.in1d(self.polarized, inds)
new_sky.stokes_Q = self.stokes_Q[..., sub_inds]
new_sky.stokes_U = self.stokes_U[..., sub_inds]
new_sky.stokes_V = self.stokes_V[..., sub_inds]
new_sky.polarized = np.where(np.in1d(inds, self.polarized))[0]
return new_sky
[docs] def share(self, root=0):
"""
Share across MPI processes (requires mpi4py to use).
All attributes are put in shared memory.
Parameters
----------
root : int
Root rank on COMM_WORLD, from which data will be broadcast.
"""
if mpi is None:
raise ImportError(
"You need mpi4py to use this method. "
"Install it by running pip install pyuvsim[sim] "
"or pip install pyuvsim[all] if you also want the "
"line_profiler installed."
)
mpi.start_mpi()
self.Ncomponents = mpi.world_comm.bcast(self.Ncomponents, root=root)
# Get list of attributes that are set.
isset = None
if mpi.rank == root:
isset = [key for key, value in self.__dict__.items() if value is not None]
isset = mpi.world_comm.bcast(isset, root=root)
for key in isset:
attr = getattr(self, key)
if key in self.put_in_shared:
val = mpi.shared_mem_bcast(attr, root=root)
else:
val = mpi.world_comm.bcast(attr, root=root)
if val is not None:
setattr(self, key, val)
mpi.world_comm.Barrier()
[docs] def get_skymodel(self, inds=None):
"""
Initialize :class:`pyradiosky.SkyModel` from current settings.
Parameters
----------
inds : range or index array
Indices to select along the component axis.
"""
if inds is not None:
obj = self.subselect(inds)
return obj.get_skymodel()
stokes_use = np.zeros((4, self.Nfreqs, self.Ncomponents), dtype=float)
stokes_use[0, ...] = self.stokes_I
if self.polarized is not None:
stokes_use[1, :, self.polarized] = self.stokes_Q.T
stokes_use[2, :, self.polarized] = self.stokes_U.T
stokes_use[3, :, self.polarized] = self.stokes_V.T
if self.flux_unit is not None:
stokes_use *= units.Unit(self.flux_unit)
other = {}
if self.spectral_type in ["full", "subband"]:
other["freq_array"] = self.freq_array * units.Hz
if self.freq_edge_array is not None:
other["freq_edge_array"] = self.freq_edge_array * units.Hz
if self.spectral_type == "spectral_index":
other["reference_frequency"] = self.reference_frequency * units.Hz
other["spectral_index"] = self.spectral_index
if self.component_type == "healpix":
other["nside"] = self.nside
other["hpx_inds"] = self.hpx_inds
other["frame"] = "icrs"
else:
other["name"] = self.name
other["skycoord"] = SkyCoord(
ra=Longitude(self.ra, unit="deg"),
dec=Latitude(self.dec, unit="deg"),
frame="icrs",
)
other["filename"] = self.filename
return SkyModel(stokes=stokes_use, spectral_type=self.spectral_type, **other)
def _sky_select_calc_rise_set(sky, source_params, telescope_lat_deg=None):
"""
Apply flux and non-rising source cuts, calculate rise and set lsts.
Parameters
----------
source_params : dict
Dict specifying flux cut and horizon buffer parameters.
telescope_lat_deg : float
Latitude of telescope in degrees, used for horizon calculations.
"""
if telescope_lat_deg is not None:
telescope_lat = Latitude(telescope_lat_deg * units.deg)
sky.cut_nonrising(telescope_lat)
calc_kwargs = {}
if "horizon_buffer" in source_params:
calc_kwargs["horizon_buffer"] = float(source_params["horizon_buffer"])
sky.calculate_rise_set_lsts(telescope_lat, **calc_kwargs)
min_brightness = None
if "min_flux" in source_params:
min_brightness = float(source_params["min_flux"]) * units.Jy
max_brightness = None
if "max_flux" in source_params:
max_brightness = float(source_params["max_flux"]) * units.Jy
sky.select(min_brightness=min_brightness, max_brightness=max_brightness)
return sky
[docs]def initialize_catalog_from_params(
obs_params, input_uv=None, filetype=None, return_catname=None
):
"""
Make catalog from parameter file specifications.
Default behavior is to do coarse horizon cuts.
Parameters
----------
obs_params: str or dict
Either an obsparam file name or a dictionary of parameters.
input_uv: :class:`pyuvdata.UVData`
Used to set location for mock catalogs and for horizon cuts.
filetype : str
One of ['skyh5', 'gleam', 'vot', 'text', 'hdf5'] or None.
If None, the code attempts to guess what the file type is.
return_catname: bool
Return the catalog name. Defaults to True currently but will default to False
in version 1.4.
Returns
-------
sky: :class:`pyradiosky.SkyModel`
Source catalog filled with data.
source_list_name: str
Catalog identifier for metadata. Only returned if return_catname is True.
"""
if return_catname is None:
warnings.warn(
"The return_catname parameter currently defaults to True, but "
"starting in version 1.4 it will default to False.",
DeprecationWarning,
)
return_catname = True
if input_uv is not None and not isinstance(input_uv, UVData):
raise TypeError("input_uv must be UVData object")
if isinstance(obs_params, str):
with open(obs_params, "r") as pfile:
param_dict = yaml.safe_load(pfile)
param_dict["config_path"] = os.path.dirname(obs_params)
else:
param_dict = obs_params
source_params = param_dict["sources"]
if "catalog" in source_params:
catalog = source_params["catalog"]
else:
raise KeyError("No catalog defined.")
if catalog == "mock":
mock_keywords = {"arrangement": source_params["mock_arrangement"]}
extra_mock_kwds = [
"time",
"Nsrcs",
"zen_ang",
"save",
"min_alt",
"array_location",
"diffuse_model",
"diffuse_params",
"map_nside",
]
for k in extra_mock_kwds:
if k in source_params.keys():
if k == "array_location":
# String -- lat, lon, alt in degrees
latlonalt = [float(s.strip()) for s in source_params[k].split(",")]
lat, lon, alt = latlonalt
mock_keywords[k] = EarthLocation.from_geodetic(lon, lat, alt)
else:
mock_keywords[k] = source_params[k]
# time, arrangement, array_location, save, Nsrcs, min_alt
if "array_location" not in mock_keywords:
if input_uv is not None:
mock_keywords["array_location"] = EarthLocation.from_geocentric(
*input_uv.telescope_location, unit="m"
)
else:
warnings.warn(
"No array_location specified. Defaulting to the HERA site."
)
if "time" not in mock_keywords:
if input_uv is not None:
mock_keywords["time"] = input_uv.time_array[0]
warnings.warn(
"No julian date given for mock catalog. Defaulting to first time step."
)
else:
raise ValueError(
"input_uv must be supplied if using mock catalog without specified julian date"
)
time = mock_keywords.pop("time")
sky, mock_keywords = create_mock_catalog(time, **mock_keywords)
mock_keyvals = [str(key) + str(val) for key, val in mock_keywords.items()]
source_list_name = "mock_" + "_".join(mock_keyvals)
elif isinstance(catalog, str):
if not os.path.isfile(catalog):
catalog = os.path.join(param_dict["config_path"], catalog)
allowed_read_params = [
"spectral_type",
"table_name",
"id_column",
"ra_column",
"dec_column",
"lon_column",
"lat_column",
"frame",
"flux_columns",
"reference_frequency",
"freq_array",
"spectral_index_column",
]
read_params = {}
if filetype is not None:
read_params["filetype"] = filetype
elif "filetype" in source_params:
read_params["filetype"] = source_params["filetype"]
for param, value in source_params.items():
if param in allowed_read_params:
read_params[param] = value
detect_gleam = filetype == "gleam" or (
os.path.splitext(catalog)[1] == ".vot" and "gleam" in catalog.casefold()
)
if detect_gleam and "spectral_type" not in source_params.keys():
warnings.warn(
"No spectral_type specified for GLEAM, using 'flat'. In version 1.4 "
"this default will change to 'subband' to match pyradiosky's default.",
DeprecationWarning,
)
read_params["spectral_type"] = "flat"
source_list_name = os.path.basename(catalog)
sky = SkyModel.from_file(catalog, **read_params)
if input_uv is not None:
telescope_lat_deg = input_uv.telescope_location_lat_lon_alt_degrees[0]
else:
telescope_lat_deg = None
# Do source selections, if any.
sky = _sky_select_calc_rise_set(
sky, source_params, telescope_lat_deg=telescope_lat_deg
)
# If the filename parameter is None (e.g. for mock skies)
# add the source_list_name to the object so it can be put in the UVData history later
if sky.filename is None:
sky.filename = source_list_name
if return_catname:
return sky, source_list_name
else:
return sky
def _check_uvbeam_file(beam_file):
if not os.path.exists(beam_file):
testdata_beam_file = os.path.join(SIM_DATA_PATH, beam_file)
if os.path.exists(testdata_beam_file):
beam_file = testdata_beam_file
else:
raise ValueError(f"Unrecognized beam file or model: {beam_file}")
return beam_file
def _construct_beam_list(beam_ids, telconfig, freq_range=None, force_check=False):
"""Construct BeamList."""
beam_list = []
uvb_read_kwargs = {}
# possible global shape options
if "diameter" in telconfig.keys() or "sigma" in telconfig.keys():
warnings.warn(
"Beam shape options diameter and sigma should be specified per beamID "
"in the 'beam_paths' section not as globals. For examples see the "
"parameter_files documentation. This will become an error in version 1.4",
DeprecationWarning,
)
for beamID in beam_ids:
beam_model = telconfig["beam_paths"][beamID]
if not isinstance(beam_model, (str, dict)):
raise ValueError(
"Beam model is not properly specified in telescope config file."
)
if isinstance(beam_model, str):
warnings.warn(
"Entries in 'beam_paths' can no longer be specified as simple strings, "
"they must be parsed as dicts. For examples see the parameter_files "
"documentation. This will become an error in version 1.4",
DeprecationWarning,
)
if (
isinstance(beam_model, str)
and beam_model not in AnalyticBeam.supported_types
):
# this is the path to a UVBeam readable file
beam_file = _check_uvbeam_file(beam_model)
beam_list.append(beam_file)
elif isinstance(beam_model, dict) and "filename" in beam_model:
# this a UVBeam readable file with (possibly) some read kwargs
beam_file = beam_model["filename"]
beam_file = beam_model.pop("filename")
read_kwargs = beam_model
read_kwargs = {}
for key, value in beam_model.items():
if key != "filename":
read_kwargs[key] = value
if len(read_kwargs) > 0:
uvb_read_kwargs[beamID] = read_kwargs
beam_file = _check_uvbeam_file(beam_file)
beam_list.append(beam_file)
else:
if isinstance(beam_model, str):
beam_type = beam_model
elif "type" in beam_model:
beam_type = beam_model["type"]
else:
raise ValueError(
"Beam model must have either a 'filename' field for UVBeam "
"files or a 'type' field for analytic beams."
)
if beam_type not in AnalyticBeam.supported_types:
raise ValueError(f"Undefined beam model type: {beam_type}")
this_beam_opts = {}
if isinstance(beam_model, dict):
for key in beam_model:
if key != "type":
this_beam_opts[key] = beam_model[key]
# Gaussian beam requires either diameter or sigma
# Airy beam requires diameter
# Default to None for diameter and sigma.
# Values in the "beam_paths" override globally-defined options.
shape_opts = {"diameter": None, "sigma": None}
for opt in shape_opts.keys():
shape_opts[opt] = this_beam_opts.get(opt, None)
if all(v is None for v in shape_opts.values()):
for opt in shape_opts.keys():
shape_opts[opt] = telconfig.get(opt, None)
diameter = shape_opts.pop("diameter")
sigma = shape_opts.pop("sigma")
if beam_type == "uniform":
beam_model = "analytic_uniform"
if beam_type == "gaussian":
if diameter is not None:
beam_model = "_".join(
["analytic_gaussian", "diam=" + str(diameter)]
)
elif sigma is not None:
beam_model = "_".join(["analytic_gaussian", "sig=" + str(sigma)])
else:
raise KeyError(
"Missing shape parameter for gaussian beam (diameter or sigma)."
)
if beam_type == "airy":
if diameter is not None:
beam_model = "_".join(["analytic_airy", "diam=" + str(diameter)])
else:
raise KeyError("Missing diameter for airy beam.")
beam_list.append(beam_model)
select = telconfig.pop("select", {})
if freq_range is not None:
select["freq_range"] = freq_range
bl_options = {}
if "spline_interp_opts" in telconfig.keys():
bl_options["spline_interp_opts"] = telconfig["spline_interp_opts"]
if "freq_interp_kind" in telconfig.keys():
bl_options["freq_interp_kind"] = telconfig["freq_interp_kind"]
beam_list_obj = BeamList(
beam_list=beam_list,
select_params=select,
uvb_read_kwargs=uvb_read_kwargs,
force_check=force_check,
**bl_options,
)
return beam_list_obj
[docs]def parse_telescope_params(
tele_params, config_path="", freq_range=None, force_beam_check=False
):
"""
Parse the "telescope" section of obsparam.
Parameters
----------
tele_params : dict
Telescope parameters
See pyuvsim documentation for allowable keys.
https://pyuvsim.readthedocs.io/en/latest/parameter_files.html#telescope-configuration
config_path : str
Path to directory holding configuration and layout files.
freq_range : float
If given, select frequencies on reading the beam.
force_beam_check : bool
The beam consistency check is only possible for object-beams (not string mode).
If `force_beam_check` is True, it will convert beams to object-mode to force
the check to run (then convert back to string mode).
Returns
-------
param_dict : dict
Parameters related to the telescope and antenna layout, to be included in the
:class:`pyuvdata.UVData` object.
* `Nants_data`: Number of antennas
* `Nants_telescope`: Number of antennas
* `antenna_names`: list of antenna names
* `antenna_numbers`: corresponding list of antenna numbers
* `antenna_positions`: Array of ECEF antenna positions
* `telescope_location`: ECEF array center location
* `telescope_location_lat_lon_alt`: Lat Lon Alt array center location
* `telescope_config_file`: Path to configuration yaml file
* `antenna_location_file`: Path to csv layout file
* `telescope_name`: observatory name
* `x_orientation`: physical orientation of the dipole labelled as "x"
* `world`: Either "earth" or "moon" (requires lunarsky).
beam_list : :class:`pyuvsim.BeamList`
This is a BeamList object in string mode.
The strings provide either paths to beamfits files or the specifications to make
a :class:`pyuvsim.AnalyticBeam`.
beam_dict : dict
Antenna numbers to beam indices
"""
tele_params = copy.deepcopy(tele_params)
# check for telescope config
tele_config = "telescope_config_name" in tele_params
if tele_config:
# parse telescope config
if not os.path.isdir(config_path):
config_path = os.path.dirname(config_path)
if not os.path.isdir(config_path):
raise ValueError(f"config_path {config_path} is not a directory")
telescope_config_name = tele_params["telescope_config_name"]
if not os.path.exists(telescope_config_name):
telescope_config_name = os.path.join(config_path, telescope_config_name)
if not os.path.exists(telescope_config_name):
raise ValueError("telescope_config_name file from yaml does not exist")
with open(telescope_config_name, "r") as yf:
telconfig = yaml.safe_load(yf)
telescope_location_latlonalt = ast.literal_eval(telconfig["telescope_location"])
world = telconfig.pop("world", None)
# get the lunar ellipsoid. Default to None for earth or SPHERE for moon
if world == "moon":
ellipsoid = telconfig.pop("ellipsoid", "SPHERE")
else:
ellipsoid = telconfig.pop("ellipsoid", None)
tele_params["telescope_name"] = telconfig["telescope_name"]
else:
# if not provided, get bare-minumum keys from tele_params
if "telescope_location" not in tele_params:
raise KeyError(
"If telescope_config_name not provided in `telescope` obsparam section, "
"you must provide telescope_location"
)
if "telescope_name" not in tele_params:
raise KeyError(
"If telescope_config_name not provided in `telescope` obsparam section, "
"you must provide telescope_name"
)
telescope_location_latlonalt = tele_params["telescope_location"]
if isinstance(telescope_location_latlonalt, str):
telescope_location_latlonalt = ast.literal_eval(
telescope_location_latlonalt
)
world = tele_params.pop("world", None)
# get the lunar ellipsoid. Default to None for earth or SPHERE for moon
if world == "moon":
ellipsoid = tele_params.pop("ellipsoid", "SPHERE")
else:
ellipsoid = tele_params.pop("ellipsoid", None)
lat_rad = telescope_location_latlonalt[0] * np.pi / 180.0
long_rad = telescope_location_latlonalt[1] * np.pi / 180.0
alt = telescope_location_latlonalt[2]
if world == "moon":
frame = "mcmf"
elif world == "earth" or world is None:
frame = "itrs"
else:
raise ValueError(f"Invalid world {world}")
tele_params["telescope_location"] = uvutils.XYZ_from_LatLonAlt(
latitude=lat_rad,
longitude=long_rad,
altitude=alt,
frame=frame,
ellipsoid=ellipsoid,
)
telescope_name = tele_params["telescope_name"]
# get array layout
if "array_layout" not in tele_params:
raise KeyError("array_layout must be provided.")
array_layout = tele_params.pop("array_layout")
if not isinstance(array_layout, (str, dict)):
raise ValueError(
"array_layout must be a string or have options that parse as a dict."
)
if isinstance(array_layout, str):
# Interpet as file path to layout csv file.
layout_csv = array_layout
# if array layout is a str, parse it as .csv filepath
if isinstance(layout_csv, str):
if not os.path.exists(layout_csv) and isinstance(config_path, str):
layout_csv = os.path.join(config_path, layout_csv)
if not os.path.exists(layout_csv):
raise ValueError(
f"layout_csv file {layout_csv} from yaml does not exist"
)
ant_layout = _parse_layout_csv(layout_csv)
E, N, U = ant_layout["e"], ant_layout["n"], ant_layout["u"]
antnames = ant_layout["name"]
antnums = np.array(ant_layout["number"])
elif isinstance(array_layout, dict):
# Receiving antenna positions directly
antnums = tele_params.pop("antenna_numbers")
antnames = tele_params.pop("antenna_names")
E, N, U = np.array([array_layout[an] for an in antnums]).T
layout_csv = "user-fed dict"
# fill in outputs with just array info
return_dict = {}
beam_list = BeamList([])
beam_dict = {}
return_dict["Nants_data"] = antnames.size
return_dict["Nants_telescope"] = antnames.size
return_dict["antenna_names"] = np.array(antnames.tolist())
return_dict["antenna_numbers"] = np.array(antnums)
antpos_enu = np.vstack((E, N, U)).T
return_dict["antenna_positions"] = (
uvutils.ECEF_from_ENU(
antpos_enu,
latitude=lat_rad,
longitude=long_rad,
altitude=alt,
frame=frame,
ellipsoid=ellipsoid,
)
- tele_params["telescope_location"]
)
if world is not None:
return_dict["world"] = world
return_dict["telescope_frame"] = frame
return_dict["ellipsoid"] = ellipsoid
return_dict["array_layout"] = layout_csv
return_dict["telescope_location"] = np.asarray(tele_params["telescope_location"])
return_dict["telescope_location_lat_lon_alt"] = np.asarray(
telescope_location_latlonalt
)
return_dict["telescope_name"] = telescope_name
if not tele_config:
# if no info on beams, just return what we have
beam_dict = {n: 0 for n in antnames}
return return_dict, beam_list, beam_dict
# if provided, parse sections related to beam files and types
return_dict["telescope_config_name"] = telescope_config_name
beam_ids = ant_layout["beamid"]
beam_dict = {}
for beamID in np.unique(beam_ids):
which_ants = antnames[np.where(beam_ids == beamID)]
for a in which_ants:
beam_dict[a] = beamID
beam_list = _construct_beam_list(
np.unique(beam_ids),
telconfig,
freq_range=freq_range,
force_check=force_beam_check,
)
return return_dict, beam_list, beam_dict
[docs]def parse_frequency_params(freq_params):
"""
Parse the "freq" section of obsparam.
Parameters
----------
freq_params : dict
Dictionary of frequency parameters.
See pyuvsim documentation for examples of allowable key combinations.
https://pyuvsim.readthedocs.io/en/latest/parameter_files.html#frequency
Returns
-------
dict
Dictionary of :class:`pyuvdata.UVData` parameters related to frequency:
* `channel_width`: (dtype float, ndarray, shape=(Nfreqs)) Frequency channel
widths in Hz
* `Nfreqs`: (int) Number of frequencies
* `freq_array`: (dtype float, ndarray, shape=(Nfreqs)) Frequency channel
centers in Hz
"""
freq_keywords = [
"freq_array",
"start_freq",
"end_freq",
"Nfreqs",
"channel_width",
"bandwidth",
]
fa, sf, ef, nf, cw, bw = [fk in freq_params for fk in freq_keywords]
kws_used = ", ".join(sorted(freq_params.keys()))
freq_params = copy.deepcopy(freq_params)
if fa:
freq_arr = np.asarray(freq_params["freq_array"])
freq_params["Nfreqs"] = freq_arr.size
if "channel_width" not in freq_params:
if freq_params["Nfreqs"] > 1:
freq_params["channel_width"] = np.diff(freq_arr)[0]
if not np.allclose(np.diff(freq_arr), freq_params["channel_width"]):
raise ValueError(
"Channel width must be specified if spacing in freq_array is uneven."
)
else:
raise ValueError(
"Channel width must be specified if freq_array has length 1"
)
else:
if not (sf or ef):
raise ValueError(
"Either start or end frequency must be specified: " + kws_used
)
if cw:
if np.asarray(freq_params["channel_width"]).size > 1:
raise ValueError(
"channel_width must be a scalar if freq_array is not specified"
)
if not nf:
if not cw:
raise ValueError(
"Either channel_width or Nfreqs "
" must be included in parameters:" + kws_used
)
if sf and ef:
freq_params["bandwidth"] = (
freq_params["end_freq"]
- freq_params["start_freq"]
+ freq_params["channel_width"]
)
bw = True
if bw:
Nfreqs = float(freq_params["bandwidth"] / freq_params["channel_width"])
freq_params["Nfreqs"] = Nfreqs
else:
raise ValueError(
"Either bandwidth or band edges must be specified: " + kws_used
)
if not cw:
if not bw:
raise ValueError(
"Either bandwidth or channel width must be specified: " + kws_used
)
freq_params["channel_width"] = freq_params["bandwidth"] / float(
freq_params["Nfreqs"]
)
if not bw:
freq_params["bandwidth"] = (
freq_params["channel_width"] * freq_params["Nfreqs"]
)
bw = True
if not sf:
if ef and bw:
freq_params["start_freq"] = (
freq_params["end_freq"]
- freq_params["bandwidth"]
+ freq_params["channel_width"]
)
if not ef:
if sf and bw:
freq_params["end_freq"] = (
freq_params["start_freq"]
+ freq_params["bandwidth"]
- freq_params["channel_width"]
)
if not np.isclose(freq_params["Nfreqs"] % 1, 0):
raise ValueError(
"end_freq - start_freq must be evenly divisible by channel_width"
)
freq_params["Nfreqs"] = int(freq_params["Nfreqs"])
freq_arr = np.linspace(
freq_params["start_freq"],
freq_params["end_freq"] + freq_params["channel_width"],
freq_params["Nfreqs"],
endpoint=False,
)
chan_width = np.atleast_1d(freq_params["channel_width"])
if chan_width.size < freq_params["Nfreqs"] and chan_width.size == 1:
chan_width = np.full((freq_params["Nfreqs"],), chan_width, dtype=float)
return_dict = {}
return_dict["Nfreqs"] = freq_params["Nfreqs"]
return_dict["freq_array"] = freq_arr
return_dict["channel_width"] = chan_width
return_dict["Nspws"] = 1
return return_dict
[docs]def parse_time_params(time_params):
"""
Parse the "time" section of obsparam.
Parameters
----------
time_params : dict
Dictionary of time parameters
See pyuvsim documentation for examples of allowable key combinations.
https://pyuvsim.readthedocs.io/en/latest/parameter_files.html#time
Returns
-------
dict
Dictionary of :class:`pyuvdata.UVData` parameters related to time:
* `integration_time`: (float) Time array spacing in seconds.
* `Ntimes`: (int) Number of times
* `start_time`: (float) Starting time in Julian Date
* `time_array`: (dtype float, ndarray, shape=(Ntimes,)) Time step centers in JD.
"""
return_dict = {}
init_time_params = copy.deepcopy(time_params)
time_params = copy.deepcopy(time_params)
time_keywords = [
"time_array",
"start_time",
"end_time",
"Ntimes",
"integration_time",
"duration_hours",
"duration_days",
]
ta, st, et, nt, it, dh, dd = [tk in time_params for tk in time_keywords]
kws_used = ", ".join(sorted(time_params.keys()))
daysperhour = 1 / 24.0
hourspersec = 1 / 60.0**2
dayspersec = daysperhour * hourspersec
if ta:
# Time array is defined. Supersedes all other parameters:
time_arr = time_params["time_array"]
time_params["Ntimes"] = len(time_arr)
time_params["start_time"] = np.min(time_arr)
time_params["integration_time"] = np.mean(np.diff(time_arr))
else:
if not (st or et):
raise ValueError("Start or end time must be specified: " + kws_used)
if dh and not dd:
time_params["duration"] = time_params["duration_hours"] * daysperhour
dd = True
elif dd:
time_params["duration"] = time_params["duration_days"]
if not nt:
if not it:
raise ValueError(
"Either integration_time or Ntimes must be "
"included in parameters: " + kws_used
)
if st and et:
time_params["duration"] = (
time_params["end_time"]
- time_params["start_time"]
+ time_params["integration_time"] * dayspersec
)
dd = True
if dd:
time_params["Ntimes"] = int(
np.round(
time_params["duration"]
/ (time_params["integration_time"] * dayspersec)
)
)
else:
raise ValueError(
"Either duration or time bounds must be specified: " + kws_used
)
if not it:
if not dd:
raise ValueError(
"Either duration or integration time "
"must be specified: " + kws_used
)
# In seconds
time_params["integration_time"] = (
time_params["duration"] / dayspersec / float(time_params["Ntimes"])
)
inttime_days = time_params["integration_time"] * dayspersec
if not dd:
time_params["duration"] = inttime_days * (time_params["Ntimes"])
dd = True
if not st:
if et and dd:
time_params["start_time"] = (
time_params["end_time"] - time_params["duration"] + inttime_days
)
if not et:
if st and dd:
time_params["end_time"] = (
time_params["start_time"] + time_params["duration"] - inttime_days
)
time_arr = np.linspace(
time_params["start_time"],
time_params["end_time"] + inttime_days,
time_params["Ntimes"],
endpoint=False,
)
if time_params["Ntimes"] != 1:
if not np.allclose(
np.diff(time_arr),
inttime_days * np.ones(time_params["Ntimes"] - 1),
atol=dayspersec,
): # To nearest second
raise ValueError(
"Calculated time array is not consistent with set integration_time."
f"\nInput parameters are: {init_time_params}"
)
return_dict["integration_time"] = time_params["integration_time"]
return_dict["time_array"] = time_arr
return_dict["Ntimes"] = time_params["Ntimes"]
return_dict["start_time"] = time_params["start_time"]
return return_dict
[docs]def freq_array_to_params(freq_array):
"""
Get a set of parameters that can be used to generate a given frequency array.
Parameters
----------
freq_array : array of float
Frequencies in Hz.
Returns
-------
dict
Dictionary of frequency parameters consistent with freq_array.
(channel_width, Nfreqs, bandwidth, start_freq, end_freq)
See pyuvsim documentation for details:
https://pyuvsim.readthedocs.io/en/latest/parameter_files.html#frequency
"""
freq_array = np.asarray(freq_array).ravel()
fdict = {}
if freq_array.size < 2:
fdict["channel_width"] = 1.0
fdict["Nfreqs"] = 1
fdict["start_freq"] = freq_array.item(0)
return fdict
fdict["channel_width"] = np.diff(freq_array).item(0)
fdict["Nfreqs"] = freq_array.size
fdict["bandwidth"] = fdict["channel_width"] * fdict["Nfreqs"]
fdict["start_freq"] = freq_array.item(0)
fdict["end_freq"] = freq_array.item(-1)
return fdict
[docs]def time_array_to_params(time_array):
"""
Get a set of parameters that can be used to generate a given time array.
Returns a dictionary of parameters that can be used by parse_time_params
to obtain the time_array passed in.
Parameters
----------
time_array : array of float
Julian dates.
Returns
-------
dict
Dictionary of time parameters consistent with time_array.
(integration_time, Ntimes, duration_days, start_time, end_times)
See pyuvsim documentation for details:
https://pyuvsim.readthedocs.io/en/latest/parameter_files.html#time
"""
time_array = np.asarray(time_array)
Ntimes_uniq = np.unique(time_array).size
tdict = {}
if Ntimes_uniq < 2:
tdict["integration_time"] = 1.0
tdict["Ntimes"] = 1
tdict["start_time"] = time_array.item(0)
return tdict
dt = np.min(np.diff(time_array)).item()
if not np.allclose(np.diff(time_array), np.ones(time_array.size - 1) * dt):
tdict["time_array"] = time_array.tolist()
tdict["integration_time"] = dt * (24.0 * 3600.0)
tdict["Ntimes"] = Ntimes_uniq
tdict["duration_days"] = (
tdict["integration_time"] * tdict["Ntimes"] / (24.0 * 3600.0)
)
tdict["start_time"] = time_array.item(0)
tdict["end_time"] = time_array.item(-1)
return tdict
[docs]def subselect(uv_obj, param_dict):
"""Make sub-selection on a UVData object."""
# select on object
valid_select_keys = [
"antenna_nums",
"antenna_names",
"ant_str",
"bls",
"frequencies",
"freq_chans",
"times",
"blt_inds",
]
# downselect baselines (or anything that can be passed to pyuvdata's select method)
# Note: polarization selection is allowed here, but will cause an error if the incorrect pols
# are passed to pyuvsim.
if "select" in param_dict:
select_params = param_dict["select"]
no_autos = bool(select_params.pop("no_autos", False))
select_params = {
k: v for k, v in select_params.items() if k in valid_select_keys
}
if "antenna_nums" in select_params:
select_params["antenna_nums"] = list(
map(int, select_params["antenna_nums"])
)
redundant_threshold = param_dict["select"].get("redundant_threshold", None)
if "bls" in select_params:
bls = select_params["bls"]
if isinstance(bls, str):
# If read from file, this should be a string.
bls = ast.literal_eval(bls)
select_params["bls"] = bls
if len(select_params) > 0:
uv_obj.select(**select_params)
if no_autos:
uv_obj.select(ant_str="cross")
if redundant_threshold is not None:
uv_obj.compress_by_redundancy(tol=redundant_threshold)
[docs]def initialize_uvdata_from_params(
obs_params,
return_beams=None,
reorder_blt_kw=None,
check_kw=None,
force_beam_check=False,
):
"""
Construct a :class:`pyuvdata.UVData` object from parameters in a valid yaml file.
Sufficient information must be provided by the parameters to define time and frequency arrays
and verify the channel widths and time steps. This will error if insufficient or incompatible
parameters are defined.
The parameter dictionary may contain any valid :class:`pyuvdata.UVData` attributes as well.
If the polarization array is not specified, it defaults to (XX, XY, YX, YY).
Parameters
----------
obs_params : dict or str
Either an obs_param file name or a dictionary of parameters read in.
Additional :class:`pyuvdata.UVData` parameters may be passed in through here.
return_beams : bool
Option to return the beam_list and beam_dict. Currently defaults to True, but
will default to False starting in version 1.4.
reorder_blt_kw : dict (optional)
Keyword arguments to send to the ``uvdata.reorder_blts`` method at the end of
the setup process. Typical parameters include "order" and "minor_order". Default
values are ``order='time'`` and ``minor_order='baseline'``.
check_kw : dict (optional)
Keyword arguments to send to the ``uvdata.check()`` method at the end of the
setup process. Typical parameters include `run_check_acceptability` and
`check_extra`, which are both True by default.
force_beam_check : bool
The beam consistency check is only possible for object-beams (not string mode).
If `force_beam_check` is True, it will convert beams to object-mode to force
the check to run (then convert back to string mode).
Returns
-------
uv_obj : :class:`pyuvdata.UVData`
Initialized UVData object.
beam_list : :class:`pyuvsim.BeamList`
List of beam specifiers as strings, only returned if return_beams is True.
beam_dict : dict
Map of antenna numbers to index in beam_list, only returned if return_beams is
True.
"""
logger.info("Starting SimSetup")
if return_beams is None:
warnings.warn(
"The return_beams parameter currently defaults to True, but starting in"
"version 1.4 it will default to False.",
DeprecationWarning,
)
return_beams = True
uvparam_dict = {} # Parameters that will go into UVData
if isinstance(obs_params, str):
param_dict = _config_str_to_dict(obs_params) # Container for received settings.
else:
param_dict = copy.deepcopy(obs_params)
logger.info("Finished reading obsparams")
# Parse frequency structure
freq_dict = param_dict["freq"]
uvparam_dict.update(parse_frequency_params(freq_dict))
freq_array = uvparam_dict["freq_array"]
logger.info("Finished reading frequency info")
# Parse telescope parameters
tele_dict = param_dict["telescope"]
uvparam_dict.update(tele_dict)
beamselect = tele_dict.pop("select", {})
freq_buffer = beamselect.pop("freq_buffer", None)
if freq_buffer:
freq_range = (freq_array.min() - freq_buffer, freq_array.max() + freq_buffer)
else:
freq_range = None
tele_params, beam_list, beam_dict = parse_telescope_params(
tele_dict,
config_path=param_dict["config_path"],
freq_range=freq_range,
force_beam_check=force_beam_check,
)
uvparam_dict.update(tele_params)
uvparam_dict["x_orientation"] = beam_list.x_orientation
logger.info("Finished Setup of BeamList")
# Use extra_keywords to pass along required paths for file history.
extra_keywords = {}
if "obs_param_file" in param_dict:
extra_keywords["obsparam"] = param_dict["obs_param_file"]
if "telescope_config_name" in tele_dict and isinstance(
tele_dict["telescope_config_name"], str
):
extra_keywords["telecfg"] = tele_dict["telescope_config_name"]
if "array_layout" in tele_dict and isinstance(tele_dict["array_layout"], str):
extra_keywords["layout"] = tele_dict["array_layout"]
if "world" in tele_params:
extra_keywords["world"] = tele_params.get("world")
uvparam_dict["extra_keywords"] = extra_keywords
# Parse time structure
time_dict = param_dict["time"]
uvparam_dict.update(parse_time_params(time_dict))
logger.info("Finished Setup of Time Dict")
# There does not seem to be any way to get polarization_array into uvparam_dict, so
# let's add it explicitly.
if param_dict.get("polarization_array", None) is not None:
uvparam_dict["polarization_array"] = np.array(param_dict["polarization_array"])
# Parse polarizations
if uvparam_dict.get("polarization_array", None) is None:
uvparam_dict["polarization_array"] = np.array([-5, -6, -7, -8])
if "Npols" not in uvparam_dict:
uvparam_dict["Npols"] = len(uvparam_dict["polarization_array"])
uvparam_name = "cat_name"
if "cat_name" not in param_dict and "object_name" not in param_dict:
tloc = EarthLocation.from_geocentric(
*uvparam_dict["telescope_location"], unit="m"
)
time = Time(uvparam_dict["time_array"][0], scale="utc", format="jd")
src, _ = create_mock_catalog(time, arrangement="zenith", array_location=tloc)
if "sources" in param_dict:
source_file_name = os.path.basename(param_dict["sources"]["catalog"])
uvparam_dict[uvparam_name] = "{}_ra{:.4f}_dec{:.4f}".format(
source_file_name, src.ra.deg[0], src.dec.deg[0]
)
else:
uvparam_dict[uvparam_name] = "unprojected"
elif "object_name" in param_dict:
uvparam_dict[uvparam_name] = param_dict["object_name"]
else:
uvparam_dict[uvparam_name] = param_dict["cat_name"]
uv_obj = UVData()
uv_obj._set_future_array_shapes()
# Setting the frame and acceptable range for the moon
uv_obj._telescope_location.frame = uvparam_dict["telescope_frame"]
uv_obj._telescope_location.ellipsoid = uvparam_dict["ellipsoid"]
# use the __iter__ function on UVData to get list of UVParameters on UVData
valid_param_names = [getattr(uv_obj, param).name for param in uv_obj]
for k in valid_param_names:
if k in param_dict:
setattr(uv_obj, k, param_dict[k])
if k in uvparam_dict:
setattr(uv_obj, k, uvparam_dict[k])
logger.info("Initialized UVData object")
bls = np.array(
[
uv_obj.antnums_to_baseline(
uv_obj.antenna_numbers[j], uv_obj.antenna_numbers[i]
)
for i in range(0, uv_obj.Nants_data)
for j in range(i, uv_obj.Nants_data)
]
)
uv_obj.baseline_array = np.tile(bls, uv_obj.Ntimes)
uv_obj.Nbls = bls.size
uv_obj.time_array = np.repeat(uv_obj.time_array, uv_obj.Nbls)
uv_obj.integration_time = np.repeat(
uv_obj.integration_time, uv_obj.Nbls * uv_obj.Ntimes
)
uv_obj.Nblts = uv_obj.Nbls * uv_obj.Ntimes
uv_obj.blt_order = ("time", "ant1")
uv_obj.ant_1_array, uv_obj.ant_2_array = uv_obj.baseline_to_antnums(
uv_obj.baseline_array
)
logger.info(f"BLT-ORDER: {uv_obj.blt_order}")
cat_id = uv_obj._add_phase_center(
cat_name=uvparam_dict["cat_name"], cat_type="unprojected"
)
uv_obj.phase_center_id_array = np.full(uv_obj.Nblts, cat_id, dtype=int)
uv_obj.set_lsts_from_time_array()
# set the apparent coordinates on the object
uv_obj._set_app_coords_helper()
# add other required metadata to allow select to work without errors
# these will all be overwritten in uvsim._complete_uvdata, so it's ok to hardcode them here
logger.info("Set Metadata")
logger.info(f" Baseline Array: {uv_obj.baseline_array.nbytes / 1024**3:.2f} GB")
logger.info(f" Time Array: {uv_obj.time_array.nbytes / 1024**3:.2f} GB")
logger.info(f"Integration Time: {uv_obj.integration_time.nbytes / 1024**3:.2f} GB")
logger.info(f" Ant1Array: {uv_obj.ant_1_array.nbytes / 1024**3:.2f} GB")
uv_obj.set_lsts_from_time_array()
uv_obj.set_uvws_from_antenna_positions()
logger.info(f"BLT-ORDER: {uv_obj.blt_order}")
logger.info("Set UVWs")
uv_obj.history = ""
uv_obj.vis_units = "Jy"
uv_obj.instrument = uv_obj.telescope_name
uv_obj.spw_array = np.array([0])
uv_obj.flex_spw_id_array = np.full(uv_obj.Nfreqs, uv_obj.spw_array[0], dtype=int)
if uv_obj.Ntimes == 1:
uv_obj.integration_time = np.ones_like(
uv_obj.time_array, dtype=np.float64
) # Second
else:
# Note: currently only support a constant spacing of times
uv_obj.integration_time = (
np.ones_like(uv_obj.time_array, dtype=np.float64)
* np.diff(np.unique(uv_obj.time_array))[0]
* (24.0 * 60**2) # Seconds
)
subselect(uv_obj, param_dict)
logger.info("After Select")
# we construct uvdata objects in (time, ant1) order
# but the simulator will force (time, baseline) later
# so order this now so we don't get any warnings.
if reorder_blt_kw is None:
reorder_blt_kw = {"order": "time", "minor_order": "baseline"}
if reorder_blt_kw and reorder_blt_kw != {
"order": uv_obj.blt_order[0],
"minor_order": uv_obj.blt_order[1],
}:
uv_obj.reorder_blts(**reorder_blt_kw)
logger.info(f"BLT-ORDER: {uv_obj.blt_order}")
logger.info("After Re-order BLTS")
if check_kw is None:
check_kw = {}
uv_obj.check(**check_kw)
logger.info("After Check")
logger.info(f"BLT-ORDER: {uv_obj.blt_order}")
if return_beams:
return uv_obj, beam_list, beam_dict
else:
return uv_obj
def _complete_uvdata(uv_in, inplace=False, check_kw=None):
"""
Fill out all required parameters of a :class:`pyuvdata.UVData` object.
Ensure that it passes the :func:`pyuvdata.UVData.check()`.
This will overwrite existing data in `uv_in`!
Parameters
----------
uv_in : :class:`pyuvdata.UVData` instance
Usually an incomplete object, containing only metadata.
inplace : bool, optional
Whether to perform the filling on the passed object, or a copy.
check_kw : dict, optional
A dict of kwargs to pass to :func:`pyuvdata.UVData.check()`. If not provided,
will be an empty dict (i.e. default checks are run). If set to False, no checks
will be run at all.
Returns
-------
:class:`pyuvdata.UVData` : filled/completed object (if `inplace` is `True`, it is
the modified input). With zeroed data_array, no flags and nsample_array of all ones.
"""
if not inplace:
uv_obj = copy.deepcopy(uv_in)
else:
uv_obj = uv_in
# moved to have meta-data compatible objects from the init
# but retain this functionality for now.
# in most cases should be a no-op anyway.
if uv_obj.lst_array is None:
uv_obj.set_lsts_from_time_array()
# Clear existing data, if any.
_shape = (uv_obj.Nblts, uv_obj.Nfreqs, uv_obj.Npols)
uv_obj.data_array = np.zeros(_shape, dtype=complex)
uv_obj.flag_array = np.zeros(_shape, dtype=bool)
uv_obj.nsample_array = np.ones(_shape, dtype=float)
uv_obj.extra_keywords = {}
if check_kw is not False:
if check_kw is None:
check_kw = {}
uv_obj.check(**check_kw)
return uv_obj
[docs]def initialize_uvdata_from_keywords(
output_yaml_filename=None,
antenna_layout_filepath=None,
output_layout_filename=None,
array_layout=None,
telescope_location=None,
telescope_name=None,
Nfreqs=None,
start_freq=None,
bandwidth=None,
freq_array=None,
channel_width=None,
Ntimes=None,
integration_time=None,
start_time=None,
time_array=None,
bls=None,
antenna_nums=None,
antenna_names=None,
polarization_array=None,
no_autos=False,
redundant_threshold=None,
write_files=True,
path_out=None,
complete=False,
force_beam_check=False,
**kwargs,
):
"""
Initialize a :class:`pyuvdata.UVData` object from keyword arguments.
Optionally, write out the configuration to YAML and CSV files such that
:func:`~initialize_uvdata_from_params` will produce the same :class:`pyuvdata.UVData` object.
Parameters
----------
output_yaml_filename : str (optional)
Specify filename for yaml file to write out.
Defaults to obsparam.yaml
antenna_layout_filepath : str (optional)
Path to csv file of antenna positions (see documentation for details).
output_layout_filename : str (optional)
File name for antenna layout if writing files out.
If unspecified, defaults to the basename of the antenna_layout_filepath.
If that is not given, it defaults to "antenna_layout.csv"
Will not overwrite existing files.
array_layout : dictionary (required if antenna_layout_filepath not given)
keys are integer antenna numbers, values are len-3 antenna positions
in ENU coordinates [meters].
antenna_names : list of str (optional)
If unset, antenna names are assigned as "%s" % antnum.
telescope_location : array of float, shape (3,)
Telescope location on Earth in LatLonAlt coordinates [deg, deg, meters]
telescope_name : str
Name of telescope
Nfreqs : int
Number of frequency channels
start_freq : float
Starting frequency [Hz]
bandwidth : float
Total frequency bandwidth of spectral window [Hz]
channel_width: float
Frequency channel spacing [Hz]
freq_array : ndarray
frequency array [Hz], if this is specified, it supersedes Nfreqs,
start_freq, bandwidth
Ntimes : int
Number of integration bins
integration_time : float
Width of time bins [seconds]
start_time : float
Time of the first integration bin [Julian Date]
time_array : ndarray
time array [Julian Date]. If this is specified it supersedes values of Ntimes,
start_time and integration_time
bls : list
List of antenna-pair tuples for baseline selection
redundant_threshold: float
Redundant baseline selection tolerance for selection [meters]
antenna_nums : list
List of antenna numbers to keep in array
polarization_array : list
List of polarization strings (or ints) to insert into object
no_autos : bool
If True, eliminate all auto correlations
write_files : bool
If True, write out the parameter information to yaml files.
path_out : str (optional)
Path in which to place generated configuration files, if write_files is True.
Defaults to current directory.
complete : bool (optional)
Whether to fill out the :class:`pyuvdata.UVData` object with its requisite
data arrays, and check if it's all consistent.
force_beam_check : bool
The beam consistency check is only possible for object-beams (not string mode).
If `force_beam_check` is True, it will convert beams to object-mode to force
the check to run (then convert back to string mode).
kwargs : dictionary
Any additional valid :class:`pyuvdata.UVData` attribute to assign to object.
Returns
-------
:class:`pyuvdata.UVData`
Initialized based on keywords, with a zeroed data_array, no flags and
nsample_array of all ones.
"""
arrfile = antenna_layout_filepath is not None
outfile = output_layout_filename is not None
if array_layout is None and antenna_layout_filepath is None:
raise ValueError(
"Either array_layout or antenna_layout_filepath must be passed."
)
if array_layout is None or isinstance(array_layout, str) or write_files:
if path_out is None:
path_out = "."
if write_files:
if not outfile:
if not arrfile:
output_layout_filename = "antenna_layout.csv"
else:
output_layout_filename = os.path.basename(antenna_layout_filepath)
outfile = True
# Increment name appropriately:
output_layout_filepath = os.path.join(path_out, output_layout_filename)
output_layout_filename = os.path.basename(
check_file_exists_and_increment(output_layout_filepath)
)
if output_yaml_filename is None:
output_yaml_filename = "obsparam.yaml"
output_yaml_filename = check_file_exists_and_increment(
os.path.join(path_out, output_yaml_filename)
)
if antenna_layout_filepath is not None:
# Copying original file to new place, if it exists
if os.path.exists(antenna_layout_filepath):
shutil.copyfile(
antenna_layout_filepath,
os.path.join(path_out, output_layout_filename),
)
antenna_numbers = None
if isinstance(array_layout, dict):
antenna_numbers = np.fromiter(array_layout.keys(), dtype=int)
antpos_enu = array_layout.values()
if antenna_names is None:
antenna_names = antenna_numbers.astype("str")
if write_files:
_write_layout_csv(
output_layout_filepath, antpos_enu, antenna_names, antenna_numbers
)
if array_layout is None:
if write_files and output_layout_filename is not None:
array_layout = output_layout_filename
else:
array_layout = antenna_layout_filepath
freq_params = {
"Nfreqs": Nfreqs,
"start_freq": start_freq,
"bandwidth": bandwidth,
"freq_array": freq_array,
"channel_width": channel_width,
}
time_params = {
"Ntimes": Ntimes,
"start_time": start_time,
"integration_time": integration_time,
"time_array": time_array,
}
selection_params = {
"bls": bls,
"redundant_threshold": redundant_threshold,
"antenna_nums": antenna_nums,
"no_autos": no_autos,
}
tele_params = {
"telescope_location": repr(tuple(telescope_location)),
"telescope_name": telescope_name,
}
layout_params = {
"antenna_names": antenna_names,
"antenna_numbers": antenna_numbers,
"array_layout": array_layout,
}
freq_params = {k: v for k, v in freq_params.items() if v is not None}
time_params = {k: v for k, v in time_params.items() if v is not None}
selection_params = {k: v for k, v in selection_params.items() if v is not None}
tele_params = {k: v for k, v in tele_params.items() if v is not None}
layout_params = {k: v for k, v in layout_params.items() if v is not None}
uv_obj = UVData()
valid_param_names = [getattr(uv_obj, param).name for param in uv_obj]
extra_kwds = {k: v for k, v in kwargs.items() if k in valid_param_names}
# Convert str polarization array to int.
if polarization_array is not None:
if type(polarization_array[0]) is not int:
polarization_array = np.array(uvutils.polstr2num(polarization_array))
if output_yaml_filename is None:
output_yaml_filename = ""
param_dict = {
"time": time_params,
"freq": freq_params,
"select": selection_params,
"telescope": tele_params,
"config_path": path_out,
"polarization_array": polarization_array,
}
param_dict.update(**extra_kwds)
if write_files:
tele_params["array_layout"] = antenna_layout_filepath
param_dict["telescope"] = tele_params
writeable_param_dict = copy.deepcopy(param_dict)
if polarization_array is not None:
writeable_param_dict["polarization_array"] = polarization_array.tolist()
with open(output_yaml_filename, "w") as yfile:
yaml.dump(writeable_param_dict, yfile, default_flow_style=False)
param_dict["obs_param_file"] = os.path.basename(output_yaml_filename)
param_dict["telescope"].update(layout_params)
uv_obj = initialize_uvdata_from_params(
param_dict, return_beams=False, force_beam_check=force_beam_check
)
if complete:
_complete_uvdata(uv_obj, inplace=True)
return uv_obj
[docs]def uvdata_to_telescope_config(
uvdata_in,
beam_filepath,
layout_csv_name=None,
telescope_config_name=None,
return_names=False,
path_out=".",
):
"""
Make telescope parameter files from a :class:`pyuvdata.UVData` object.
Makes both a telescope_config file and a layout_csv file:
* telescope_config: YAML file with telescope_location and telescope_name
The beam list is spoofed, since that information cannot be found in a UVData object.
* layout_csv: tab separated value file giving ENU antenna positions.
Beam ID is spoofed as well.
See https://pyuvsim.readthedocs.io/en/latest/parameter_files.html for details.
Parameters
----------
uvdata_in : pyuvdata.UVData
UVData object for which to make config files.
path_out : str
Target directory for the config files.
beam_filepath : str
Path to a beamfits file.
layout_csv_name : str
The name for the antenna positions.
Default <telescope_name>_layout.csv, where <telescope_name> is uvdata_in.telescope_name
telescope_config_name : str
The name for the telescope config file
Default telescope_config_<telescope_name>.yaml
return_names : bool
Return the file names. Default is False. Used in tests.
Returns
-------
tuple
if return_names, returns (path, telescope_config_name, layout_csv_name)
"""
if telescope_config_name is None:
telescope_config_path = check_file_exists_and_increment(
os.path.join(path_out, "telescope_config_{uvdata_in.telescope_name}.yaml")
)
telescope_config_name = os.path.basename(telescope_config_path)
if layout_csv_name is None:
layout_csv_path = check_file_exists_and_increment(
os.path.join(path_out, uvdata_in.telescope_name + "_layout.csv")
)
layout_csv_name = os.path.basename(layout_csv_path)
antpos_enu, _ = uvdata_in.get_ENU_antpos()
_write_layout_csv(
os.path.join(path_out, layout_csv_name),
antpos_enu,
uvdata_in.antenna_names,
uvdata_in.antenna_numbers,
)
# Write the rest to a yaml file.
yaml_dict = {
"telescope_name": uvdata_in.telescope_name,
"telescope_location": repr(
tuple(uvdata_in.telescope_location_lat_lon_alt_degrees)
),
"Nants": uvdata_in.Nants_telescope,
"beam_paths": {0: {"filename": beam_filepath}},
}
with open(os.path.join(path_out, telescope_config_name), "w+") as yfile:
yaml.dump(yaml_dict, yfile, default_flow_style=False)
logger.info(
f"Path: {path_out}, telescope_config: {telescope_config_name}, layout: {layout_csv_name}"
)
if return_names:
return path_out, telescope_config_name, layout_csv_name
[docs]def uvdata_to_config_file(
uvdata_in,
param_filename=None,
telescope_config_name="",
layout_csv_name="",
catalog="mock",
path_out=".",
):
"""
Extract simulation configuration settings from a UVData object.
When used with :func:`~uvdata_to_telescope_config`, this will produce all the necessary
configuration yaml and csv file to make an "empty" :class:`pyuvdata.UVData` object comparable to
the argument `uvdata_in`. The generated file will match `uvdata_in` in frequency, time,
antenna positions, and uvw coordinates.
Parameters
----------
uvdata_in: :class:`pyuvdata.UVData`
UVData object for which to make config files.
param_filename: str
output param file name, defaults to obsparam_#.yaml.
telescope_config_name: str
Name of telescope configuration yaml file. Defaults to blank string.
layout_csv_name: str
Name of antenna layout csv file. Defaults to blank string.
catalog: str
Path to catalog file. Defaults to 'mock'.
path_out: str
Where to put config files.
"""
if param_filename is None:
param_filename = check_file_exists_and_increment(
os.path.join(path_out, "obsparam.yaml")
)
param_filename = os.path.basename(param_filename)
freq_array = uvdata_in.freq_array
time_array = uvdata_in.time_array
integration_time_array = np.array(uvdata_in.integration_time)
if np.max(integration_time_array) != np.min(integration_time_array):
warnings.warn(
"The integration time is not constant. Using the shortest integration time"
)
tdict = time_array_to_params(time_array)
fdict = freq_array_to_params(freq_array)
tdict.pop("time_array", None)
param_dict = {
"time": tdict,
"freq": fdict,
"sources": {"catalog": catalog},
"telescope": {
"telescope_config_name": telescope_config_name,
"array_layout": layout_csv_name,
},
"filing": {"outdir": ".", "outfile_name": "", "outfile_prefix": ""},
}
if catalog == "mock":
param_dict["sources"]["mock_arrangement"] = "zenith"
with open(os.path.join(path_out, param_filename), "w") as yfile:
yaml.dump(param_dict, yfile, default_flow_style=False)