"""Input/output functions."""
import astropy.io.fits as fits
from astropy.table import Table
import numpy as np
import astropy.units as u
from astropy.coordinates import (
    EarthLocation,
    AltAz,
    Angle,
    ICRS,
    GCRS,
    SkyCoord,
    get_sun,
)
import os
from astropy.time import Time
import warnings
import logging
import copy
import re
import glob
from collections.abc import Iterable
from scipy.interpolate import interp1d

from .utils import force_move_file, TWOPI

try:
    from sunpy.coordinates import frames, sun

    DEFAULT_SUN_FRAME = frames.Helioprojective
except ImportError:
    DEFAULT_SUN_FRAME = None


__all__ = [
    "mkdir_p",
    "detect_data_kind",
    "correct_offsets",
    "observing_angle",
    "get_rest_angle",
    "print_obs_info_fitszilla",
    "read_data_fitszilla",
    "read_data",
    "root_name",
    "get_chan_columns",
]


chan_re = re.compile(
    r"^Ch([0-9]+)$" r"|^Feed([0-9]+)_([a-zA-Z]+)$" r"|^Feed([0-9]+)_([a-zA-Z]+)_([0-9]+)$"
)


# Alternative geodetic definition for 'srt', not precise enough:
# EarthLocation(Angle("9:14:42.5764", u.deg),
#               Angle("39:29:34.93742", u.deg),
#               600 * u.meter)

locations = {
    "srt": EarthLocation(4865182.7660, 791922.6890, 4035137.1740, unit=u.m),
    "medicina": EarthLocation(Angle("11:38:49", u.deg), Angle("44:31:15", u.deg), 25 * u.meter),
    "greenwich": EarthLocation(lat=51.477 * u.deg, lon=0 * u.deg),
}
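
# Illustrative sanity check (not part of the pipeline): the geocentric 'srt'
# entry can be converted back to geodetic coordinates with astropy's
# EarthLocation.to_geodetic(), which should land close to the commented-out
# values above (about 9.245 deg E, 39.493 deg N, ~600 m):
#
# >>> lon, lat, height = locations["srt"].to_geodetic()
# >>> print(f"{lon.deg:.3f}, {lat.deg:.3f}")  # doctest: +SKIP
# 9.245, 39.493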


def interpret_chan_name(chan_name):
    """Get feed, polarization and baseband info from chan name.

    Examples
    --------
    >>> feed, polar, baseband = interpret_chan_name('blablabal')
    >>> feed  # None
    >>> polar  # None
    >>> baseband  # None
    >>> feed, polar, baseband = interpret_chan_name('Ch0')
    >>> feed
    0
    >>> polar  # None
    >>> baseband  # None
    >>> feed, polar, baseband = interpret_chan_name('Feed1_LCP')
    >>> feed
    1
    >>> polar
    'LCP'
    >>> baseband  # None
    >>> feed, polar, baseband = interpret_chan_name('Feed2_LCP_3')
    >>> feed
    2
    >>> polar
    'LCP'
    >>> baseband
    3
    """
    matchobj = chan_re.match(chan_name)
    if not matchobj:
        return None, None, None

    matches = [matchobj.group(i) for i in range(7)]
    polar, baseband = None, None
    if matches[6] is not None:
        baseband = int(matchobj.group(6))
        polar = matchobj.group(5)
        feed = int(matchobj.group(4))
    elif matches[3] is not None:
        polar = matchobj.group(3)
        feed = int(matchobj.group(2))
    else:
        feed = int(matchobj.group(1))

    return feed, polar, baseband


def classify_chan_columns(chans):
    """Classify the name of channels per feed, polarization, baseband.

    Examples
    --------
    >>> chans = ['Feed0_LCP_3', 'Feed0_RCP_3']
    >>> classif = classify_chan_columns(chans)
    >>> classif[0][3]['LCP']
    'Feed0_LCP_3'
    >>> classif[0][3]['RCP']
    'Feed0_RCP_3'
    >>> chans = ['Ch0']
    >>> classif = classify_chan_columns(chans)
    >>> classif[0][1]['N']
    'Ch0'
    >>> chans = ['Feed0_LCP']
    >>> classif = classify_chan_columns(chans)
    >>> classif[0][1]['LCP']
    'Feed0_LCP'
    """
    combinations = {}
    for ch in chans:
        feed, polar, baseband = interpret_chan_name(ch)
        if baseband is None:
            baseband = 1
        if polar is None:
            polar = "N"
        if feed not in combinations:
            combinations[feed] = {}

        if baseband not in combinations[feed]:
            combinations[feed][baseband] = {}

        combinations[feed][baseband][polar] = ch

    return combinations


def get_chan_columns(table):
    """Return the names of all channel columns in the table."""
    return np.array([i for i in table.columns if chan_re.match(i)])
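

# Illustrative example (hypothetical column names): only columns matching the
# channel-name pattern are returned; pointing/housekeeping columns are not.
#
# >>> t = Table(names=["time", "Feed0_LCP", "Feed0_RCP", "az"])
# >>> sorted(get_chan_columns(t))
# ['Feed0_LCP', 'Feed0_RCP']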


def get_channel_feed(ch):
    """Return the feed number encoded in a channel name like ``Feed0_LCP``."""
    if re.search("Feed?", ch):
        return int(ch[4])
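

# Note (illustrative): the feed number is read from the fifth character of the
# channel name, so this assumes single-digit feed numbers.
#
# >>> get_channel_feed("Feed3_LCP")
# 3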


def mkdir_p(path):
    """Safe mkdir function.

    Parameters
    ----------
    path : str
        Name of the directory/ies to create

    Notes
    -----
    Found at
    https://stackoverflow.com/questions/600268/mkdir-p-functionality-in-python
    """
    import errno

    try:
        os.makedirs(path)
    except OSError as exc:  # Python >2.5
        if exc.errno == errno.EEXIST and os.path.isdir(path):
            pass
        else:
            raise


def _check_derotator(derot_angle):
    # Return False if the derotator angle is outside any plausible value
    if np.any(np.abs(derot_angle) > 2 * 360):
        return False
    return True
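

# Illustrative check: any angle with absolute value above 720 is considered
# implausible and rejected.
#
# >>> _check_derotator(np.array([0.1, -0.2]))
# True
# >>> _check_derotator(np.array([1000.0]))
# False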


def detect_data_kind(fname):
    """Placeholder for function that recognizes data format."""
    if fname.endswith(".hdf5"):
        return "hdf5"
    elif "fits" in fname:
        return "fitszilla"
    else:
        warnings.warn("File {} is not in a known format".format(fname))
        return None
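

# Behavior sketch (hypothetical file names): detection is purely name-based.
#
# >>> detect_data_kind("scan.hdf5")
# 'hdf5'
# >>> detect_data_kind("scan.fits")
# 'fitszilla'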


def correct_offsets(obs_angle, xoffset, yoffset):
    """Correct feed offsets for derotation angle.

    All angles are in radians.

    Examples
    --------
    >>> x = 2 ** 0.5
    >>> y = 2 ** 0.5
    >>> angle = np.pi / 4
    >>> xoff, yoff = correct_offsets(angle, x, y)
    >>> np.allclose([xoff, yoff], 2 ** 0.5)
    True
    """
    sep = np.sqrt(xoffset**2.0 + yoffset**2.0)

    new_xoff = sep * np.cos(obs_angle)
    new_yoff = sep * np.sin(obs_angle)

    return new_xoff, new_yoff


def observing_angle(rest_angle, derot_angle):
    """Calculate the observing angle of the multifeed.

    If values have no units, they are assumed in radians.

    Parameters
    ----------
    rest_angle : float or Astropy quantity, angle
        rest angle of the feeds
    derot_angle : float or Astropy quantity, angle
        derotator angle

    Examples
    --------
    >>> observing_angle(0 * u.rad, TWOPI * u.rad).to(u.rad).value
    0.0
    >>> observing_angle(0, TWOPI).to(u.rad).value
    0.0
    """
    if not hasattr(rest_angle, "unit"):
        rest_angle *= u.rad
    if not hasattr(derot_angle, "unit"):
        derot_angle *= u.rad
    return rest_angle + (TWOPI * u.rad - derot_angle)


def _rest_angle_default(n_lat_feeds):
    """Default rest angles for a multifeed, in units of a full circle.

    Assumes uniform coverage.

    Examples
    --------
    >>> np.allclose(_rest_angle_default(5),
    ...             np.array([1., 0.8, 0.6, 0.4, 0.2]))
    True
    >>> np.allclose(_rest_angle_default(6) * 360,
    ...             np.array([360., 300., 240., 180., 120., 60.]))
    True
    """
    return np.arange(1, 0, -1 / n_lat_feeds)


def get_rest_angle(xoffsets, yoffsets):
    """Calculate the rest angle for multifeed.

    The first feed is assumed to be at position 0, and for it the return
    value is 0.

    Examples
    --------
    >>> xoffsets = [0.0, -0.0382222, -0.0191226, 0.0191226, 0.0382222,
    ...             0.0191226, -0.0191226]
    >>> yoffsets = [0.0, 0.0, 0.0331014, 0.0331014, 0.0, -0.0331014,
    ...             -0.0331014]
    >>> np.allclose(get_rest_angle(xoffsets, yoffsets).to(u.deg).value,
    ...             np.array([0., 180., 120., 60., 360., 300., 240.]))
    True
    """
    if len(xoffsets) <= 2:
        return np.array([0] * len(xoffsets))
    xoffsets = np.asarray(xoffsets)
    yoffsets = np.asarray(yoffsets)
    n_lat_feeds = len(xoffsets) - 1
    rest_angle_default = _rest_angle_default(n_lat_feeds) * TWOPI * u.rad
    w_0 = np.where((xoffsets[1:] > 0) & (yoffsets[1:] == 0.0))[0][0]
    return np.concatenate(([0], np.roll(rest_angle_default.to(u.rad).value, w_0))) * u.rad


def infer_skydip_from_elevation(elevation, azimuth=None):
    if azimuth is None:
        azimuth = np.array([0, 0])
    el_condition = np.max(elevation) - np.min(elevation) > np.pi / 3.0
    az_condition = np.max(azimuth) - np.min(azimuth) < 0.1 / 180.0 * np.pi
    return az_condition & el_condition


def get_sun_coords_from_radec(obstimes, ra, dec, sun_frame=None):
    if sun_frame is None:  # pragma: no cover
        sun_frame = DEFAULT_SUN_FRAME
    coords = GCRS(
        ra=Angle(ra),
        dec=Angle(dec),
        obstime=obstimes,
        distance=sun.earth_distance(obstimes),
    )
    coords_asec = coords.transform_to(sun_frame(obstime=obstimes, observer="earth"))
    lon = coords_asec.Tx
    lat = coords_asec.Ty
    dist = coords_asec.distance
    return lon.to(u.radian), lat.to(u.radian), dist.to(u.m).value


def update_table_with_sun_coords(new_table, feeds=None, sun_frame=None):
    lon_str, lat_str = "hpln", "hplt"

    if "dsun" not in new_table.colnames:
        new_table[lon_str] = np.zeros_like(new_table["el"])
        new_table[lat_str] = np.zeros_like(new_table["az"])
        new_table["dsun"] = np.zeros(len(new_table["az"]))
    if feeds is None:
        feeds = np.arange(0, new_table["el"].shape[1], dtype=int)
    for i in feeds:
        obstimes = Time(new_table["time"] * u.day, format="mjd", scale="utc")
        lon, lat, dist = get_sun_coords_from_radec(
            obstimes,
            new_table["ra"][:, i],
            new_table["dec"][:, i],
            sun_frame=sun_frame,
        )
        new_table[lon_str][:, i] = lon
        new_table[lat_str][:, i] = lat
        if i == 0:
            new_table["dsun"][:] = dist

    return new_table


def get_coords_from_altaz_offset(obstimes, el, az, xoffs, yoffs, location, inplace=False):
    """Calculate RA/Dec from alt/az coordinates and feed offsets."""
    # Calculate observing angle
    if not inplace:
        el = copy.deepcopy(el)
        az = copy.deepcopy(az)
    el += yoffs.to(u.rad).value
    az += xoffs.to(u.rad).value / np.cos(el)

    coords = AltAz(az=Angle(az), alt=Angle(el), location=location, obstime=obstimes)

    # According to line_profiler, coords.icrs is *by far* the longest
    # operation in this function, taking between 80 and 90% of the
    # execution time. Need to study a way to avoid this.
    coords_deg = coords.transform_to(ICRS())
    ra = np.radians(coords_deg.ra)
    dec = np.radians(coords_deg.dec)

    return ra, dec


def is_close_to_sun(ra, dec, obstime, tolerance=3 * u.deg):
    """Test if the current source is close to the Sun.

    Examples
    --------
    >>> ra, dec = 131.13535699 * u.deg, 18.08202663 * u.deg
    >>> obstime = Time("2017-08-01")
    >>> is_close_to_sun(ra, dec, obstime, tolerance=3 * u.deg)
    True
    >>> is_close_to_sun(ra, dec + 4 * u.deg, obstime, tolerance=3 * u.deg)
    False
    """
    coords = SkyCoord(ra=ra, dec=dec, frame=GCRS(obstime=obstime))
    sun_position = get_sun(obstime).transform_to(GCRS(obstime=obstime))
    return (coords.separation(sun_position)).to(u.deg).value < tolerance.value


def update_table_with_offsets(new_table, xoffsets, yoffsets, rest_angles, feeds=None, inplace=False):
    if not inplace:
        new_table = copy.deepcopy(new_table)
    lon_str, lat_str = "ra", "dec"

    if lon_str not in new_table.colnames:
        new_table[lon_str] = np.zeros_like(new_table["el"])
        new_table[lat_str] = np.zeros_like(new_table["az"])
    if feeds is None:
        feeds = np.arange(0, new_table["el"].shape[1], dtype=int)

    for i in feeds:
        obs_angle = observing_angle(rest_angles[i], new_table["derot_angle"])

        # offsets < 0.001 arcseconds: don't correct (usually feed 0)
        if (
            np.abs(xoffsets[i]) < np.radians(0.001 / 60.0) * u.rad
            and np.abs(yoffsets[i]) < np.radians(0.001 / 60.0) * u.rad
        ):
            continue

        xoffs, yoffs = correct_offsets(obs_angle, xoffsets[i], yoffsets[i])
        obstimes = Time(new_table["time"] * u.day, format="mjd", scale="utc")
        location = locations[new_table.meta["site"]]
        lon, lat = get_coords_from_altaz_offset(
            obstimes,
            new_table["el"][:, i],
            new_table["az"][:, i],
            xoffs,
            yoffs,
            location=location,
            inplace=inplace,
        )
        new_table[lon_str][:, i] = lon
        new_table[lat_str][:, i] = lat

    return new_table


def _chan_name(f, p, c=None):
    if c is not None:
        return "Feed{}_{}_{}".format(f, p, c)
    else:
        return "Feed{}_{}".format(f, p)
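

# Examples of the channel naming scheme (values illustrative):
#
# >>> _chan_name(0, "LCP")
# 'Feed0_LCP'
# >>> _chan_name(0, "LCP", 2)
# 'Feed0_LCP_2'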


def read_data_fitszilla(fname):
    with fits.open(fname, memmap=False) as lchdulist:
        retval = _read_data_fitszilla(lchdulist)
    return retval
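

# Usage sketch (hypothetical file name): the reader returns an astropy Table
# with one column per channel, plus pointing and housekeeping information and
# per-channel metadata.
#
# >>> scan = read_data_fitszilla("20180212-150538-S0000-3C84.fits")  # doctest: +SKIP
# >>> scan["Feed0_LCP"].meta["frequency"]  # doctest: +SKIP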


def get_value_with_units(fitsext, keyword, default=""):
    """Read a column from a FITS extension, attaching its declared unit."""
    if isinstance(fitsext, fits.BinTableHDU):
        fitsext = fitsext.data
    unitstr = fitsext.columns[keyword].unit
    if unitstr is None:
        if default not in ["", None]:
            unit = u.Unit(default)
        else:
            unit = 1
    else:
        unit = u.Unit(unitstr)

    value = fitsext[keyword]
    is_string = isinstance(value, str)
    is_iterable = isinstance(value, Iterable)
    if is_string or (is_iterable and isinstance(value[0], str)):
        return value
    else:
        return value * unit


def adjust_temperature_size_rough(temp, comparison_array):
    """Adjust the size of the temperature array.

    Examples
    --------
    >>> temp = [1, 2, 3, 4]
    >>> adjust_temperature_size_rough(temp, [5, 6, 7])
    array([1, 2, 3])
    >>> adjust_temperature_size_rough(temp, [5, 6, 7, 5, 4])
    array([1, 2, 3, 4, 4])
    >>> adjust_temperature_size_rough(temp, [5, 6])
    array([2, 3])
    >>> adjust_temperature_size_rough(temp, [5, 6, 7, 5, 4, 6])
    array([1, 1, 2, 3, 4, 4])
    """
    temp = np.asarray(temp)
    comparison_array = np.asarray(comparison_array)
    temp_save = copy.deepcopy(temp)
    sizediff = temp.size - comparison_array.size
    if sizediff > 0:
        temp = temp[sizediff // 2 : sizediff // 2 + comparison_array.size]
    elif sizediff < 0:
        # make it positive
        sizediff = -sizediff
        temp = np.zeros_like(comparison_array)
        temp[sizediff // 2 : sizediff // 2 + temp_save.size] = temp_save
        temp[: sizediff // 2] = temp_save[0]
        temp[sizediff // 2 + temp_save.size - 1 :] = temp_save[-1]

    return temp


def adjust_temperature_size(temp, comparison_array):
    """Adjust the size of the temperature array.

    Examples
    --------
    >>> temp = [1, 2, 3, 4]
    >>> np.allclose(adjust_temperature_size(temp, [5, 6]), [1.0, 4.0])
    True
    >>> temp = [1, 2, 3, 4]
    >>> np.allclose(adjust_temperature_size(temp, [5, 6, 4, 5]), temp)
    True
    """
    temp = np.asarray(temp)
    comparison_array = np.asarray(comparison_array)
    Ntemp = temp.shape[0]
    Ndata = comparison_array.shape[0]
    if Ntemp == Ndata:
        return temp

    temp_func = interp1d(np.linspace(0, 1, Ntemp), temp)
    return temp_func(np.linspace(0, 1, Ndata))
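

# Minimal sketch of get_value_with_units on a synthetic HDU (illustrative
# column name and value): the unit declared in the FITS column is attached.
#
# >>> col = fits.Column(name="frequency", format="D", unit="MHz", array=[1400.0])
# >>> hdu = fits.BinTableHDU.from_columns([col])
# >>> get_value_with_units(hdu, "frequency")
# <Quantity [1400.] MHz>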


# from memory_profiler import profile
# @profile
def _read_data_fitszilla(lchdulist):
    """Open a fitszilla FITS file and read all relevant information."""
    is_new_fitszilla = np.any(["coord" in i.name.lower() for i in lchdulist])

    # ----------- Extract generic observation information ------------------
    headerdict = dict(lchdulist[0].header.items())
    source = lchdulist[0].header["SOURCE"]
    site = lchdulist[0].header["ANTENNA"].lower()
    receiver = lchdulist[0].header["RECEIVER CODE"]

    ra = lchdulist[0].header["RIGHTASCENSION"] * u.rad
    dec = lchdulist[0].header["DECLINATION"] * u.rad
    ra_offset = dec_offset = az_offset = el_offset = 0 * u.rad
    if "RightAscension Offset" in lchdulist[0].header:
        ra_offset = lchdulist[0].header["RightAscension Offset"] * u.rad
    if "Declination Offset" in lchdulist[0].header:
        dec_offset = lchdulist[0].header["Declination Offset"] * u.rad
    if "Azimuth Offset" in lchdulist[0].header:
        az_offset = lchdulist[0].header["Azimuth Offset"] * u.rad
    if "Elevation Offset" in lchdulist[0].header:
        el_offset = lchdulist[0].header["Elevation Offset"] * u.rad

    # ----------- Read the list of channel ids ------------------
    section_table_data = lchdulist["SECTION TABLE"].data
    chan_ids = get_value_with_units(section_table_data, "id")
    nbin_per_chan = get_value_with_units(section_table_data, "bins")
    sample_rate = get_value_with_units(section_table_data, "sampleRate")
    try:
        bw_section = get_value_with_units(section_table_data, "bandWidth")
        fr_section = get_value_with_units(section_table_data, "frequency")
    except KeyError:
        bw_section = None
        fr_section = None
    integration_time = lchdulist["SECTION TABLE"].header["Integration"] * u.ms

    if len(list(set(nbin_per_chan))) > 1:
        raise ValueError(
            "Only datasets with the same nbin per channel are supported at the moment"
        )
    nbin_per_chan = list(set(nbin_per_chan))[0]
    types = get_value_with_units(section_table_data, "type")
    is_polarized = "stokes" in types

    # Check. If backend is not specified, use Total Power
    try:
        backend = lchdulist[0].header["BACKEND NAME"]
    except Exception:
        if "stokes" in types:
            if nbin_per_chan == 2048:
                backend = "XARCOS"
            else:
                backend = "SARDARA"
        elif "spectra" in types:
            backend = "SARDARA"
        else:
            backend = "TP"

    # ----------- Read the list of RF inputs, feeds, polarization, etc. --
    rf_input_data = lchdulist["RF INPUTS"].data
    feeds = get_value_with_units(rf_input_data, "feed")
    IFs = get_value_with_units(rf_input_data, "ifChain")
    polarizations = get_value_with_units(rf_input_data, "polarization")
    sections = get_value_with_units(rf_input_data, "section")
    frequencies_rf = get_value_with_units(rf_input_data, "frequency")
    bandwidths_rf = get_value_with_units(rf_input_data, "bandWidth")
    local_oscillator = get_value_with_units(rf_input_data, "localOscillator")

    try:
        cal_mark_temp = get_value_with_units(rf_input_data, "calibrationMark")
    except KeyError:
        # Old, stupid typo
        cal_mark_temp = get_value_with_units(rf_input_data, "calibratonMark")

    if bw_section is not None:
        bandwidths_section = [bw_section[i] for i in sections]
        frequencies_section = [fr_section[i] for i in sections]
        frequencies_section = [f + l for (f, l) in zip(frequencies_section, local_oscillator)]

    if backend == "TP" or bw_section is None:
        frequencies, bandwidths = frequencies_rf, bandwidths_rf
    else:
        frequencies, bandwidths = frequencies_section, bandwidths_section

    combinations = list(zip(frequencies, bandwidths))
    combination_idx = np.arange(len(combinations))

    # Solve stupid problem with old CCB data
    if receiver.lower() == "ccb":
        feeds[:] = 0

    if len(set(combinations)) > 1:
        chan_names = [
            _chan_name(f, p, c) for f, p, c in zip(feeds, polarizations, combination_idx)
        ]
    else:
        chan_names = [_chan_name(f, p) for f, p in zip(feeds, polarizations)]

    # ----- Read the offsets of different feeds (nonzero only if multifeed)--
    feed_input_data = lchdulist["FEED TABLE"].data
    # Add management of historical offsets.
    # Note that we need to add the units by hand in this case.
    xoffsets = get_value_with_units(feed_input_data, "xOffset", default="rad")
    yoffsets = get_value_with_units(feed_input_data, "yOffset", default="rad")
    relpowers = get_value_with_units(feed_input_data, "relativePower")

    # -------------- Read data!-----------------------------------------
    datahdu = lchdulist["DATA TABLE"]
    # N.B.: there is an increase in memory usage here. This is just because
    # data are being read from the file at this point, not before.
    data_table_data = Table(datahdu.data)
    tempdata = Table(lchdulist["ANTENNA TEMP TABLE"].data)

    for col in data_table_data.colnames:
        if col == col.lower():
            continue
        data_table_data.rename_column(col, col.lower())
    for col in tempdata.colnames:
        if col == col.lower():
            continue
        tempdata.rename_column(col, col.lower())

    is_old_spectrum = "SPECTRUM" in list(datahdu.header.values())
    if is_old_spectrum:
        data_table_data.rename_column("spectrum", "ch0")
        sections = np.array([0, 0])

    unsupported_temperature = False
    if len(tempdata[tempdata.colnames[0]].shape) == 2:
        try:
            tempdata_new = Table()
            for i, (feed, ifnum) in enumerate(zip(feeds, IFs)):
                tempdata_new[f"ch{i}"] = tempdata[f"ch{feed}"][:, ifnum]
            tempdata = tempdata_new
        except Exception:  # pragma: no cover
            warnings.warn("Temperature format not supported", UserWarning)
            unsupported_temperature = True

    existing_columns = [chn for chn in data_table_data.colnames if chn.startswith("ch")]
    if existing_columns == []:
        raise ValueError("Invalid data")

    is_spectrum = nbin_per_chan > 1
    is_single_channel = len(set(combinations)) == 1

    good = np.ones(len(feeds), dtype=bool)
    for i, s in enumerate(sections):
        section_name = "ch{}".format(s)
        if section_name not in existing_columns:
            good[i] = False
    allfeeds = feeds
    feeds = allfeeds[good]
    IFs = IFs[good]
    polarizations = polarizations[good]
    sections = sections[good]

    unique_feeds = np.unique(feeds)
    rest_angles = get_rest_angle(xoffsets, yoffsets)

    if is_spectrum:
        nchan = len(chan_ids)
        sample_channel = existing_columns[0]
        _, nbins = data_table_data[sample_channel].shape

        # Development version of SARDARA -- will it remain the same?
        if nbin_per_chan == nbins:
            IFs = np.zeros_like(IFs)

        if nbin_per_chan * nchan * 2 == nbins and not is_polarized:
            warnings.warn(
                "Data appear to contain polarization information "
                "but are classified as simple, not stokes, in the "
                "Section table."
            )
            is_polarized = True

        if (
            nbin_per_chan != nbins
            and nbin_per_chan * nchan != nbins
            and nbin_per_chan * nchan * 2 != nbins
            and not is_polarized
        ):
            raise ValueError(
                "Something wrong with channel subdivision: "
                "{} bins/channel, {} channels, "
                "{} total bins".format(nbin_per_chan, nchan, nbins)
            )

        for f, ic, p, s in zip(feeds, IFs, polarizations, sections):
            c = s
            if is_single_channel:
                c = None
            section_name = "ch{}".format(s)
            ch = _chan_name(f, p, c)
            start, end = ic * nbin_per_chan, (ic + 1) * nbin_per_chan
            data_table_data[ch] = data_table_data[section_name][:, start:end]

        if is_polarized:
            # for f, ic, p, s in zip(feeds, IFs, polarizations, sections):
            for s in list(set(sections)):
                f = feeds[sections == s][0]
                c = s
                if is_single_channel:
                    c = None
                section_name = "ch{}".format(s)
                qname, uname = _chan_name(f, "Q", c), _chan_name(f, "U", c)
                qstart, qend = 2 * nbin_per_chan, 3 * nbin_per_chan
                ustart, uend = 3 * nbin_per_chan, 4 * nbin_per_chan
                data_table_data[qname] = data_table_data[section_name][:, qstart:qend]
                data_table_data[uname] = data_table_data[section_name][:, ustart:uend]
                chan_names += [qname, uname]

        for f, ic, p, s in zip(feeds, IFs, polarizations, sections):
            section_name = "ch{}".format(s)
            if section_name in data_table_data.colnames:
                data_table_data.remove_column(section_name)
    else:
        for ic, ch in enumerate(chan_names):
            data_table_data[ch] = data_table_data["ch{}".format(chan_ids[ic])]

    # ----------- Read temperature data, if possible ----------------
    for ic, ch in enumerate(chan_names):
        data_table_data[ch + "-Temp"] = 0.0
        if unsupported_temperature:
            continue
        if len(chan_ids) <= ic:
            continue
        ch_string = f"ch{chan_ids[ic]}"
        if ch_string not in tempdata.colnames:
            continue
        td = np.asarray(tempdata[ch_string])
        data_table_data[ch + "-Temp"] = adjust_temperature_size(
            td, data_table_data[ch + "-Temp"]
        )

    info_to_retrieve = [
        "time",
        "derot_angle",
        "weather",
        "par_angle",
        "flag_track",
        "flag_cal",
    ] + [ch + "-Temp" for ch in chan_names]

    new_table = Table()
    new_table.meta.update(headerdict)
    new_table.meta["SOURCE"] = source
    new_table.meta["site"] = site
    new_table.meta["backend"] = backend
    new_table.meta["receiver"] = receiver
    new_table.meta["RA"] = ra
    new_table.meta["Dec"] = dec
    new_table.meta["channels"] = nbin_per_chan
    new_table.meta["VLSR"] = new_table.meta["VLSR"] * u.Unit("km/s")

    for i, off in zip(
        "ra,dec,el,az".split(","),
        [ra_offset, dec_offset, el_offset, az_offset],
    ):
        new_table.meta[i + "_offset"] = off

    for info in info_to_retrieve:
        new_table[info] = data_table_data[info]

    if not _check_derotator(new_table["derot_angle"]):
        logging.debug("Derotator angle looks weird. Setting to 0")
        new_table["derot_angle"][:] = 0

    # Duplicate raj and decj columns (in order to be corrected later)
    Nfeeds = np.max(allfeeds) + 1
    for newcol, oldcol in [("ra", "raj2000"), ("dec", "decj2000"), ("el", "el"), ("az", "az")]:
        new_table[newcol] = np.tile(data_table_data[oldcol], (Nfeeds, 1)).transpose()

    new_table.meta["is_skydip"] = infer_skydip_from_elevation(
        data_table_data["el"], data_table_data["az"]
    )

    for info in ["ra", "dec", "az", "el", "derot_angle"]:
        new_table[info].unit = u.radian

    if not is_new_fitszilla:
        update_table_with_offsets(
            new_table, xoffsets, yoffsets, rest_angles, feeds=unique_feeds, inplace=True
        )
    else:
        for i in range(len(xoffsets)):
            try:
                ext = lchdulist["Coord{}".format(i)]
                extdata = ext.data
                ra, dec = extdata["raj2000"], extdata["decj2000"]
                el, az = extdata["el"], extdata["az"]
            except KeyError:
                ra, dec = new_table["ra"][:, 0], new_table["dec"][:, 0]
                el, az = new_table["el"][:, 0], new_table["az"][:, 0]
            new_table["ra"][:, i] = ra
            new_table["dec"][:, i] = dec
            new_table["el"][:, i] = el
            new_table["az"][:, i] = az

    # Don't know if better heuristics are needed
    obstime = Time(np.mean(new_table["time"]) * u.day, format="mjd", scale="utc")
    if is_close_to_sun(
        new_table.meta["RA"],
        new_table.meta["Dec"],
        obstime,
        tolerance=3 * u.deg,
    ):
        if DEFAULT_SUN_FRAME is None:
            raise ValueError("You need Sunpy to process Sun observations.")
        update_table_with_sun_coords(
            new_table,
            feeds=unique_feeds,
            sun_frame=DEFAULT_SUN_FRAME,
        )

    lchdulist.close()

    # So ugly. But it works
    filtered_frequencies = [f for (f, g) in zip(frequencies, good) if g]
    for i, fr in enumerate(filtered_frequencies):
        f = feeds[i]
        s = sections[i]
        ic = IFs[i]
        p = polarizations[i]
        b = bandwidths[i]
        lo = local_oscillator[i]
        cal = cal_mark_temp[i]
        c = s
        if is_single_channel:
            c = None
        chan_name = _chan_name(f, p, c)

        if bandwidths[ic] < 0:
            frequencies[ic] -= bandwidths[ic]
            bandwidths[ic] *= -1
            # Negative bandwidth means a flipped band: reverse the frequency
            # axis of each sample
            for isamp in range(data_table_data[chan_name].shape[0]):
                data_table_data[chan_name][isamp, :] = data_table_data[chan_name][isamp, ::-1]

        new_table[chan_name] = data_table_data[chan_name] * relpowers[feeds[ic]]
        new_table[chan_name + "-filt"] = np.ones(len(data_table_data[chan_name]), dtype=bool)
        data_table_data.remove_column(chan_name)

        newmeta = {
            "polarization": polarizations[ic],
            "feed": int(f),
            "IF": int(ic),
            "frequency": fr.to("MHz"),
            "bandwidth": b.to("MHz"),
            "sample_rate": sample_rate[s],
            "sample_time": (1 / (sample_rate[s].to(u.Hz))).to("s"),
            "local_oscillator": lo.to("MHz"),
            "cal_mark_temp": cal.to("K"),
            "integration_time": integration_time.to("s"),
            "xoffset": xoffsets[f].to(u.rad),
            "yoffset": yoffsets[f].to(u.rad),
            "relpower": float(relpowers[f]),
        }
        new_table[chan_name].meta.update(headerdict)
        new_table[chan_name].meta.update(new_table.meta)
        new_table[chan_name].meta.update(newmeta)

    if is_polarized:
        for s in list(set(sections)):
            feed = feeds[sections == s][0]
            c = s
            if is_single_channel:
                c = None
            for stokes_par in "QU":
                chan_name = _chan_name(feed, stokes_par, c)
                try:
                    new_table[chan_name] = data_table_data[chan_name]
                except KeyError:
                    continue
                sample_time = 1 / (sample_rate[s].to(u.Hz))
                newmeta = {
                    "polarization": stokes_par,
                    "feed": int(feed),
                    "IF": -1,  # There are two IFs for each section
                    "frequency": frequencies[2 * s].to("MHz"),
                    "bandwidth": bandwidths[2 * s].to("MHz"),
                    "sample_rate": sample_rate[s],
                    "sample_time": sample_time.to("s"),
                    "local_oscillator": local_oscillator[2 * s].to("MHz"),
                    "cal_mark_temp": cal_mark_temp[2 * s].to("K"),
                    "integration_time": integration_time.to("s"),
                    "xoffset": xoffsets[feed].to(u.rad),
                    "yoffset": yoffsets[feed].to(u.rad),
                    "relpower": 1.0,
                }
                new_table[chan_name].meta.update(headerdict)
                new_table[chan_name].meta.update(new_table.meta)
                new_table[chan_name].meta.update(newmeta)
                new_table[chan_name + "-filt"] = np.ones(
                    len(data_table_data[chan_name]), dtype=bool
                )
                data_table_data.remove_column(chan_name)

    return new_table


def read_data(fname):
    """Read the data, whatever the format, and return them."""
    kind = detect_data_kind(fname)
    if kind == "fitszilla":
        return read_data_fitszilla(fname)
    elif kind == "hdf5":
        return Table.read(fname)
    else:
        return None


def root_name(fname):
    """Return the file name without extension."""
    fn, ext = os.path.splitext(fname)
    if "fits" in ext and not ext.endswith("fits"):
        fn += ext.replace("fits", "").replace(".", "")
    return fn
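

# Behavior sketch (illustrative file names): plain extensions are stripped,
# while "fits"-like extensions keep their non-fits remainder.
#
# >>> root_name("observation.fits")
# 'observation'
# >>> root_name("observation.fits3")
# 'observation3'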


def _try_type(value, dtype):
    """
    Examples
    --------
    >>> _try_type("1", int)
    1
    >>> _try_type(1.0, int)
    1
    >>> _try_type("ab", float)
    'ab'
    """
    try:
        return dtype(value)
    except ValueError:
        return value


def label_from_chan_name(ch):
    """
    Examples
    --------
    >>> label_from_chan_name('Feed0_LCP_1')
    'LL'
    >>> label_from_chan_name('Feed0_Q_2')
    'LR'
    >>> label_from_chan_name('Feed3_RCP_1')
    'RR'
    >>> label_from_chan_name('Feed2_U_3')
    'RL'
    """
    _, polar, _ = interpret_chan_name(ch)
    if polar.startswith("L"):
        return "LL"
    elif polar.startswith("R"):
        return "RR"
    elif polar.startswith("Q"):
        return "LR"
    elif polar.startswith("U"):
        return "RL"
    else:
        raise ValueError("Unrecognized polarization")


def bulk_change(file, path, value):
    """Bulk change keyword or column values in a FITS file.

    Parameters
    ----------
    file : str
        Input file
    path : str
        It has to be formatted as EXT,data,COLUMN or EXT,header,KEY,
        depending on what is being changed (a data column or a header
        keyword, respectively). E.g. 1,data,TIME to change the values of
        column TIME in extension 1.
    value : any acceptable type
        Value to be filled in
    """
    with fits.open(file, memmap=False) as hdul:
        ext, attr, key = path.split(",")
        ext = _try_type(ext, int)
        data = getattr(hdul[ext], attr)
        data[key] = value
        setattr(hdul[ext], attr, data)
        hdul.writeto("tmp.fits", overwrite=True)
    force_move_file("tmp.fits", file)


def main_bulk_change(args=None):
    """Preprocess the data."""
    import argparse

    description = "Change all values of a given column or header keyword in fits files"
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument(
        "files",
        nargs="*",
        help="Single files to preprocess",
        default=None,
        type=str,
    )
    parser.add_argument(
        "-k",
        "--key",
        type=str,
        default=None,
        help="Path to key or data column. E.g. "
        '"EXT,header,KEY" to change key KEY in the header '
        "in extension EXT; EXT,data,COL to change column "
        "COL in the data of extension EXT",
    )
    parser.add_argument("-v", "--value", default=None, type=str, help="Value to be written")
    parser.add_argument(
        "--apply-cal-mark",
        action="store_true",
        default=False,
        help='Short for -k "DATA TABLE,data,flag_cal" -v 1',
    )
    parser.add_argument(
        "--recursive",
        action="store_true",
        default=False,
        help="Look for file in up to two subdirectories",
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        default=False,
        help="Plot stuff and be verbose",
    )
    args = parser.parse_args(args)

    if args.apply_cal_mark:
        args.key = "DATA TABLE,data,flag_cal"
        args.value = 1

    if args.key is None:
        raise ValueError(
            "What should I do? Please specify either key and value, or --apply-cal-mark"
        )

    fnames = []
    for fname in args.files:
        if args.recursive:
            if fname != os.path.basename(fname):
                raise ValueError(
                    "Option --recursive requires a file name, not "
                    "a full path: {}".format(fname)
                )
            fs = glob.glob(os.path.join("**", fname), recursive=True)
            fnames.extend(fs)
        else:
            fnames.append(fname)

    for fname in fnames:
        print("Updating", fname, "...", end="")
        bulk_change(fname, args.key, args.value)
        print(" Done.")
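

# Usage sketch (hypothetical file name), calling the entry point directly with
# an argument list instead of the command line:
#
# >>> main_bulk_change(["--apply-cal-mark", "myscan.fits"])  # doctest: +SKIP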