Source code for pyuvsim.uvsim

# -*- mode: python; coding: utf-8 -*
# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 3-clause BSD License

"""
The primary machinery of pyuvsim.

The :class:`~UVTask` and :class:`~UVEngine` classes and the functions that actually run
the simulation.
"""

import warnings

import astropy.units as units
import numpy as np
import yaml
from astropy.constants import c as speed_of_light
from astropy.coordinates import EarthLocation
from astropy.time import Time
from astropy.units import Quantity
from pyuvdata import UVData

try:
    from lunarsky import MoonLocation
    from lunarsky import Time as LTime

    hasmoon = True
except ImportError:
    hasmoon = False

try:
    from . import mpi
except ImportError:  # pragma: no cover
    mpi = None
from . import simsetup
from . import utils as simutils
from .antenna import Antenna
from .baseline import Baseline
from .simsetup import SkyModelData
from .telescope import Telescope

__all__ = ["UVTask", "UVEngine", "uvdata_to_task_iter", "run_uvsim", "run_uvdata_uvsim"]


[docs]class UVTask: """ An object to hold all the information to calculate a visibility for a set of sources. Parameters ---------- sources : :class:`pyuvsim.simsetup.SkyModelData` The sources to include in the visibility. time : :class:`astropy.time.Time` object or float Time at which to calculate the visibility, either an astropy Time object or a float in Julian Day (will be converted to a Time object using `format='jd'`). freq : :class:`astropy.units.Quantity` or float Frequency at which to calculate the visibility, either an astropy Quantity with units compatible with Hz or a float in Hz (will be converted to an astropy Quantity with units of Hz) baseline : :class:`pyuvsim.Baseline` The baseline to calculate the visibility for. telescope : :class:`pyuvsim.Telescope` The telescope object this visibility belongs to, which carries the telescope location and beam information. freq_i : int Frequency index for this visibility's source coherency. Attributes ---------- sources : :class:`pyuvsim.simsetup.SkyModelData` The sources to include in the visibility. time : :class:`astropy.time.Time` Time at which to calculate the visibility. freq : :class:`astropy.units.Quantity` Frequency at which to calculate the visibility. baseline : :class:`pyuvsim.Baseline` The baseline to calculate the visibility for. telescope : :class:`pyuvsim.Telescope` The telescope object this visibility belongs to. freq_i : int Frequency index for this visibility's source coherency. Always `0` for flat spectrum sources, otherwise should be the index of `freq` in the full frequency array for this simulation. uvdata_index : tuple of int Indexes for this visibility location in the uvdata data_array, length 3 giving (blt_index, 0, frequency_index), where the zero index is for the old spw axis. visibility_vector : ndarray of complex The calculated visibility, shape (4,) ordered as [xx, yy, xy, yx]. """ def __init__(self, sources, time, freq, baseline, telescope, freq_i=0): self.time = time self.freq = freq self.sources = sources # SkyModel object self.baseline = baseline self.telescope = telescope self.freq_i = freq_i self.visibility_vector = None self.uvdata_index = None # Where to add the visibility in the uvdata object. if isinstance(self.time, float): self.time = Time(self.time, format="jd") if isinstance(self.freq, float): self.freq = self.freq * units.Hz if sources.spectral_type == "flat": self.freq_i = 0 def __repr__(self) -> str: """Make a nice representation.""" return ( f"UVTask<time: {self.time}, freq: {self.freq}, source names: " f"{self.sources.name}, baseline: {self.baseline.antenna1.name}-" f"{self.baseline.antenna2.name}, freq_i: {self.freq_i}>" ) def __eq__(self, other): """ Test for equality between two UVTask objects. Parameters ---------- other : :class:`UVTask` The other """ return ( np.isclose(self.time.jd, other.time.jd, atol=1e-4) and np.isclose(self.freq.value, other.freq.value, atol=1e-4) and (self.sources == other.sources) and (self.baseline == other.baseline) and (self.visibility_vector == other.visibility_vector) and (self.uvdata_index == other.uvdata_index) and (self.telescope == other.telescope) ) def __gt__(self, other): """ Check if this UVTask has a larger `uvdata_index` than another one. Parameters ---------- other : :class:`UVTask` The other """ blti0, fi0 = self.uvdata_index blti1, fi1 = other.uvdata_index if self.baseline == other.baseline: if fi0 == fi1: return blti0 > blti1 return fi0 > fi1 return self.baseline > other.baseline def __ge__(self, other): """ Check if this UVTask has a larger or equal `uvdata_index` than another one. Parameters ---------- other : :class:`UVTask` The other """ blti0, fi0 = self.uvdata_index blti1, fi1 = other.uvdata_index if self.baseline == other.baseline: if fi0 == fi1: return blti0 >= blti1 return fi0 >= fi1 return self.baseline >= other.baseline def __lt__(self, other): """ Check if this UVTask has a smaller `uvdata_index` than another one. Parameters ---------- other : :class:`UVTask` The other """ return not self.__ge__(other) def __le__(self, other): """ Check if this UVTask has a smaller or equal `uvdata_index` than another one. Parameters ---------- other : :class:`UVTask` The other """ return not self.__gt__(other)
[docs]class UVEngine: """ The object where the visibility calculations actually take place. Parameters ---------- task : :class:`~UVTask` The task to do the calculations for. reuse_spline : bool Option to reuse the spline in the beam interpolation to save time. update_positions : bool Flag indicating that positions need to be updated (when time or source changes). update beams : bool Flag indicating that beams need to be updated (when time, sources, frequency, or beam pair changes). Attributes ---------- reuse_spline : bool Option to reuse the spline in the beam interpolation to save time. update_positions : bool Flag indicating that positions need to be updated (when time or source changes). update_local_coherency : bool Flag indicating that local coherencies need to be updated (when time or source changes). update beams : bool Flag indicating that beams need to be updated (when time, sources, frequency, or beam pair changes). sources : :class:`pyuvsim.simsetup.SkyModelData` The sources to include in the visibility. current_time : :class:`astropy.time.Time` Time for the current calculation. current_freq : :class:`astropy.units.Quantity` Frequency for the current calculation, units compatible with Hz. current_beam_pair : 2-tuple of ints Tuple containing the beam ids for the current calculation. beam1_jones : array_like of float Jones matricies for the first beam for each source location, shape (2,2, Ncomponents). The first axis is feed, the second axis is vector component on the sky in az/za. beam2_jones : array_like of float Jones matricies for the second beam for each source location, shape (2,2, Ncomponents). The first axis is feed, the second axis is vector component on the sky in az/za. local_coherency : array_like of float Source local coherencies in alt/az basis, shape (2, 2, Nfreqs, Ncomponents). apparent_coherency : array_like of float Source apparent coherencies (including the beam response), shape (2, 2, Nfreqs, Ncomponents). task : :class:`~UVTask` The task currently being calculated. """ def __init__( self, task=None, update_positions=True, update_beams=True, reuse_spline=True ): self.reuse_spline = reuse_spline # Reuse spline fits in beam interpolation self.update_positions = update_positions self.update_beams = update_beams self.sources = None self.current_time = None self.current_freq = None self.current_beam_pair = None self.beam1_jones = None self.beam2_jones = None self.local_coherency = None self.apparent_coherency = None if task is not None: self.set_task(task)
[docs] def set_task(self, task): """ Set the task for the Engine. Parameters ---------- task : :class:`UVTask` The task to do the calculations for. """ self.task = task # These flags control whether quantities can be re-used from the last task: # Update local coherency for all frequencies when time or sources changes. # Update source positions when time or sources changes. # Update saved beam jones matrices when time, sources, frequency, or beam pair changes. baseline = self.task.baseline beam_pair = (baseline.antenna1.beam_id, baseline.antenna2.beam_id) if (not self.current_time == task.time.jd) or ( self.sources is not task.sources ): self.update_positions = True self.update_local_coherency = True self.update_beams = True else: self.update_beams = False self.update_positions = False self.update_local_coherency = False if not self.current_freq == task.freq.to("Hz").value: self.update_beams = True if not self.current_beam_pair == beam_pair: self.current_beam_pair = beam_pair self.update_beams = True self.current_time = task.time.jd self.current_freq = task.freq.to("Hz").value self.sources = task.sources
[docs] def apply_beam(self, beam_interp_check=True): """ Set apparent coherency from jones matrices and source coherency. beam_interp_check : bool Option to enable checking that the source positions are within the area covered by the beam. If all the az/za beams cover the full sky horizon to horizon this checking is turned off by default in `run_uvdata_uvsim`, otherwise it is turned on. Setting this to False can speed up simulations but if sources are simulated outside the beam area the response will be incorrect. This keyword only applies to beams that are regularly gridded in azimuth and zenith angle. """ beam1_id, beam2_id = self.current_beam_pair sources = self.task.sources baseline = self.task.baseline if sources.alt_az is None: sources.update_positions(self.task.time, self.task.telescope.location) if self.update_local_coherency: self.local_coherency = sources.coherency_calc() self.beam1_jones = baseline.antenna1.get_beam_jones( self.task.telescope, sources.alt_az[..., sources.above_horizon], self.task.freq, reuse_spline=self.reuse_spline, beam_interp_check=beam_interp_check, ) if beam1_id == beam2_id: self.beam2_jones = np.copy(self.beam1_jones) else: self.beam2_jones = baseline.antenna2.get_beam_jones( self.task.telescope, sources.alt_az[..., sources.above_horizon], self.task.freq, reuse_spline=self.reuse_spline, beam_interp_check=beam_interp_check, ) # coherency is a 2x2 matrix # [ |Ex|^2, Ex* Ey, Ey* Ex |Ey|^2 ] # where x and y vectors along the local alt/az axes. # Apparent coherency gives the direction and polarization dependent baseline response to # a source. coherency = self.local_coherency[:, :, self.task.freq_i, :] self.beam2_jones = np.swapaxes( self.beam2_jones, 0, 1 ).conj() # Transpose at each component self.apparent_coherency = np.einsum( "abz,bcz,cdz->adz", self.beam1_jones, coherency, self.beam2_jones )
[docs] def make_visibility(self): """Calculate visibility contribution from a set of source components.""" assert isinstance(self.task.freq, Quantity) srcs = self.task.sources time = self.task.time location = self.task.telescope.location if self.update_positions: with warnings.catch_warnings(): warnings.filterwarnings("ignore", message="The get_frame_attr_names") srcs.update_positions(time, location) if self.update_beams: self.apply_beam() pos_lmn = srcs.pos_lmn[..., srcs.above_horizon] # need to convert uvws from meters to wavelengths uvw_wavelength = ( self.task.baseline.uvw / speed_of_light * self.task.freq.to("1/s") ) fringe = np.exp(2j * np.pi * np.dot(uvw_wavelength, pos_lmn)) vij = self.apparent_coherency * fringe # Sum over source component axis: vij = np.sum(vij, axis=2) # Reshape to be [xx, yy, xy, yx] vis_vector = np.asarray([vij[0, 0], vij[1, 1], vij[0, 1], vij[1, 0]]) return vis_vector
def _make_task_inds(Nblts, Nfreqs, Nsrcs, rank, Npus): """ Make iterators defining task and sources computed on rank. The splitting depends on the relative sizes of the axes and the number of processing units. There are three possible ways the splitting is done: - if Npus < Nbltf -- Split by Nbltf, split sources in the task loop for memory's sake. - if Nbltf < Npus and Nsrcs > Npus -- Split by Nsrcs only - if (Nsrcs, Nbltf) < Npus -- Split by Nbltf Parameters ---------- Nblts : int Number of blts included in the sim. Nfreqs : int Number of frequencies included in the sim. Nsrcs : int Number of sources included in the sim. rank : int mpi rank. Npus : int Number of processing units. """ Nbltf = Nblts * Nfreqs split_srcs = False if (Nbltf < Npus) and (Npus < Nsrcs): split_srcs = True if split_srcs: src_inds, Nsrcs_local = simutils.iter_array_split(rank, Nsrcs, Npus) task_inds = range(Nbltf) Ntasks_local = Nbltf else: task_inds, Ntasks_local = simutils.iter_array_split(rank, Nbltf, Npus) src_inds = range(Nsrcs) Nsrcs_local = Nsrcs return task_inds, src_inds, Ntasks_local, Nsrcs_local
[docs]def uvdata_to_task_iter( task_ids, input_uv, catalog, beam_list, beam_dict, Nsky_parts=1 ): """ Generate UVTask objects. Parameters ---------- task_ids : range Task indices in the full flattened meshgrid of parameters. input_uv : :class:`pyuvdata.UVData` UVData object to be filled with data. catalog : :class:`pyuvsim.simsetup.SkyModelData` Source components. beam_list : :class:`pyuvsim.BeamList` BeamList carrying beam model (in object mode). beam_dict : dict Map of antenna numbers to index in beam_list. Yields ------ Iterable of UVTask objects. """ # The task_ids refer to tasks on the flattened meshgrid. if not isinstance(input_uv, UVData): raise TypeError("input_uv must be UVData object.") # Skymodel will now be passed in as a catalog array. if not isinstance(catalog, SkyModelData): raise TypeError("catalog must be a SkyModelData object.") # use future array shapes if not input_uv.future_array_shapes: input_uv.use_future_array_shapes() # Splitting the catalog for memory's sake. Nsrcs_total = catalog.Ncomponents if Nsky_parts > 1: Nsky_parts = int(Nsky_parts) src_iter = [ simutils.iter_array_split(s, Nsrcs_total, Nsky_parts)[0] for s in range(Nsky_parts) ] else: src_iter = [range(Nsrcs_total)] # Build the antenna list. antenna_names = input_uv.antenna_names antennas = [] antpos_enu, antnums = input_uv.get_ENU_antpos() for num, antname in enumerate(antenna_names): if beam_dict is None: beam_id = 0 else: beam_id = beam_dict[antname] antennas.append(Antenna(antname, num, antpos_enu[num], beam_id)) baselines = {} Nfreqs = input_uv.Nfreqs Nblts = input_uv.Nblts tasks_shape = (Nblts, Nfreqs) blt_ax, freq_ax = range(2) # we need to get slice or indices to index into the flat iterators # it does not accept range arguments as indices. if isinstance(task_ids, range): task_slice = slice(task_ids.start, task_ids.stop, task_ids.step) else: # There is a test where the input ids is an np.arange object # direct indexing should be fine. task_slice = task_ids # broadcast_to returns a view into the array # flat returns an iterator object. # no overhead from these calls. _times = np.broadcast_to( input_uv.time_array.reshape(-1, 1), (input_uv.Nblts, input_uv.Nfreqs) ).flat _bls = np.broadcast_to( input_uv.baseline_array.reshape(-1, 1), (input_uv.Nblts, input_uv.Nfreqs) ).flat _freqs = np.broadcast_to( input_uv.freq_array.flatten().reshape(1, -1), (input_uv.Nblts, input_uv.Nfreqs) ).flat # indexing with the slice should return a view, # but the iterator has to be collected into an array. # This should incure 64 * Ntasks_local bits per array # some additional overhead for numpy arrays as well # usually seeing [0.00001, 0.00003] MiB / task memory required # based on the reference simulations. order = np.lexsort((_bls[task_slice], _freqs[task_slice], _times[task_slice])) tloc = [np.float64(x) for x in input_uv.telescope_location] world = input_uv.extra_keywords.get("world", "earth") if world.lower() == "earth": location = EarthLocation.from_geocentric(*tloc, unit="m") elif world.lower() == "moon": if not hasmoon: raise ValueError("Need lunarsky module to simulate an array on the Moon.") location = MoonLocation.from_selenocentric(*tloc, unit="m") location.ellipsoid = input_uv._telescope_location.ellipsoid else: raise ValueError( "If world keyword is set, it must be either 'moon' or 'earth'." ) telescope = Telescope(input_uv.telescope_name, location, beam_list) freq_array = input_uv.freq_array * units.Hz if hasmoon and isinstance(location, MoonLocation): tclass = LTime else: tclass = Time time_array = tclass( input_uv.time_array, scale="utc", format="jd", location=location ) for src_i in src_iter: sky = catalog.get_skymodel(src_i) if ( sky.spectral_type == "flat" and sky.freq_array is None and sky.reference_frequency is None ): sky.freq_array = freq_array if sky.component_type == "healpix" and hasattr(sky, "healpix_to_point"): sky.healpix_to_point() if sky.spectral_type != "flat": sky.at_frequencies(freq_array) # Use flat here to get a 1D iterator for index in order.flat: task_index = task_ids[index] # Shape indicates slowest to fastest index. if not isinstance(task_index, tuple): task_index = np.unravel_index(task_index, tasks_shape) blt_i = task_index[blt_ax] freq_i = task_index[freq_ax] bl_num = input_uv.baseline_array[blt_i] # We reuse a lot of baseline info, so make the baseline list on the first go and reuse. if bl_num not in baselines.keys(): antnum1 = input_uv.ant_1_array[blt_i] antnum2 = input_uv.ant_2_array[blt_i] index1 = np.where(input_uv.antenna_numbers == antnum1)[0][0] index2 = np.where(input_uv.antenna_numbers == antnum2)[0][0] baselines[bl_num] = Baseline(antennas[index1], antennas[index2]) time = time_array[blt_i] bl = baselines[bl_num] freq = freq_array[freq_i] # 0 = spw axis task = UVTask(sky, time, freq, bl, telescope, freq_i) task.uvdata_index = (blt_i, freq_i) # 0 = spectral window index yield task del sky
def _check_ntasks_valid(Ntasks_tot): """ Check that the size of the task array won't overflow the gather. Parameters ---------- Ntasks_tot : int Number of total tasks. """ # Found experimentally, this is the limit on the number of tasks # that can be gathered with MPI. MAX_NTASKS_GATHER = 10226018 if Ntasks_tot >= MAX_NTASKS_GATHER: raise ValueError( f"Too many tasks for MPI to gather successfully: {Ntasks_tot}\n" "\t Consider splitting your simulation into multiple smaller jobs " "and joining them together with UVData.\n" "\t This error will go away when issue #289 is closed." ) def _set_nsky_parts(Nsrcs, cat_nfreqs, Nsky_parts): """Set the Nsky_parts.""" # Estimating required memory to decide how to split source array. mem_avail = ( simutils.get_avail_memory() - mpi.get_max_node_rss(return_per_node=True) * 2**30 ) Npus_node = mpi.node_comm.Get_size() skymodel_mem_footprint = ( simutils.estimate_skymodel_memory_usage(Nsrcs, cat_nfreqs) * Npus_node ) # Allow up to 50% of available memory for SkyModel data. skymodel_mem_max = 0.5 * mem_avail Nsky_parts_calc = np.ceil(skymodel_mem_footprint / float(skymodel_mem_max)) Nsky_parts_calc = max(Nsky_parts_calc, 1) if Nsky_parts_calc > Nsrcs: raise ValueError("Insufficient memory for simulation.") if Nsky_parts is None: Nsky_parts = Nsky_parts_calc elif Nsky_parts < Nsky_parts_calc: raise ValueError( "Nsky_parts is too small, it will lead to out of memory errors. It " f"needs to be at least {Nsky_parts_calc}. " ) return Nsky_parts
[docs]def run_uvdata_uvsim( input_uv, beam_list, beam_dict, catalog, beam_interp_check=None, quiet=False, block_nonroot_stdout=True, Nsky_parts=None, ): """ Run uvsim from UVData object. Parameters ---------- input_uv : :class:`pyuvdata.UVData` instance Provides baseline/time/frequency information. beam_list : :class:`pyuvsim.BeamList` BeamList carrying beam models. beam_dict : dictionary, optional {`antenna_name` : `beam_id`}, where `beam_id` is an index in the beam_list. This is used to assign beams to antennas. Default: All antennas get the 0th beam in the `beam_list`. catalog : :class:`pyuvsim.simsetup.SkyModelData` Immutable source parameters. beam_interp_check : bool Option to enable checking that the source positions are within the area covered by the beam. If the beam covers the full sky horizon to horizon this checking is turned off by default, otherwise it is turned on. Setting this to False can speed up simulations but if sources are simulated outside the beam area the response will be incorrect. This keyword only applies to beams that are regularly gridded in azimuth and zenith angle. quiet : bool Do not print anything. block_nonroot_stdout : bool Redirect stdout on nonzero ranks to /dev/null, for cleaner output. Nsky_parts : int, optional Number of parts to chunk the source list into to reduce memory. The default is to use the minimum number to fit within the memory of the processing unit. This parameter is included mostly to allow for testing, but can be used by a knowledgeable user if needed. If set too low, a ValueError will be raised. Returns ------- :class:`pyuvdata.UVData` instance containing simulated visibilities. """ if mpi is None: raise ImportError( "You need mpi4py to use the uvsim module. " "Install it by running pip install pyuvsim[sim] " "or pip install pyuvsim[all] if you also want the " "line_profiler installed." ) mpi.start_mpi(block_nonroot_stdout=block_nonroot_stdout) rank = mpi.get_rank() comm = mpi.get_comm() Npus = mpi.get_Npus() input_uv = comm.bcast(input_uv, root=0) beam_list = comm.bcast(beam_list, root=0) beam_dict = comm.bcast(beam_dict, root=0) catalog.share(root=0) if not isinstance(input_uv, UVData): raise TypeError("input_uv must be UVData object") if not ( (input_uv.Npols == 4) and (input_uv.polarization_array.tolist() == [-5, -6, -7, -8]) ): raise ValueError("input_uv must have XX,YY,XY,YX polarization") if not input_uv.future_array_shapes: input_uv.use_future_array_shapes() input_order = input_uv.blt_order if input_order != ("time", "baseline"): input_uv.reorder_blts(order="time", minor_order="baseline") # The root node will initialize our simulation # Read input file and make uvtask list if rank == 0 and not quiet: print("Nbls:", input_uv.Nbls, flush=True) print("Ntimes:", input_uv.Ntimes, flush=True) print("Nfreqs:", input_uv.Nfreqs, flush=True) print("Nsrcs:", catalog.Ncomponents, flush=True) if rank == 0: uv_container = simsetup._complete_uvdata(input_uv, inplace=False) if "world" in input_uv.extra_keywords: uv_container.extra_keywords["world"] = input_uv.extra_keywords["world"] vis_data = mpi.MPI.Win.Create( uv_container._data_array.value, comm=mpi.world_comm ) else: vis_data = mpi.MPI.Win.Create(None, comm=mpi.world_comm) Nbls = input_uv.Nbls Nblts = input_uv.Nblts Nfreqs = input_uv.Nfreqs Nsrcs = catalog.Ncomponents task_inds, src_inds, Ntasks_local, Nsrcs_local = _make_task_inds( Nblts, Nfreqs, Nsrcs, rank, Npus ) # Construct beam objects from strings beam_list.set_obj_mode(use_shared_mem=True) count = mpi.Counter() # wrap this in a try/finally (no exception handling) to ensure resources are freed try: # In case the user created the beam list without checking consistency: beam_list.check_consistency() if beam_list.beam_type != "efield": raise ValueError("Beam type must be efield!") if beam_interp_check is None: # check that all the beams cover the full sky if beam_list.check_all_azza_beams_full_sky(): beam_interp_check = False else: beam_interp_check = True Nsky_parts = _set_nsky_parts(Nsrcs, catalog.Nfreqs, Nsky_parts) local_task_iter = uvdata_to_task_iter( task_inds, input_uv, catalog.subselect(src_inds), beam_list, beam_dict, Nsky_parts=Nsky_parts, ) Ntasks_tot = Ntasks_local * Nsky_parts # Sum all the tasks across each node Nsky_parts = comm.reduce(Nsky_parts, op=mpi.MPI.MAX, root=0) Ntasks_tot = comm.reduce(Ntasks_tot, op=mpi.MPI.SUM, root=0) if rank == 0 and not quiet: if Nsky_parts > 1: print( "The source list has been split into Nsky_parts" f" <= {Nsky_parts} chunks on some or all nodes" " due to memory limitations.", flush=True, ) print("Tasks: ", Ntasks_tot, flush=True) pbar = simutils.progsteps(maxval=Ntasks_tot) engine = UVEngine() size_complex = np.ones(1, dtype=complex).nbytes data_array_shape = (Nblts, Nfreqs, 4) uvdata_indices = [] for task in local_task_iter: engine.set_task(task) vis = engine.make_visibility() blti, freq_ind = task.uvdata_index uvdata_indices.append(task.uvdata_index) flat_ind = np.ravel_multi_index((blti, freq_ind, 0), data_array_shape) offset = flat_ind * size_complex vis_data.Lock(0) vis_data.Accumulate(vis, 0, target=offset, op=mpi.MPI.SUM) vis_data.Unlock(0) cval = count.next() if rank == 0 and not quiet: pbar.update(cval) request = comm.Ibarrier() while not request.Test(): if rank == 0 and not quiet: cval = count.current_value() pbar.update(cval) if rank == 0 and not quiet: pbar.finish() if rank == 0 and not quiet: print("Calculations Complete.", flush=True) # If profiling is active, save meta data: from .profiling import prof # noqa if hasattr(prof, "meta_file"): # pragma: nocover # Saving axis sizes on current rank (local) and for the whole job (global). # These lines are affected by issue 179 of line_profiler, so the nocover # above will need to stay until this issue is resolved (see profiling.py). task_inds = np.array(uvdata_indices) bl_inds = task_inds[:, 0] % Nbls time_inds = (task_inds[:, 0] - bl_inds) // Nbls Ntimes_loc = np.unique(time_inds).size Nbls_loc = np.unique(bl_inds).size Nfreqs_loc = np.unique(task_inds[:, 1]).size axes_dict = { "Ntimes_loc": Ntimes_loc, "Nbls_loc": Nbls_loc, "Nfreqs_loc": Nfreqs_loc, "Nsrcs_loc": Nsky_parts, "prof_rank": prof.rank, } with open(prof.meta_file, "w") as afile: for k, v in axes_dict.items(): afile.write("{} \t {:d}\n".format(k, int(v))) finally: count.free() vis_data.Free() if rank == 0: if input_order is not None: if len(input_order) < 2: input_order = (input_order[0], None) # Don't call the function if we're already expecting (time, baseline) if input_order != ("time", "baseline"): uv_container.reorder_blts( order=input_order[0], minor_order=input_order[1] ) else: warnings.warn( "The parameter `blt_order` could not be identified for input_uv " " so the ordering cannot be restored." "The output UVData object will have (time, baseline) ordering." ) # Updating file history. history = simutils.get_version_string() if catalog.filename is not None: history += ( " Sources from source list(s): [" + ", ".join(catalog.filename) + "]." ) if "obsparam" in input_uv.extra_keywords: obs_param_file = input_uv.extra_keywords["obsparam"] telescope_config_file = input_uv.extra_keywords["telecfg"] antenna_location_file = input_uv.extra_keywords["layout"] history += ( " Based on config files: " + obs_param_file + ", " + telescope_config_file + ", " + antenna_location_file ) elif hasattr(uv_container, "filename") and uv_container.filename is not None: history += ( " Based on uvdata file(s): [" + ", ".join(uv_container.filename) + "]." ) history += " Npus = " + str(mpi.Npus) + "." # add pyradiosky version history += catalog.pyradiosky_version_str # add pyuvdata version info history += uv_container.pyuvdata_version_str uv_container.history = history return uv_container
[docs]def run_uvsim( params, return_uv=False, beam_interp_check=None, quiet=False, block_nonroot_stdout=True, ): """ Run a simulation off of an obsparam yaml file. Parameters ---------- params : str Path to a parameter yaml file. return_uv : bool If true, do not write results to file and return uv_out. (Default False) beam_interp_check : bool Option to enable checking that the source positions are within the area covered by the beam. If the beam covers the full sky horizon to horizon this checking is turned off by default, otherwise it is turned on. Setting this to False can speed up simulations but if sources are simulated outside the beam area the response will be incorrect. This keyword only applies to beams that are regularly gridded in azimuth and zenith angle. quiet : bool If True, do not print anything to stdout. (Default False) block_nonroot_stdout : bool Redirect stdout on nonzero ranks to /dev/null, for cleaner output. Returns ------- uv_out : `pyuvdata.UVData` Finished simulation results. Returned only if return_uv is True. """ if mpi is None: raise ImportError( "You need mpi4py to use the uvsim module. " "Install it by running pip install pyuvsim[sim] " "or pip install pyuvsim[all] if you also want the " "line_profiler installed." ) mpi.start_mpi(block_nonroot_stdout=block_nonroot_stdout) rank = mpi.get_rank() input_uv = UVData() beam_list = None beam_dict = None skydata = SkyModelData() if rank == 0: start = Time.now() input_uv, beam_list, beam_dict = simsetup.initialize_uvdata_from_params( params, return_beams=True ) skydata = simsetup.initialize_catalog_from_params( params, input_uv, return_catname=False ) if not quiet: print(f"UVData initialization took {(Time.now() - start).to('minute'):.3f}") start = Time.now() skydata = simsetup.SkyModelData(skydata) if not quiet: print(f"Skymodel setup took {(Time.now() - start).to('minute'):.3f}") start = Time.now() uv_out = run_uvdata_uvsim( input_uv, beam_list, beam_dict=beam_dict, catalog=skydata, quiet=quiet, beam_interp_check=beam_interp_check, ) if rank == 0 and not quiet: print(f"Run uvdata uvsim took {(Time.now() - start).to('minute'):.3f}") if rank == 0: if isinstance(params, str): with open(params, "r") as pfile: param_dict = yaml.safe_load(pfile) else: param_dict = params simutils.write_uvdata(uv_out, param_dict, dryrun=return_uv) if return_uv: return uv_out