Source code for srttools.io

"""Input/output functions."""

import copy
import glob
import logging
import os
import re
import warnings
from collections.abc import Iterable

import numpy as np
from scipy.interpolate import interp1d

import astropy.io.fits as fits
import astropy.units as u
from astropy.coordinates import (
    GCRS,
    ICRS,
    AltAz,
    Angle,
    EarthLocation,
    SkyCoord,
    get_sun,
)
from astropy.table import Table
from astropy.time import Time

from .utils import TWOPI, force_move_file

try:
    from sunpy.coordinates import frames, sun

    DEFAULT_SUN_FRAME = frames.Helioprojective
except ImportError:
    DEFAULT_SUN_FRAME = None


__all__ = [
    "correct_offsets",
    "detect_data_kind",
    "get_chan_columns",
    "get_rest_angle",
    "mkdir_p",
    "observing_angle",
    "print_obs_info_fitszilla",
    "read_data",
    "read_data_fitszilla",
    "root_name",
]


chan_re = re.compile(
    r"^Ch([0-9]+)$" r"|^Feed([0-9]+)_([a-zA-Z]+)$" r"|^Feed([0-9]+)_([a-zA-Z]+)_([0-9]+)$"
)


# 'srt': EarthLocation(4865182.7660, 791922.6890, 4035137.1740,
#                                   unit=u.m)
# EarthLocation(Angle("9:14:42.5764", u.deg),
#                                   Angle("39:29:34.93742", u.deg),
#                                   600 * u.meter) # not precise enough

locations = {
    "srt": EarthLocation(4865182.7660, 791922.6890, 4035137.1740, unit=u.m),
    "medicina": EarthLocation(Angle("11:38:49", u.deg), Angle("44:31:15", u.deg), 25 * u.meter),
    "greenwich": EarthLocation(lat=51.477 * u.deg, lon=0 * u.deg),
}


def interpret_chan_name(chan_name):
    """Get feed, polarization and baseband info from chan name.

    Examples
    --------
    >>> feed, polar, baseband = interpret_chan_name('blablabal')
    >>> feed  # None
    >>> polar  # None
    >>> baseband  # None
    >>> feed, polar, baseband = interpret_chan_name('Ch0')
    >>> feed
    0
    >>> polar  # None
    >>> baseband  # None
    >>> feed, polar, baseband = interpret_chan_name('Feed1_LCP')
    >>> feed
    1
    >>> polar
    'LCP'
    >>> baseband  # None
    >>> feed, polar, baseband = interpret_chan_name('Feed2_LCP_3')
    >>> feed
    2
    >>> polar
    'LCP'
    >>> baseband
    3
    """
    matchobj = chan_re.match(chan_name)
    if not matchobj:
        return None, None, None

    matches = [matchobj.group(i) for i in range(7)]
    polar, baseband = None, None
    if matches[6] is not None:
        baseband = int(matchobj.group(6))
        polar = matchobj.group(5)
        feed = int(matchobj.group(4))
    elif matches[3] is not None:
        polar = matchobj.group(3)
        feed = int(matchobj.group(2))
    else:
        feed = int(matchobj.group(1))

    return feed, polar, baseband


def classify_chan_columns(chans):
    """Classify the name of channels per feed, polarization, baseband.

    Examples
    --------
    >>> chans = ['Feed0_LCP_3', 'Feed0_RCP_3']
    >>> classif = classify_chan_columns(chans)
    >>> classif[0][3]['LCP']
    'Feed0_LCP_3'
    >>> classif[0][3]['RCP']
    'Feed0_RCP_3'
    >>> chans = ['Ch0']
    >>> classif = classify_chan_columns(chans)
    >>> classif[0][1]['N']
    'Ch0'
    >>> chans = ['Feed0_LCP']
    >>> classif = classify_chan_columns(chans)
    >>> classif[0][1]['LCP']
    'Feed0_LCP'
    """
    combinations = {}
    for ch in chans:
        feed, polar, baseband = interpret_chan_name(ch)
        if baseband is None:
            baseband = 1
        if polar is None:
            polar = "N"
        if feed not in combinations:
            combinations[feed] = {}

        if baseband not in combinations[feed]:
            combinations[feed][baseband] = {}

        combinations[feed][baseband][polar] = ch

    return combinations


