#!/usr/bin/env python
import numpy as np
from .lightcurve import LC
from astropy import constants as const, units as u
from astropy.io import fits, ascii
from astropy.wcs import WCS
from astropy.time import Time
from astropy.table import Table
import argparse
import matplotlib.pyplot as plt
import os
import warnings
import json
import re
import shutil
[docs]
def removebadcards(hdr):
"""
Remove problematic entries from a :class:`astropy.io.fits.header.Header`
"""
for card in hdr.cards:
try:
card.verify('fix')
except fits.verify.VerifyError as e:
print(e)
hdr.remove(card.keyword)
except Exception as e:
print(e)
hdr.remove(card.keyword)
return hdr
[docs]
def remove_duplicate_wcs(hdr, keep_number=0):
"""
Remove duplicate world coordinate system header keywords from a :class:`astropy.io.fits.header.Header`
"""
for key in ['CTYPE1', 'CTYPE2', 'CRPIX1', 'CRPIX2', 'CRVAL1', 'CRVAL2', 'CD1_1', 'CD2_2', 'CD1_2', 'CD2_1']:
if key in hdr and hdr.count(key) > 1:
card = hdr.cards[(key, keep_number)]
hdr.remove(card.keyword, remove_all=True)
hdr[card.keyword] = (card.value, card.comment)
[docs]
def readfitsspec(filename, header=False, ext=None):
"""
Read a spectrum from a FITS file
Parameters
----------
filename : str
Filename from which to read the spectrum
header : bool, optional
If True, also return the FITS header
ext : int, str, optional
FITS extension number or name in which the spectrum is stored
Returns
-------
wl : array-like
Wavelengths, typically in ångströms
flux : array-like
Observed fluxes in erg / (s cm2 angstrom), if units are identifiable
hdr : astropy.io.fits.header.Header, optional
FITS header, returned if ``header=True``
"""
hdulist = fits.open(filename)
if ext is None:
for hdu in hdulist: # try to find SCI extension
if hdu.header.get('extname') == 'SCI':
break
else: # if no SCI extension, pick the first extension with data
for hdu in hdulist:
if hdu.data is not None:
break
else:
raise Exception('no extensions have any data')
else:
hdu = hdulist[ext]
data = hdu.data
hdr = hdu.header
if isinstance(hdu, fits.BinTableHDU):
wl = data['wavelength']
flux = data['flux']
else:
data = np.moveaxis(data, np.arange(data.ndim), np.argsort(data.shape)) # put longest axis last
flux = data.flatten()[:max(data.shape)]
remove_duplicate_wcs(hdr) # some problem with Gemini pipeline
if hdr.get('CUNIT1') in ['Angstroms', 'angstroms', 'deg', 'pixel']:
hdr['CUNIT1'] = 'Angstrom' # WCS object needs recognizable units
with warnings.catch_warnings():
warnings.simplefilter('ignore')
wcs = WCS(removebadcards(hdr), naxis=1, relax=False, fix=False)
if 'CUNIT1' in hdr:
wl = wcs.pixel_to_world(np.arange(len(flux))).to(hdr['CUNIT1']).value
else:
wl = wcs.wcs_pix2world(np.arange(len(flux)), 0)[0]
if header:
return wl, flux, hdr
else:
return wl, flux
[docs]
def convert_spectrum_units(wl, flux, hdr, default_bunit='erg / (Angstrom cm2 s)', default_cunit='Angstrom'):
"""
Convert a spectrum to standard units, if information is available in the header to establish units (BUNIT & CUNIT1)
Parameters
----------
wl: array-like
Input wavelengths
flux: array-like
Input fluxes
hdr: dict-like
Metadata from which to determine the units of wl and flux
default_bunit: str, optional
Units of the output flux. Default: 'erg / (Angstrom cm2 s)'
default_cunit: str, optional
Units of the output wavelengths. Default: 'Angstrom'
Returns
-------
wl: numpy.ndarray
Input wavelengths converted to `default_cunit`
flux: numpy.ndarray
Input fluxes converted to `default_bunit`
"""
bunit = hdr.get('BUNIT', default_bunit)
if bunit == 'adu':
bunit = default_bunit
if 'Angstrom' not in bunit:
bunit = bunit.replace('Ang', 'Angstrom')
if 'Angstrom' not in bunit:
bunit = bunit.replace('A', 'Angstrom')
cunit = hdr.get('CUNIT1', hdr.get('XUNITS', default_cunit))
if cunit.lower() == 'angstroms':
cunit = cunit.rstrip('s')
wl = u.Quantity(wl, cunit).to(default_cunit)
with warnings.catch_warnings():
warnings.simplefilter('ignore')
flux = u.Quantity(flux, bunit).to(default_bunit, u.equivalencies.spectral_density(wl))
return wl.value, flux.value
[docs]
def readOSCspec(filepath):
"""
Read spectra from a JSON file in the Open Astronomy Catalog schema (https://github.com/astrocatalogs/schema)
Parameters
----------
filepath : str
Path to the JSON file containing the spectra
Returns
-------
filenames : list
List of filenames corresponding to each spectrum
times : list
List of :class:`astropy.time.Time` when each spectrum was observed
tel : list
List of telescope names (if given) where each spectrum was observed
inst : list
List of instrument names (if given) with which each spectrum was observed
wl : list
List of wavelength arrays for each spectrum
fx : list
List of flux arrays for each spectrum
scales : list
List of scaling factors for each spectrum (all ones for the OSC)
"""
with open(filepath) as f:
json_dict = json.load(f)
rows = json_dict[os.path.splitext(os.path.basename(filepath))[0]]
if 'spectra' in rows:
rows = rows['spectra']
else:
return [], [], [], [], [], [], []
superdict = {key: [d[key] if key in d else '0' for d in rows] for key in
set(np.concatenate([list(d.keys()) for d in rows]))}
times = [Time(float(t), format=u.lower()) for t, u in zip(superdict['time'], superdict['u_time'])]
wl = [0.1 * np.array(d, dtype=float)[:, 0] for d in superdict['data']]
fx = [np.array(d, dtype=float)[:, 1] for d in superdict['data']]
if 'telescope' in superdict:
tel = superdict['telescope']
else:
tel = np.tile('', len(rows))
if 'instrument' in superdict:
inst = superdict['instrument']
else:
inst = np.tile('', len(rows))
# t = Table([superdict['filename'], times, tel, inst, wl, fx, np.ones(len(rows))],
# names=['filename', 'date', 'telescope', 'instrument', 'wl', 'flux', 'scale'])
return superdict['filename'], times, tel, inst, wl, fx, np.ones(len(rows))
[docs]
def readspec(f, verbose=False, return_header=False):
"""
Read a spectrum from a FITS or ASCII file and try to identify where and when it was observed
Parameters
----------
f : str
Filename from which to read the spectrum
verbose : bool, optional
If True, print the date and filename of the spectrum
return_header : bool, optional
If True, return the full header as an additional return value
Returns
-------
x : array-like
Wavelengths, typically in ångströms
y : array-like
Fluxes in erg / (s cm2 angstrom), if units are identifiable
date : astropy.time.Time
Time at which the spectrum was observed, if identifiable (otherwise ``None``)
telescope : str
Name of the telescope at which the spectrum was observed, if identifiable (otherwise ``''``)
instrument: str
Name of the instrument used to observe the spectrum, if identifiable (otherwise ``''``)
hdr : dict-like
If ``return_header=True``, metadata is returned as a FITS header or dictionary (for ASCII files)
"""
ext = os.path.splitext(f)[1]
if ext == '.fits':
x, y, hdr = readfitsspec(f, header=True)
elif ext == '.json':
x, y, hdr = readOSCspec(f)
else: # assume it's some ASCII format
t = Table.read(f, format='ascii')
# assume it's the first two columns, regardless of if column names are given
x = t.columns[0]
y = t.columns[1]
hdr = {}
comments = t.meta.get('comments', [])
for line in comments:
match = re.search('([^ ]*) *[=:] *([^/]*)', line)
if match is None:
continue
kwd, val = match.groups() # (anything before first '='/':', anything between first '='/':' and '/')
hdr[kwd.strip(' #')] = val.strip(' "\'')
for kwd in ['MJD-OBS', 'MJD_OBS', 'MJD', 'JD', 'DATE-AVG', 'UTMIDDLE', 'DATE-OBS', 'DATE_BEG', 'UTSHUT', 'OBS_DATE',
'AVE_MJD']:
if kwd in hdr and hdr[kwd]:
if 'MJD' in kwd:
date = Time(float(hdr[kwd]), format='mjd')
elif 'JD' in kwd and float(hdr['JD']) > 2400000:
date = Time(float(hdr[kwd]), format='jd')
elif 'JD' in kwd:
date = Time(float(hdr[kwd]) + 2400000, format='jd')
elif 'T' in hdr[kwd]:
date = Time(hdr[kwd])
elif kwd == 'OBS_DATE':
date = Time(hdr[kwd].split('+')[0])
elif '-' in hdr[kwd]:
for kwd2 in ['UTMIDDLE', 'EXPSTART', 'UT']:
if kwd2 in hdr and type(hdr[kwd2]) == str and ':' in hdr[kwd2]:
date = Time(hdr[kwd] + 'T' + hdr[kwd2])
break
elif kwd2 in hdr:
h = int(np.floor(hdr[kwd2]))
m = int(np.floor((hdr[kwd2] * 60) % 60))
s = int(np.floor((hdr[kwd2] * 3600) % 60))
date = Time(hdr[kwd] + 'T{:02d}:{:02d}:{:02d}'.format(h, m, s))
break
else:
date = Time(hdr[kwd])
else:
continue
break
else: # hope it's in the filename
m1 = re.search('24[0-9][0-9][0-9][0-9][0-9]\.[0-9]+', f) # JD w/1 or more decimals
m_tns = re.search(
'(19|20)[0-9][0-9]-(0[0-9]|1[0-2])-(0[1-9]|[12][0-9]|3[01])_([01][0-9]|2[0-4])-[0-5][0-9]-[0-5][0-9]',
f) # YYYY-MM-DD_HH:MM:SS
m2 = re.search('([12][90][0-9][0-9])-?(0[0-9]|1[0-2])-?(0[1-9]|[12][0-9]|3[01])(\.[0-9]+)?', f) # YYYYMMDD.FFF
m3 = re.search('[0-9][0-9][0-9]d', f) # integer phase followed by 'd'
m4 = re.search('[0-9][0-9][0-9][0-9][0-9](\.[0-9]+)?', f) # MJD w/1 or more decimals
if m1 is not None:
m = m1.group()
date = Time(float(m), format='jd')
elif m_tns is not None:
m = m_tns.group()
d, t = m.split('_')
date = Time(d + 'T' + t.replace('-', ':'))
elif m2 is not None:
groups = m2.groups()
date = Time('-'.join(groups[:3]))
if groups[3] is not None:
date += float(groups[-1]) * u.day
elif m3 is not None:
m = m3.group()
date = Time(float(m[:-1]), format='mjd')
elif m4 is not None:
m = m4.group()
date = Time(float(m), format='mjd')
else:
date = None
if 'TELESCOP' in hdr:
telescope = hdr['TELESCOP'].strip()
elif 'TELESCOPE' in hdr:
telescope = hdr['TELESCOPE'].strip()
elif 'OBSERVAT' in hdr:
telescope = hdr['OBSERVAT'].strip()
else:
telescope = ''
if 'INSTRUME' in hdr:
instrument = hdr['INSTRUME'].strip()
elif 'INSTRUMENT' in hdr:
instrument = hdr['INSTRUMENT'].strip()
elif 'INSTR' in hdr:
instrument = hdr['INSTR'].strip()
elif 'INSTRUMENT_ID' in hdr:
instrument = hdr['INSTRUMENT_ID']
else:
instrument = ''
x, y = convert_spectrum_units(x, y, hdr)
if verbose:
print(date.isot, f)
if return_header:
return x, y, date, telescope, instrument, hdr
else:
return x, y, date, telescope, instrument
[docs]
def calibrate_spectra(spectra, lc, filters=None, order=0, subtract_percentile=None, max_extrapolate=1., show=False):
"""
Calibrate a set of spectra to an observed broadband light curve.
Each calibrated spectrum is saved to a text file that corresponds to the original filename prefixed by ``photcal_``.
Parameters
----------
spectra : list
List of filenames containing the spectra
lc : lightcurve_fitting.lightcurve.LC
Photometry table containing the observed light curve
filters : list, optional
Only use this subset of filters for calibration
order : int, optional
Polynomial order for the calibration function. Default: 0 (constant factor)
subtract_percentile : float, optional
Subtract flux corresponding to this percentile of the spectrum before calibration. Default: no subtraction
max_extrapolate : float, optional
Assume constant flux in a filter for this many days after the last observed point. Default: 1 day.
show : bool, optional
Plot the observed light curve and the uncalibrated and calibrated spectra, and ask whether to save the results
Returns
-------
fig : matplotlib.pyplot.Figure
If `show=True`, the Figure object is returned
"""
if filters is not None:
lc = lc.where(filter=filters)
lc.calcFlux()
lc.sort('MJD')
filts = set(lc['filter'])
for filt in filts:
filt.trans.sort('freq')
plt.ion()
fig = plt.figure(figsize=(8., 6.))
for spec in spectra:
wl, flux, time, _, _ = readspec(spec)
mjd = time.mjd
if show:
fig.clf()
ax1 = plt.subplot(211)
lc.plot(xcol='MJD', ycol='flux', offset_factor=0)
ax1.axvline(mjd)
ax1.set_xlabel('MJD')
ax1.set_ylabel('$F_\\nu$ (W Hz$^{-1}$)')
ax2 = plt.subplot(212)
good = ~np.isnan(flux)
lam = wl[good] * u.angstrom
Flam = flux[good] * u.erg / u.s / u.angstrom / u.cm**2
nu = const.c / lam
Fnu = (Flam * lam / nu).to(u.W / u.Hz / u.m**2).value[::-1]
nu = nu.to(u.THz).value[::-1]
if subtract_percentile is not None:
Fnu -= np.nanpercentile(Fnu, subtract_percentile)
freqs = []
ratios = []
for filt in filts:
freq0 = filt.freq_eff.value - filt.freq_range[0]
freq1 = filt.freq_range[1] + filt.freq_eff.value
if freq1 < np.min(nu) or freq0 > np.max(nu):
print(filt, "and spectrum don't overlap")
continue # filter and spectrum don't overlap
lc_filt = lc.where(filter=filt, nondet=False)
if len(lc_filt) == 0 or mjd - np.max(lc_filt['MJD']) > max_extrapolate or mjd < np.min(lc_filt['MJD']):
print(filt, "not observed before and after spectrum")
continue
flux_lc = np.interp(mjd, lc_filt['MJD'], lc_filt['flux'])
trans_interp = np.interp(nu, filt.trans['freq'], filt.trans['T_norm_per_freq'])
flux_spec = np.trapz(Fnu * trans_interp, nu) / np.trapz(trans_interp, nu)
ratio = flux_lc / flux_spec
if show:
ax2.axvspan(freq0, freq1, color=filt.color, alpha=0.2)
ax2.plot(filt.freq_eff, flux_lc, marker='o', zorder=5, **filt.plotstyle)
ratios.append(ratio)
freqs.append(filt.freq_eff.value)
if not ratios:
print('no filters for', spec)
if show:
plt.close(fig)
continue
scale = np.mean(ratios)
if order:
p = np.polyfit(freqs, np.array(ratios) / scale, order)
corr = np.polyval(p, nu) * scale
print(spec, scale, p[:-1])
else:
corr = np.array([scale])
print(spec, scale)
if show:
ax2.plot(nu, Fnu * scale, label='rescaled')
ax2.set_xlabel('Frequency (THz)')
ax2.set_ylabel('$F_\\nu$ (W Hz$^{-1}$)')
if order:
ax2.plot(nu, Fnu * corr, color='C2', label='rescaled & warped')
plt.legend(loc='best')
plt.pause(0.1)
ans = input('accept this scale? [Y/n] ')
if not show or ans.lower() != 'n':
data_out = np.array([wl[good], flux[good] * corr[::-1]]).T
path_in, filename_in = os.path.split(spec)
filename_out = os.path.join(path_in, 'photcal_' + filename_in).replace('.fits', '.txt')
np.savetxt(filename_out, data_out, fmt='%.1f %.2e')
print(filename_out)
if show:
return fig
[docs]
def create_wiserep_tsv(specpaths, wiserep_dir, verbose=False, instruments=None, date_fmt='iso'):
"""
Prepares a TSV file for uploading spectra to WISeREP (see https://www.wiserep.org/content/wiserep-getting-started).
Also copies all the spectrum files to a single directory, and converts any FITS files to ASCII.
Parameters
----------
specpaths: list
List of paths to the spectrum files. Optionally, data quality can be specified in a tuple, e.g.,
[(filename1.fits, 3), (filename2.txt, 2), ...], where the second element is 1 (low), 2 (medium), or 3 (high).
wiserep_dir: str
Directory in which to collect the spectra. If it already exists, it will be deleted and remade.
The TSV file will be saved alongside this directory using the same name but with a '.tsv' extension.
verbose: bool, optional
If True, print a message when any file is copied or created.
instruments: dict, optional
A dictionary of known instruments. The keys are the instrument from the header and the values are
the wiserep instrument ids from https://www.wiserep.org/aux. By default the script builds this dictionary
on the fly.
date_fmt: str, optional
Format for the Obs-date column in the output. WISeREP accepts 'iso' (default) or 'jd'.
"""
# prepare the output directory
if os.path.exists(wiserep_dir):
ans = input(f'Are you sure you want to delete the directory {wiserep_dir}? [y/N] ')
if ans.lower() != 'y':
return
shutil.rmtree(wiserep_dir)
os.mkdir(wiserep_dir)
# collect the metadata
bibcode = input('bibcode: ')
rows = []
if instruments is None:
instruments = {}
for specpath in specpaths:
if isinstance(specpath, tuple):
specpath, quality = specpath
quality = min(max(round(quality), 1), 3) # forces it to be 1, 2, or 3
else:
quality = 2 # medium quality
specfile = os.path.split(specpath)[-1]
ascii_file = specfile.replace('.fits', '.txt').replace('.csv', '.txt')
print() # empty line between spectra
wl, flux, date, tel, inst, hdr = readspec(specpath, verbose=True, return_header=True)
groups = input('https://www.wiserep.org/groups\ngroup IDs (comma sep.): ')
if inst not in instruments:
inst_id = input(f'https://www.wiserep.org/aux\nlook up instrument ID for {inst} (required): ')
if inst and inst_id:
instruments[inst] = int(inst_id)
else:
inst_id = instruments[inst]
row = [
ascii_file,
specfile if specfile.endswith('.fits') else None,
date if date is not None else Time(np.ma.array(np.ma.masked), format='mjd'), # a masked time
inst_id,
hdr.get('exptime'),
{'angstrom': 11, 'nm': 12, 'um': 13}.get(hdr.get('CUNIT1', hdr.get('XUNITS', 'angstrom')).lower()),
1, # wavelength in air, hardcoded for now
1., # erg/cm2/s/Å, hardcoded for now
6, # erg/cm2/s/Å, hardcoded for now
2 if specfile.startswith('photcal') else 1, # method of flux calibration (1 = std star, 2 = photometry)
0, # not extinction corrected
hdr.get('OBSERVER', 'Unknown'),
hdr.get('REDUCER'),
None, # reduction date
float(hdr['APERWID']) if 'APERWID' in hdr else None,
hdr.get('DICHROIC'),
hdr.get('GRISM'),
hdr.get('GRATING'),
float(hdr['BLAZE']) if 'BLAZE' in hdr else None,
float(hdr['AIRMASS']) if 'AIRMASS' in hdr else None,
hdr.get('HA') or None,
10, # spectrum of object
quality, # spectrum quality
0., # spectrum proprietary period
'days', # prop. period units
groups,
None, # comments
bibcode or None,
None, # contributor
None, # related files
None, # related files
None, # related files
None, # related files
]
rows.append(row)
# copy and convert files
if not specfile.endswith('.csv'):
shutil.copy(specpath, wiserep_dir)
if verbose:
print(f'copied {specfile} to {wiserep_dir}')
if specfile.endswith('.fits') or specfile.endswith('.csv'):
data_out = np.transpose([wl, flux])
np.savetxt(os.path.join(wiserep_dir, ascii_file), data_out, fmt=('%f', '%e'), header=hdr.__repr__())
if verbose:
print(f'wrote {wiserep_dir}/{ascii_file}')
# create the table and write the TSV file
t = Table(rows=rows, names=[
'Ascii-filename*',
'FITS-filename*',
'Obs-date* [YYYY-MM-DD HH:MM:SS] / JD',
'Instrument-Id*',
'Exp-time (sec)',
'WL Units-id',
'WL Medium-Id',
'Flux Unit Coeff',
'Flux Units-Id',
'Flux Calib. By-Id',
'Extinction-Corrected-Id',
'Observer/s ',
'Reducer/s ',
'Reduction-date [YYYY-MM-DD HH:MM:SS] / JD',
'Aperture (Slit)',
'Dichroic',
'Grism',
'Grating',
'Blaze',
'Airmass',
'Hour Angle',
'Spec Type-Id',
'Spec Quality-Id',
'Spec. Prop-period value',
'Prop-period units',
'Assoc. Groups',
'Spec-Remarks',
'Publish (bibcode)',
'Contrib',
'Related-file1',
'RF1 Comments',
'Related-file2',
'RF2 Comments'
], meta={'comments': ['TSV-type:\tspectra']})
t['Obs-date* [YYYY-MM-DD HH:MM:SS] / JD'].format = date_fmt
# use the lower-level interface to add the second line with the defaults
writer = ascii.get_writer(ascii.Tab, fast_writer=False, comment='',
fill_values=[('None', 'NULL'), ('', 'NULL'), ('UNKNOWN', 'NULL')])
lines = writer.write(t)
lines.insert(2, '\t\t\t\tNULL\t[default=11 (Angstrom)]\t[default=1 (Air)]\t[default=1.0]\t[default=6]\tNULL\tNULL\t[Unknown]\tNULL\tNULL\tNULL\tNULL\tNULL\tNULL\tNULL\tNULL\tNULL\t[default=10=Object]\tNULL\tNULL\t[days/months/years]\t[Comma delim.]\tNULL\tNULL\tNULL\tNULL\tNULL\tNULL\tNULL')
with open(wiserep_dir + '.tsv', 'w') as f:
f.write('\n'.join(lines) + '\n')
if verbose:
print(f'\nwrote {wiserep_dir}.tsv')
return t
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Calibrate spectra to photometry.')
parser.add_argument('spectra', nargs='+', help='filenames of spectra')
parser.add_argument('--lc', help='filename of photometry table (must have columns "MJD", "filter", "mag"/"flux", and'
'"dmag"/"dflux")')
parser.add_argument('--lc-format', default='ascii',
help='format of photometry table (passed to :func:`astropy.table.Table.read`)')
parser.add_argument('-f', '--filters', nargs='+', help='filters to use for calibration')
parser.add_argument('-o', '--order', type=int, default=0, help='polynomial order of correction function')
parser.add_argument('--subtract-percentile', type=float, help='subtract continuum from spectrum before correcting')
parser.add_argument('--max-extrapolate', type=float, default=1.,
help='assume constant flux in a filter for this many days after the last observed point')
parser.add_argument('--show', action='store_true')
args = parser.parse_args()
lc = LC.read(args.lc, format=args.format)
calibrate_spectra(args.spectra, lc, args.filters, args.order, args.subtract_percentile, args.max_extrapolate,
args.show)