Source code for srttools.convert

from astropy.io import fits
import astropy.units as u
from astropy.time import Time
from astropy.table import Table
import numpy as np
import copy
import warnings
import os
import glob
import shutil

from astropy.utils.exceptions import AstropyWarning

from .io import get_coords_from_altaz_offset, correct_offsets
from .io import get_rest_angle, observing_angle, locations
from .converters.mbfits import MBFITS_creator
from .converters.classfits import CLASSFITS_creator
from .converters.sdfits import SDFITS_creator


[docs]def convert_to_complete_fitszilla(fname, outname): if outname == fname: raise ValueError("Files cannot have the same name") with fits.open(fname, memmap=False) as lchdulist: _convert_to_complete_fitszilla(lchdulist, outname) lchdulist.writeto(outname + ".fits", overwrite=True)
def _convert_to_complete_fitszilla(lchdulist, outname): feed_input_data = lchdulist["FEED TABLE"].data xoffsets = feed_input_data["xOffset"] * u.rad yoffsets = feed_input_data["yOffset"] * u.rad # ----------- Extract generic observation information ------------------ site = lchdulist[0].header["ANTENNA"].lower() location = locations[site] rest_angles = get_rest_angle(xoffsets, yoffsets) datahdu = lchdulist["DATA TABLE"] data_table_data = Table(datahdu.data) new_table = Table() info_to_retrieve = [ "time", "derot_angle", "el", "az", "raj2000", "decj2000", ] for info in info_to_retrieve: new_table[info.replace("j2000", "")] = data_table_data[info] el_save = new_table["el"] az_save = new_table["az"] derot_angle = new_table["derot_angle"] el_save.unit = u.rad az_save.unit = u.rad derot_angle.unit = u.rad times = new_table["time"] for i, (xoffset, yoffset) in enumerate(zip(xoffsets, yoffsets)): obs_angle = observing_angle(rest_angles[i], derot_angle) # offsets < 0.001 arcseconds: don't correct (usually feed 0) if ( np.abs(xoffset) < np.radians(0.001 / 60.0) * u.rad and np.abs(yoffset) < np.radians(0.001 / 60.0) * u.rad ): continue el = copy.deepcopy(el_save) az = copy.deepcopy(az_save) xoffs, yoffs = correct_offsets(obs_angle, xoffset, yoffset) obstimes = Time(times * u.day, format="mjd", scale="utc") # el and az are also changed inside this function (inplace is True) ra, dec = get_coords_from_altaz_offset( obstimes, el, az, xoffs, yoffs, location=location, inplace=True ) ra = fits.Column(array=ra, name="raj2000", format="1D") dec = fits.Column(array=dec, name="decj2000", format="1D") el = fits.Column(array=el, name="el", format="1D") az = fits.Column(array=az, name="az", format="1D") new_data_extension = fits.BinTableHDU.from_columns([ra, dec, el, az]) new_data_extension.name = "Coord{}".format(i) lchdulist.append(new_data_extension)
[docs]def launch_convert_coords(name, label, save_locally=False): allfiles = [] if os.path.isdir(name): allfiles += glob.glob(os.path.join(name, "*.fits")) else: allfiles += [name] for fname in allfiles: if "summary.fits" in fname: continue outroot = fname.replace(".fits", "_" + label) if save_locally: outroot = os.path.basename(outroot) convert_to_complete_fitszilla(fname, outroot) return outroot
# from memory_profiler import profile # fp = open('memory_profiler_basic_mean.log', 'w+') # precision = 10 # # @profile(precision=precision, stream=fp)
[docs]def launch_mbfits_creator(name, label, test=False, wrap=False, detrend=False, save_locally=False): if not os.path.isdir(name): raise ValueError("Input for MBFITS conversion must be a directory.") name = name.rstrip("/") random_name = "tmp_" + str(np.random.random()) mbfits = MBFITS_creator(random_name, test=test) summary = os.path.join(name, "summary.fits") if os.path.exists(summary): mbfits.fill_in_summary(summary) for fname in sorted(glob.glob(os.path.join(name, "*.fits"))): if "summary.fits" in fname: continue mbfits.add_subscan(fname, detrend=detrend) mbfits.update_scan_info() if save_locally: name = os.path.basename(name) outname = name + "_" + label if os.path.exists(outname): shutil.rmtree(outname) if wrap: fnames = mbfits.wrap_up_file() for febe, fname in fnames.items(): shutil.move(fname, name + "." + febe + ".fits") shutil.move(random_name, outname) return outname, mbfits
[docs]def launch_classfits_creator(name, label, test=False, save_locally=False): if not os.path.isdir(name): raise ValueError("Input for CLASSFITS conversion must be a directory.") name = name.rstrip("/") outname = name + "_" + label if save_locally: outname = os.path.basename(outname) if os.path.exists(outname): shutil.rmtree(outname) random_name = "tmp_" + str(np.random.random()) classfits = CLASSFITS_creator(random_name, scandir=name, average=True) shutil.move(random_name, outname) return outname, classfits
[docs]def launch_sdfits_creator(name, label, test=False, save_locally=False): if not os.path.isdir(name): raise ValueError("Input for SDFITS conversion must be a directory.") name = name.rstrip("/") outname = name + "_" + label if save_locally: outname = os.path.basename(outname) if os.path.exists(outname): shutil.rmtree(outname) random_name = "tmp_" + str(np.random.random()) classfits = SDFITS_creator(random_name, scandir=name) shutil.move(random_name, outname) return outname, classfits
[docs]def match_srt_name(name): """ Examples -------- >>> matchobj = match_srt_name('blabla/20180212-150835-S0000-3C84_RA/') >>> matchobj.group(1) '20180212' >>> matchobj.group(2) '150835' >>> matchobj.group(3) 'S0000' >>> matchobj.group(4) '3C84_RA' """ name = os.path.basename(name.rstrip("/")) import re name_re = re.compile(r"([0-9]+).([0-9]+)-([^\-]+)-([^\-]+)") return name_re.match(name)
[docs]def main_convert(args=None): import argparse description = "Load a series of scans and convert them to various" "formats" parser = argparse.ArgumentParser(description=description) parser.add_argument( "files", nargs="*", help="Single files to process or directories", default=None, type=str, ) parser.add_argument( "-f", "--format", type=str, default="fitsmod", help="Format of output files (options: " "mbfits, indicating MBFITS v. 1.65; " "mbfitsw, indicating MBFITS v. 1.65 wrapped in a" "single file for each FEBE; " "fitsmod (default), indicating a fitszilla with " "converted coordinates for feed number *n* in " "a separate COORDn extensions); " "classfits, indicating a FITS file readable into " "CLASS, calibrated when possible;" "sdfits, for the SDFITS convention", ) parser.add_argument( "--test", help="Only to be used in tests!", action="store_true", default=False, ) parser.add_argument( "--detrend", help="Detrend data before converting to MBFITS", action="store_true", default=False, ) parser.add_argument( "--save-locally", help="Save all data in the current directory, not" "alongside the original data.", action="store_true", default=False, ) args = parser.parse_args(args) outnames = [] for fname in args.files: if args.format == "fitsmod": outname = launch_convert_coords(fname, args.format, save_locally=args.save_locally) outnames.append(outname) elif args.format == "mbfits": outname, mbfits = launch_mbfits_creator( fname, args.format, test=args.test, wrap=False, detrend=args.detrend, save_locally=args.save_locally, ) if args.test: fname = "20180212-150835-S0000-3C84_RA" matchobj = match_srt_name(fname) if matchobj: date = matchobj.group(1) new_name = "{site}_{date}_{scanno:04d}_{febe}".format( site=mbfits.site.strip().upper(), date=date, scanno=mbfits.obsid, febe=list(mbfits.FEBE.keys())[0], ) if os.path.exists(new_name): shutil.rmtree(new_name) shutil.move(outname, new_name) outname = new_name outnames.append(outname) elif args.format == "mbfitsw": outname, mbfits = launch_mbfits_creator( fname, args.format, test=args.test, wrap=True, detrend=args.detrend, save_locally=args.save_locally, ) outnames.append(outname) elif args.format == "classfits": outname, mbfits = launch_classfits_creator( fname, args.format, test=args.test, save_locally=args.save_locally, ) outnames.append(outname) elif args.format == "sdfits": outname, mbfits = launch_sdfits_creator( fname, args.format, test=args.test, save_locally=args.save_locally, ) outnames.append(outname) else: warnings.warn("Unknown output format", AstropyWarning) return outnames