[docs] def get_chan_columns(table): return np.array([i for i in table.columns if chan_re.match(i)])
def get_channel_feed(ch): if re.search("Feed?", ch): return int(ch[4])
[docs] def mkdir_p(path): """Safe mkdir function. Parameters ---------- path : str Name of the directory/ies to create Notes ----- Found at https://stackoverflow.com/questions/600268/mkdir-p-functionality-in-python """ import errno try: os.makedirs(path) except OSError as exc: # Python >2.5 if exc.errno == errno.EEXIST and os.path.isdir(path): pass else: raise
def _check_derotator(derot_angle): # Check that derotator angle is outside any plausible value if np.any(np.abs(derot_angle) > 2 * 360): return False return True
[docs] def detect_data_kind(fname): """Placeholder for function that recognizes data format.""" if fname.endswith(".hdf5"): return "hdf5" elif "fits" in fname: return "fitszilla" else: warnings.warn(f"File {fname} is not in a known format") return None
[docs] def correct_offsets(obs_angle, xoffset, yoffset): """Correct feed offsets for derotation angle. All angles are in radians. Examples -------- >>> x = 2 ** 0.5 >>> y = 2 ** 0.5 >>> angle = np.pi / 4 >>> xoff, yoff = correct_offsets(angle, x, y) >>> np.allclose([xoff, yoff], 2 ** 0.5) True """ sep = np.sqrt(xoffset**2.0 + yoffset**2.0) new_xoff = sep * np.cos(obs_angle) new_yoff = sep * np.sin(obs_angle) return new_xoff, new_yoff
[docs] def observing_angle(rest_angle, derot_angle): """Calculate the observing angle of the multifeed. If values have no units, they are assumed in radians Parameters ---------- rest_angle : float or Astropy quantity, angle rest angle of the feeds derot_angle : float or Astropy quantity, angle derotator angle Examples -------- >>> float(observing_angle(0 * u.rad, TWOPI * u.rad).to(u.rad).value) 0.0 >>> float(observing_angle(0, TWOPI).to(u.rad).value) 0.0 """ if not hasattr(rest_angle, "unit"): rest_angle *= u.rad if not hasattr(derot_angle, "unit"): derot_angle *= u.rad return rest_angle + (TWOPI * u.rad - derot_angle)
def _rest_angle_default(n_lat_feeds): """Default rest angles for a multifeed, in units of a circle Assumes uniform coverage. Examples -------- >>> np.allclose(_rest_angle_default(5), ... np.array([1., 0.8, 0.6, 0.4, 0.2])) True >>> np.allclose(_rest_angle_default(6) * 360, ... np.array([360., 300., 240., 180., 120., 60.])) True """ return np.arange(1, 0, -1 / n_lat_feeds)
[docs] def get_rest_angle(xoffsets, yoffsets): """Calculate the rest angle for multifeed. The first feed is assumed to be at position 0, for it the return value is 0 Examples -------- >>> xoffsets = [0.0, -0.0382222, -0.0191226, 0.0191226, 0.0382222, ... 0.0191226, -0.0191226] >>> yoffsets = [0.0, 0.0, 0.0331014, 0.0331014, 0.0, -0.0331014, ... -0.0331014] >>> np.allclose(get_rest_angle(xoffsets, yoffsets).to(u.deg).value, ... np.array([0., 180., 120., 60., 360., 300., 240.])) True """ if len(xoffsets) <= 2: return np.array([0] * len(xoffsets)) xoffsets = np.asarray(xoffsets) yoffsets = np.asarray(yoffsets) n_lat_feeds = len(xoffsets) - 1 rest_angle_default = _rest_angle_default(n_lat_feeds) * TWOPI * u.rad w_0 = np.where((xoffsets[1:] > 0) & (yoffsets[1:] == 0.0))[0][0] return np.concatenate(([0], np.roll(rest_angle_default.to(u.rad).value, w_0))) * u.rad
def infer_skydip_from_elevation(elevation, azimuth=None): if azimuth is None: azimuth = np.array([0, 0]) el_condition = np.max(elevation) - np.min(elevation) > np.pi / 3.0 az_condition = np.max(azimuth) - np.min(azimuth) < 0.1 / 180.0 * np.pi return az_condition & el_condition def get_sun_coords_from_radec(obstimes, ra, dec, sun_frame=None): if sun_frame is None: # pragma: no cover sun_frame = DEFAULT_SUN_FRAME coords = GCRS( ra=Angle(ra), dec=Angle(dec), obstime=obstimes, distance=sun.earth_distance(obstimes), ) coords_asec = coords.transform_to(sun_frame(obstime=obstimes, observer="earth")) lon = coords_asec.Tx lat = coords_asec.Ty dist = coords_asec.distance return lon.to(u.radian), lat.to(u.radian), dist.to(u.m).value def update_table_with_sun_coords(new_table, feeds=None, sun_frame=None): lon_str, lat_str = "hpln", "hplt" if "dsun" not in new_table.colnames: new_table[lon_str] = np.zeros_like(new_table["el"]) new_table[lat_str] = np.zeros_like(new_table["az"]) new_table["dsun"] = np.zeros(len(new_table["az"])) if feeds is None: feeds = np.arange(0, new_table["el"].shape[1], dtype=int) for i in feeds: obstimes = Time(new_table["time"] * u.day, format="mjd", scale="utc") lon, lat, dist = get_sun_coords_from_radec( obstimes, new_table["ra"][:, i], new_table["dec"][:, i], sun_frame=sun_frame, ) new_table[lon_str][:, i] = lon new_table[lat_str][:, i] = lat if i == 0: new_table["dsun"][:] = dist return new_table def get_coords_from_altaz_offset(obstimes, el, az, xoffs, yoffs, location, inplace=False): """""" # Calculate observing angle if not inplace: el = copy.deepcopy(el) az = copy.deepcopy(az) el += yoffs.to(u.rad).value az += xoffs.to(u.rad).value / np.cos(el) coords = AltAz(az=Angle(az), alt=Angle(el), location=location, obstime=obstimes) # According to line_profiler, coords.icrs is *by far* the longest # operation in this function, taking between 80 and 90% of the # execution time. Need to study a way to avoid this. coords_deg = coords.transform_to(ICRS()) ra = np.radians(coords_deg.ra) dec = np.radians(coords_deg.dec) return ra, dec def is_close_to_sun(ra, dec, obstime, tolerance=3 * u.deg): """Test if current source is close to the Sun. Examples -------- >>> ra, dec = 131.13535699 * u.deg, 18.08202663 * u.deg >>> obstime = Time("2017-08-01") >>> assert is_close_to_sun(ra, dec, obstime, tolerance=3 * u.deg) >>> assert not is_close_to_sun(ra, dec + 4 * u.deg, obstime, tolerance=3 * u.deg) """ coords = SkyCoord(ra=ra, dec=dec, frame=GCRS(obstime=obstime)) sun_position = get_sun(obstime).transform_to(GCRS(obstime=obstime)) return (coords.separation(sun_position)).to(u.deg).value < tolerance.value def update_table_with_offsets( new_table, xoffsets, yoffsets, rest_angles, feeds=None, inplace=False ): if not inplace: new_table = copy.deepcopy(new_table) lon_str, lat_str = "ra", "dec" if lon_str not in new_table.colnames: new_table[lon_str] = np.zeros_like(new_table["el"]) new_table[lat_str] = np.zeros_like(new_table["az"]) if feeds is None: feeds = np.arange(0, new_table["el"].shape[1], dtype=int) for i in feeds: obs_angle = observing_angle(rest_angles[i], new_table["derot_angle"]) # offsets < 0.001 arcseconds: don't correct (usually feed 0) if ( np.abs(xoffsets[i]) < np.radians(0.001 / 60.0) * u.rad and np.abs(yoffsets[i]) < np.radians(0.001 / 60.0) * u.rad ): continue xoffs, yoffs = correct_offsets(obs_angle, xoffsets[i], yoffsets[i]) obstimes = Time(new_table["time"] * u.day, format="mjd", scale="utc") location = locations[new_table.meta["site"]] lon, lat = get_coords_from_altaz_offset( obstimes, new_table["el"][:, i], new_table["az"][:, i], xoffs, yoffs, location=location, inplace=inplace, ) new_table[lon_str][:, i] = lon new_table[lat_str][:, i] = lat return new_table def _chan_name(f, p, c=None): if c is not None: return f"Feed{f}_{p}_{c}" else: return f"Feed{f}_{p}"
[docs] def read_data_fitszilla(fname): with fits.open(fname, memmap=False) as lchdulist: retval = _read_data_fitszilla(lchdulist) return retval
def get_value_with_units(fitsext, keyword, default=""): if isinstance(fitsext, fits.BinTableHDU): fitsext = fitsext.data unitstr = fitsext.columns[keyword].unit if unitstr is None: if default not in ["", None]: unit = u.Unit(default) else: unit = 1 else: unit = u.Unit(unitstr) value = fitsext[keyword] is_string = isinstance(value, str) is_iterable = isinstance(value, Iterable) if is_string or (is_iterable and isinstance(value[0], str)): return value else: return value * unit def adjust_temperature_size_rough(temp, comparison_array): """Adjust the size of the temperature array. Examples -------- >>> temp = [1, 2, 3, 4] >>> adjust_temperature_size_rough(temp, [5, 6, 7]) array([1, 2, 3]) >>> adjust_temperature_size_rough(temp, [5, 6, 7, 5, 4]) array([1, 2, 3, 4, 4]) >>> adjust_temperature_size_rough(temp, [5, 6]) array([2, 3]) >>> adjust_temperature_size_rough(temp, [5, 6, 7, 5, 4, 6]) array([1, 1, 2, 3, 4, 4]) """ import copy temp = np.asarray(temp) comparison_array = np.asarray(comparison_array) temp_save = copy.deepcopy(temp) sizediff = temp.size - comparison_array.size if sizediff > 0: temp = temp[sizediff // 2 : sizediff // 2 + comparison_array.size] elif sizediff < 0: # make it positive sizediff = -sizediff temp = np.zeros_like(comparison_array) temp[sizediff // 2 : sizediff // 2 + temp_save.size] = temp_save temp[: sizediff // 2] = temp_save[0] temp[sizediff // 2 + temp_save.size - 1 :] = temp_save[-1] return temp def adjust_temperature_size(temp, comparison_array): """Adjust the size of the temperature array. Examples -------- >>> temp = [1, 2, 3, 4] >>> np.allclose(adjust_temperature_size(temp, [5, 6]), [1.0, 4.0]) True >>> temp = [1, 2, 3, 4] >>> np.allclose(adjust_temperature_size(temp, [5, 6, 4, 5]), temp) True """ temp = np.asarray(temp) comparison_array = np.asarray(comparison_array) Ntemp = temp.shape[0] Ndata = comparison_array.shape[0] if Ntemp == Ndata: return temp temp_func = interp1d(np.linspace(0, 1, Ntemp), temp) newtemp = temp_func(np.linspace(0, 1, Ndata)) return newtemp # from memory_profiler import profile # @profile def _read_data_fitszilla(lchdulist): """Open a fitszilla FITS file and read all relevant information.""" is_new_fitszilla = np.any(["coord" in i.name.lower() for i in lchdulist]) # ----------- Extract generic observation information ------------------ headerdict = dict(lchdulist[0].header.items()) source = lchdulist[0].header["SOURCE"] site = lchdulist[0].header["ANTENNA"].lower() receiver = lchdulist[0].header["RECEIVER CODE"] ra = lchdulist[0].header["RIGHTASCENSION"] * u.rad dec = lchdulist[0].header["DECLINATION"] * u.rad ra_offset = dec_offset = az_offset = el_offset = 0 * u.rad if "RightAscension Offset" in lchdulist[0].header: ra_offset = lchdulist[0].header["RightAscension Offset"] * u.rad if "Declination Offset" in lchdulist[0].header: dec_offset = lchdulist[0].header["Declination Offset"] * u.rad if "Azimuth Offset" in lchdulist[0].header: az_offset = lchdulist[0].header["Azimuth Offset"] * u.rad if "Elevation Offset" in lchdulist[0].header: el_offset = lchdulist[0].header["Elevation Offset"] * u.rad # ----------- Read the list of channel ids ------------------ section_table_data = lchdulist["SECTION TABLE"].data chan_ids = get_value_with_units(section_table_data, "id") nbin_per_chan = get_value_with_units(section_table_data, "bins") sample_rate = get_value_with_units(section_table_data, "sampleRate") try: bw_section = get_value_with_units(section_table_data, "bandWidth") fr_section = get_value_with_units(section_table_data, "frequency") except KeyError: bw_section = None fr_section = None integration_time = lchdulist["SECTION TABLE"].header["Integration"] * u.ms if len(list(set(nbin_per_chan))) > 1: raise ValueError( "Only datasets with the same nbin per channel are " "supported at the moment" ) nbin_per_chan = next(iter(set(nbin_per_chan))) types = get_value_with_units(section_table_data, "type") if "stokes" in types: is_polarized = True else: is_polarized = False # Check. If backend is not specified, use Total Power try: backend = lchdulist[0].header["BACKEND NAME"] except Exception: if "stokes" in types: if nbin_per_chan == 2048: backend = "XARCOS" else: backend = "SARDARA" elif "spectra" in types: backend = "SARDARA" else: backend = "TP" # ----------- Read the list of RF inputs, feeds, polarization, etc. -- rf_input_data = lchdulist["RF INPUTS"].data feeds = get_value_with_units(rf_input_data, "feed") IFs = get_value_with_units(rf_input_data, "ifChain") polarizations = get_value_with_units(rf_input_data, "polarization") sections = get_value_with_units(rf_input_data, "section") frequencies_rf = get_value_with_units(rf_input_data, "frequency") bandwidths_rf = get_value_with_units(rf_input_data, "bandWidth") local_oscillator = get_value_with_units(rf_input_data, "localOscillator") attenuation = get_value_with_units(rf_input_data, "attenuation") try: cal_mark_temp = get_value_with_units(rf_input_data, "calibrationMark") except KeyError: # Old, stupid typo cal_mark_temp = get_value_with_units(rf_input_data, "calibratonMark") if bw_section is not None: bandwidths_section = [bw_section[i] for i in sections] frequencies_section = [fr_section[i] for i in sections] frequencies_section = [f + l for (f, l) in zip(frequencies_section, local_oscillator)] if backend == "TP" or bw_section is None: frequencies, bandwidths = frequencies_rf, bandwidths_rf else: frequencies, bandwidths = frequencies_section, bandwidths_section combinations = list(zip(frequencies, bandwidths)) combination_idx = np.arange(len(combinations)) # Solve stupid problem with old CCB data if receiver.lower() == "ccb": feeds[:] = 0 if len(set(combinations)) > 1: chan_names = [_chan_name(f, p, c) for f, p, c in zip(feeds, polarizations, combination_idx)] else: chan_names = [_chan_name(f, p) for f, p in zip(feeds, polarizations)] # ----- Read the offsets of different feeds (nonzero only if multifeed)-- feed_input_data = lchdulist["FEED TABLE"].data # Add management of historical offsets. # Note that we need to add the units by hand in this case. xoffsets = get_value_with_units(feed_input_data, "xOffset", default="rad") yoffsets = get_value_with_units(feed_input_data, "yOffset", default="rad") relpowers = get_value_with_units(feed_input_data, "relativePower") # -------------- Read data!----------------------------------------- datahdu = lchdulist["DATA TABLE"] # N.B.: there is an increase in memory usage here. This is just because # data are being read from the file at this point, not before. data_table_data = Table(datahdu.data) tempdata = Table(lchdulist["ANTENNA TEMP TABLE"].data) for col in data_table_data.colnames: if col == col.lower(): continue data_table_data.rename_column(col, col.lower()) for col in tempdata.colnames: if col == col.lower(): continue tempdata.rename_column(col, col.lower()) is_old_spectrum = "SPECTRUM" in list(datahdu.header.values()) if is_old_spectrum: data_table_data.rename_column("spectrum", "ch0") sections = np.array([0, 0]) unsupported_temperature = False if len(tempdata[tempdata.colnames[0]].shape) == 2: try: tempdata_new = Table() for i, (feed, ifnum) in enumerate(zip(feeds, IFs)): tempdata_new[f"ch{i}"] = tempdata[f"ch{feed}"][:, ifnum] tempdata = tempdata_new except Exception: # pragma: no cover warnings.warn("Temperature format not supported", UserWarning) unsupported_temperature = True existing_columns = [chn for chn in data_table_data.colnames if chn.startswith("ch")] if existing_columns == []: raise ValueError("Invalid data") is_spectrum = nbin_per_chan > 1 is_single_channel = len(set(combinations)) == 1 good = np.ones(len(feeds), dtype=bool) for i, s in enumerate(sections): section_name = f"ch{s}" if section_name not in existing_columns: good[i] = False allfeeds = feeds feeds = allfeeds[good] IFs = IFs[good] polarizations = polarizations[good] sections = sections[good] unique_feeds = np.unique(feeds) rest_angles = get_rest_angle(xoffsets, yoffsets) if is_spectrum: nchan = len(chan_ids) sample_channel = existing_columns[0] _, nbins = data_table_data[sample_channel].shape # Development version of SARDARA -- will it remain the same? if nbin_per_chan == nbins: IFs = np.zeros_like(IFs) if nbin_per_chan * nchan * 2 == nbins and not is_polarized: warnings.warn( "Data appear to contain polarization information " "but are classified as simple, not stokes, in the " "Section table." ) is_polarized = True if ( nbin_per_chan != nbins and nbin_per_chan * nchan != nbins and nbin_per_chan * nchan * 2 != nbins and not is_polarized ): raise ValueError( "Something wrong with channel subdivision: " f"{nbin_per_chan} bins/channel, {nchan} channels, " f"{nbins} total bins" ) for f, ic, p, s in zip(feeds, IFs, polarizations, sections): c = s if is_single_channel: c = None section_name = f"ch{s}" ch = _chan_name(f, p, c) start, end = ic * nbin_per_chan, (ic + 1) * nbin_per_chan data_table_data[ch] = data_table_data[section_name][:, start:end] if is_polarized: # for f, ic, p, s in zip(feeds, IFs, polarizations, sections): for s in list(set(sections)): f = feeds[sections == s][0] c = s if is_single_channel: c = None section_name = f"ch{s}" qname, uname = _chan_name(f, "Q", c), _chan_name(f, "U", c) qstart, qend = 2 * nbin_per_chan, 3 * nbin_per_chan ustart, uend = 3 * nbin_per_chan, 4 * nbin_per_chan data_table_data[qname] = data_table_data[section_name][:, qstart:qend] data_table_data[uname] = data_table_data[section_name][:, ustart:uend] chan_names += [qname, uname] for f, ic, p, s in zip(feeds, IFs, polarizations, sections): section_name = f"ch{s}" if section_name in data_table_data.colnames: data_table_data.remove_column(section_name) else: for ic, ch in enumerate(chan_names): data_table_data[ch] = data_table_data[f"ch{chan_ids[ic]}"] # ----------- Read temperature data, if possible ---------------- for ic, ch in enumerate(chan_names): data_table_data[ch + "-Temp"] = 0.0 if unsupported_temperature: continue if len(chan_ids) <= ic: continue ch_string = f"ch{chan_ids[ic]}" if ch_string not in tempdata.colnames: continue td = np.asarray(tempdata[ch_string]) data_table_data[ch + "-Temp"] = adjust_temperature_size(td, data_table_data[ch + "-Temp"]) info_to_retrieve = [ "time", "derot_angle", "weather", "par_angle", "flag_track", "flag_cal", ] + [ch + "-Temp" for ch in chan_names] new_table = Table() new_table.meta.update(headerdict) new_table.meta["SOURCE"] = source new_table.meta["site"] = site new_table.meta["backend"] = backend new_table.meta["receiver"] = receiver new_table.meta["RA"] = ra new_table.meta["Dec"] = dec new_table.meta["channels"] = nbin_per_chan new_table.meta["VLSR"] = new_table.meta["VLSR"] * u.Unit("km/s") new_table.meta["attenuations"] = ",".join([str(int(a.value)) for a in attenuation]) for i, off in zip( "ra,dec,el,az".split(","), [ra_offset, dec_offset, el_offset, az_offset], ): new_table.meta[i + "_offset"] = off for info in info_to_retrieve: new_table[info] = data_table_data[info] if not _check_derotator(new_table["derot_angle"]): logging.debug("Derotator angle looks weird. Setting to 0") new_table["derot_angle"][:] = 0 # Duplicate raj and decj columns (in order to be corrected later) Nfeeds = np.max(allfeeds) + 1 for newcol, oldcol in [("ra", "raj2000"), ("dec", "decj2000"), ("el", "el"), ("az", "az")]: new_table[newcol] = np.tile(data_table_data[oldcol], (Nfeeds, 1)).transpose() new_table.meta["is_skydip"] = infer_skydip_from_elevation( data_table_data["el"], data_table_data["az"] ) for info in ["ra", "dec", "az", "el", "derot_angle"]: new_table[info].unit = u.radian if not is_new_fitszilla: update_table_with_offsets( new_table, xoffsets, yoffsets, rest_angles, feeds=unique_feeds, inplace=True ) else: for i in range(len(xoffsets)): try: ext = lchdulist[f"Coord{i}"] extdata = ext.data ra, dec = extdata["raj2000"], extdata["decj2000"] el, az = extdata["el"], extdata["az"] except KeyError: ra, dec = new_table["ra"][:, 0], new_table["dec"][:, 0] el, az = new_table["el"][:, 0], new_table["az"][:, 0] new_table["ra"][:, i] = ra new_table["dec"][:, i] = dec new_table["el"][:, i] = el new_table["az"][:, i] = az # Don't know if better euristics is needed obstime = Time(np.mean(new_table["time"]) * u.day, format="mjd", scale="utc") if is_close_to_sun( new_table.meta["RA"], new_table.meta["Dec"], obstime, tolerance=3 * u.deg, ): if DEFAULT_SUN_FRAME is None: raise ValueError("You need Sunpy to process Sun observations.") update_table_with_sun_coords( new_table, feeds=unique_feeds, sun_frame=DEFAULT_SUN_FRAME, ) lchdulist.close() # So ugly. But it works filtered_frequencies = [f for (f, g) in zip(frequencies, good) if g] for i, fr in enumerate(filtered_frequencies): f = feeds[i] s = sections[i] ic = IFs[i] p = polarizations[i] b = bandwidths[i] lo = local_oscillator[i] cal = cal_mark_temp[i] att = attenuation[i] c = s if is_single_channel: c = None chan_name = _chan_name(f, p, c) if bandwidths[ic] < 0: frequencies[ic] -= bandwidths[ic] bandwidths[ic] *= -1 for i in range(data_table_data[chan_name].shape[0]): data_table_data[chan_name][f, :] = data_table_data[chan_name][f, ::-1] new_table[chan_name] = data_table_data[chan_name] * relpowers[feeds[ic]] new_table[chan_name + "-filt"] = np.ones(len(data_table_data[chan_name]), dtype=bool) data_table_data.remove_column(chan_name) newmeta = { "polarization": polarizations[ic], "feed": int(f), "IF": int(ic), "frequency": fr.to("MHz"), "bandwidth": b.to("MHz"), "sample_rate": sample_rate[s], "sample_time": (1 / (sample_rate[s].to(u.Hz))).to("s"), "local_oscillator": lo.to("MHz"), "attenuation": att, "cal_mark_temp": cal.to("K"), "integration_time": integration_time.to("s"), "xoffset": xoffsets[f].to(u.rad), "yoffset": yoffsets[f].to(u.rad), "relpower": float(relpowers[f]), } new_table[chan_name].meta.update(headerdict) new_table[chan_name].meta.update(new_table.meta) new_table[chan_name].meta.update(newmeta) if is_polarized: for s in list(set(sections)): feed = feeds[sections == s][0] c = s if is_single_channel: c = None for stokes_par in "QU": chan_name = _chan_name(feed, stokes_par, c) try: new_table[chan_name] = data_table_data[chan_name] except KeyError: continue sample_time = 1 / (sample_rate[s].to(u.Hz)) newmeta = { "polarization": stokes_par, "feed": int(feed), "IF": -1, # There are two IFs for each section "frequency": frequencies[2 * s].to("MHz"), "bandwidth": bandwidths[2 * s].to("MHz"), "sample_rate": sample_rate[s], "sample_time": sample_time.to("s"), "local_oscillator": local_oscillator[2 * s].to("MHz"), "attenuation": attenuation[2 * s], "cal_mark_temp": cal_mark_temp[2 * s].to("K"), "integration_time": integration_time.to("s"), "xoffset": xoffsets[feed].to(u.rad), "yoffset": yoffsets[feed].to(u.rad), "relpower": 1.0, } new_table[chan_name].meta.update(headerdict) new_table[chan_name].meta.update(new_table.meta) new_table[chan_name].meta.update(newmeta) new_table[chan_name + "-filt"] = np.ones( len(data_table_data[chan_name]), dtype=bool ) data_table_data.remove_column(chan_name) return new_table
[docs] def read_data(fname): """Read the data, whatever the format, and return them.""" kind = detect_data_kind(fname) if kind == "fitszilla": return read_data_fitszilla(fname) elif kind == "hdf5": return Table.read(fname) else: return None
[docs] def root_name(fname): """Return the file name without extension.""" fn, ext = os.path.splitext(fname) if "fits" in ext and not ext.endswith("fits"): fn += ext.replace("fits", "").replace(".", "") return fn
def _try_type(value, dtype): """ Examples -------- >>> _try_type("1", int) 1 >>> _try_type(1.0, int) 1 >>> _try_type("ab", float) 'ab' """ try: return dtype(value) except ValueError: return value def label_from_chan_name(ch): """ Examples -------- >>> label_from_chan_name('Feed0_LCP_1') 'LL' >>> label_from_chan_name('Feed0_Q_2') 'LR' >>> label_from_chan_name('Feed3_RCP_1') 'RR' >>> label_from_chan_name('Feed2_U_3') 'RL' """ _, polar, _ = interpret_chan_name(ch) if polar.startswith("L"): return "LL" elif polar.startswith("R"): return "RR" elif polar.startswith("Q"): return "LR" elif polar.startswith("U"): return "RL" else: raise ValueError("Unrecognized polarization") def bulk_change(file, path, value): """Bulk change keyword or column values in FITS file. Parameters ---------- file : str Input file path : str it has to be formatted as EXT,data,COLUMN or EXT,header,KEY depending on what is being changed (a data column or a header key resp.). Ex. 1,TIME to change the values of column TIME in ext. n. 1 value : any acceptable type Value to be filled in """ with fits.open(file, memmap=False) as hdul: ext, attr, key = path.split(",") ext = _try_type(ext, int) data = getattr(hdul[ext], attr) data[key] = value setattr(hdul[ext], attr, data) hdul.writeto("tmp.fits", overwrite=True) force_move_file("tmp.fits", file) def main_bulk_change(args=None): """Preprocess the data.""" import argparse description = "Change all values of a given column or header keyword in " "fits files" parser = argparse.ArgumentParser(description=description) parser.add_argument( "files", nargs="*", help="Single files to preprocess", default=None, type=str, ) parser.add_argument( "-k", "--key", type=str, default=None, help="Path to key or data column. E.g. " '"EXT,header,KEY" to change key KEY in the header' "in extension EXT; EXT,data,COL to change column" "COL in the data of extension EXT", ) parser.add_argument("-v", "--value", default=None, type=str, help="Value to be written") parser.add_argument( "--apply-cal-mark", action="store_true", default=False, help='Short for -k "DATA TABLE,data,flag_cal" -v 1', ) parser.add_argument( "--recursive", action="store_true", default=False, help="Look for file in up to two subdirectories", ) parser.add_argument( "--debug", action="store_true", default=False, help="Plot stuff and be verbose", ) args = parser.parse_args(args) if args.apply_cal_mark: args.key = "DATA TABLE,data,flag_cal" args.value = 1 if args.key is None: raise ValueError( "What should I do? Please specify either key and " "value, or apply-cal-mark" ) fnames = [] for fname in args.files: if args.recursive: if not fname == os.path.basename(fname): raise ValueError( "Options recursive requires a file name, not " f"a full path: {fname}" ) fs = glob.glob(os.path.join("**", fname), recursive=True) fnames.extend(fs) else: fnames.append(fname) for fname in fnames: print("Updating", fname, "...", end="") bulk_change(fname, args.key, args.value) print(fname, " Done.")