#!/usr/bin/env python
"""
specter.throughput
==================
A class for tracking throughput.
Hacks:
- Doesn't support spatial variation of input sources.
- Doesn't support per-fiber throughput.
- Do I really want to impose a clumsy ObjType ENUM?
How to handle fiber size and sky units?
"""
from __future__ import absolute_import, division, print_function, unicode_literals
import sys
import os
import warnings
import numbers
import numpy as np
from astropy.io import fits
from specter import util
#- ObjType enum
class ObjType:
STAR = 'STAR'
STD = 'STD'
GALAXY = 'GALAXY' #- pretty meaningless for fiberloss, ok for others
ELG = 'ELG'
LRG = 'LRG'
QSO = 'QSO'
SKY = 'SKY'
CALIB = 'CALIB'
[docs]def load_throughput(filename):
"""
Create Throughput object from FITS file with EXTNAME=THROUGHPUT HDU
"""
#- memmap=False so that fits will really close the file upon fx.close()
fx = fits.open(filename, memmap=False)
thru = fx['THROUGHPUT'].data
hdr = fx['THROUGHPUT'].header
#- Check for FIBERINPUT HDU
if 'FIBERINPUT' in fx:
tmp = fx['FIBERINPUT'].data
assert(len(tmp) == len(thru))
fiberinput = dict()
for key in tmp.dtype.names:
fiberinput[key.upper()] = tmp[key]
else:
print("no FIBERINPUT extention found")
fiberinput = thru['fiberinput']
if 'wavelength' in thru.dtype.names:
w = thru['wavelength']
elif 'loglam' in thru.dtype.names:
w = 10**thru['loglam']
else:
fx.close()
raise ValueError('throughput must include wavelength or loglam')
if 'GEOMAREA' in hdr:
area = hdr['GEOMAREA']
elif 'EFFAREA' in hdr:
area = hdr['EFFAREA'] #- misnomer, but for backwards compatibility
elif 'AREA' in hdr:
area = hdr['AREA']
else:
fx.close()
raise ValueError("throughput file missing GEOMAREA keyword")
fx.close()
return Throughput(
wave = w,
throughput = thru['throughput'],
extinction = thru['extinction'],
fiberinput = fiberinput,
exptime = hdr['EXPTIME'],
area = area,
fiberdia = hdr['FIBERDIA'],
)
class Throughput:
def __init__(self, wave, throughput, extinction,
exptime, area, fiberdia, fiberinput=None):
"""
Create Throughput object
Inputs
------
wave : wavelength array [Angstroms]
throughput : array of system throughput for elements which apply to
all types of sources, e.g. mirrors, lenses, CCD efficiency
extinction : atmospheric extinction array [mags/airmass]
exptime: float, default exposure time [sec]
area: float, input geometric area [cm^2]
fiberdia: float, fiber diameter [arcsec]
Optional Inputs
---------------
fiberinput : float, array, or dictionary of arrays keyed by objtype.
Geometric throughput due to finite sized fiber input.
Default to no loss = 1.0.
Notes
-----
fiberinput is a placeholder, since it really depends upon the
spatial extent of the object and the seeing.
"""
self._wave = np.copy(wave)
self._thru = np.copy(throughput)
self._extinction = np.copy(extinction)
self.exptime = float(exptime)
self.area = float(area)
self.fiberdia = float(fiberdia)
#- Flux -> photons conversion constant
#- h [erg s] * c [m/s] * [1e10 A/m] = [erg A]
self._hc = 6.62606957e-27 * 2.99792458e8 * 1e10
#- Create fiber input dict keyed by object type, including 'default'
if fiberinput is not None:
if isinstance(fiberinput, numbers.Real):
self._fiberinput = dict(default=np.ones(len(wave)) * fiberinput)
elif isinstance(fiberinput, np.ndarray):
self._fiberinput = dict(default=np.copy(fiberinput))
elif isinstance(fiberinput, dict):
self._fiberinput = fiberinput
if 'default' not in fiberinput:
self._fiberinput['default'] = np.ones(len(wave))
else:
raise ValueError('Unrecognized type for fiberinput: {}'.format(type(fiberinput)))
else:
self._fiberinput = dict(default=np.ones(len(wave)))
#- special cases: QSO and STD are STAR for fiber input losses
if 'STAR' in self._fiberinput and 'STD' not in self._fiberinput:
self._fiberinput['STD'] = self._fiberinput['STAR']
if 'STAR' in self._fiberinput and 'QSO' not in self._fiberinput:
self._fiberinput['QSO'] = self._fiberinput['STAR']
@property
def fiberarea(self):
"""Average fiber area [arcsec^2] used for fiber input calculations"""
return np.pi * self.fiberdia**2 / 4.0
def extinction(self, wavelength):
"""
Return atmospheric extinction [magnitudes/airmass]
evaluated at wavelength (float or array)
"""
return np.interp(wavelength, self._wave, self._extinction)
def atmospheric_throughput(self, wavelength, airmass=1.0):
"""
Return atmospheric throughput [0 - 1] at given airmass,
evaluated at wavelength (float or array)
"""
ext = self.extinction(wavelength)
return 10**(-0.4 * airmass * ext)
def fiberinput_throughput(self, wavelength=None, objtype=ObjType.STAR):
"""
Return fiber input geometric throughput [0 - 1]
evaluated at wavelength (float or array).
If wavelength is None, do not interpolate.
"""
if objtype in self._fiberinput:
t = self._fiberinput[objtype]
else:
msg = 'Unknown objtype {}; using default fiber input loss'.format(objtype)
msg += '\nKnown objtypes are '+str(list(self._fiberinput.keys()))
warnings.warn(msg)
t = self._fiberinput['default']
if wavelength is None:
return t
else:
return np.interp(wavelength, self._wave, t)
def hardware_throughput(self, wavelength):
"""
Return hardware throughput (optics, fiber run, CCD, but
not including atmosphere or geometric fiber input losses)
evaluated at wavelength (float or array)
"""
return np.interp(wavelength, self._wave, self._thru, left=0.0, right=0.0)
def _throughput(self, objtype=ObjType.STAR, airmass=1.0):
"""
Returns system throughput for this object type and airmass
at the native wavelengths of this Throughput object (self._wave)
objtype may be any of the ObjType enumerated types
CALIB : atmospheric extinction and fiber input losses not applied
SKY : fiber input losses are not applied
other : all throughput losses are applied
"""
objtype = objtype.strip().upper()
Tatm = 10**(-0.4*airmass*self._extinction)
if objtype == ObjType.CALIB:
T = self._thru
elif objtype == ObjType.SKY:
T = self._thru * Tatm
else:
Tfiber = self.fiberinput_throughput(wavelength=None, objtype=objtype)
T = self._thru * Tatm * Tfiber
return T
def __call__(self, wavelength, objtype=ObjType.STAR, airmass=1.0):
"""
Returns system throughput at requested wavelength(s)
objtype may be any of the ObjType enumerated types
CALIB : atmospheric extinction and fiber input losses not applied
SKY : fiber input losses are not applied
other : all throughput losses are applied
"""
T = self._throughput(objtype=objtype, airmass=airmass)
return np.interp(wavelength, self._wave, T, left=0.0, right=0.0)
def thru(self, *args, **kwargs):
"""
same as calling self(*args, **kwargs)
"""
return self(*args, **kwargs)
def photons(self, wavelength, flux, units="erg/s/cm^2/A", \
objtype="STAR", exptime=None, airmass=1.0):
"""
Returns photons per bin given input flux vs. wavelength,
flux units, object type, exposure time, and airmass.
NOTE: Returns raw photons per wavelength bin,
*not* photons/A sampled at these wavelengths
Inputs
------
wavelength : input wavelength array in Angstroms
flux : input flux; same length as `wavelength`
units : units of `flux`
* Treated as delta functions at each given wavelength:
- "photons"
- "erg/s/cm^2"
* Treated as function values to be multipled by bin width:
- "photon/A"
- "erg/s/cm^2/A"
- "erg/s/cm^2/A/arcsec^2"
"photon" and "photon/A" are as observed by CCD and are returned
without applying throughput terms.
Optional Inputs
---------------
objtype : string, optional; object type for Throughput object.
SKY - atmospheric extinction and telescope+instrument throughput
applied, but not fiber input geometric losses.
CALIB - telescope+instrument throughtput applied, but not
atmospheric extinction or fiber input geometric losses.
Anything else (default) - treated as astronomical object
with all throughput terms applied.
exptime : float, optional; exposure time, default self.exptime
airmass : float, optional, default 1.0
Returns
-------
array of number of photons observed by CCD at each wavelength,
i.e. not per-Angstrom.
Stephen Bailey, LBL
May 2013
"""
#- Wavelength bin size
dw = np.gradient(wavelength)
#- Standardize units; allow some sloppiness
units = units.strip() #- FITS pads short strings with spaces (!)
units = units.replace("ergs", "erg")
units = units.replace("photons", "photon")
units = units.replace("Angstroms", "A")
units = units.replace("Angstrom", "A")
units = units.replace("Ang", "A")
units = units.replace("**", "^")
units = units.replace("cm2", "cm^2")
units = units.replace("arcsec2", "arcsec^2")
#- Check for units prefactor like "1e-17 erg/s/cm^2/A"
scale = 1.0
tmp = units.split()
if len(tmp) == 2:
try:
scale = float(tmp[0])
flux = flux * scale
units = tmp[1]
except ValueError:
raise ValueError("Non-numeric units scale factor {}".format(tmp[0]))
#- Default exposure time
if exptime is None:
exptime = self.exptime
#- Input photons; return photons per bin (not photons per Angstrom)
if units == "photon":
return flux
elif units == "photon/A":
return flux * dw
#- Sanity check on units
if not units.startswith('erg'):
raise ValueError("Unrecognized units {}".format(units))
#- If we got here, we need to apply throughputs
flux = self.apply_throughput(wavelength, flux,
objtype=objtype, airmass=airmass)
#- Convert to photons
phot = flux * wavelength / self._hc
#- erg/s/cm^2/A (i.e. Flambda, astronomical object)
if units == "erg/s/cm^2/A":
return phot * exptime * self.area * dw
#- erg/s/cm^2/A/arcsec^2 (e.g. sky)
elif units == "erg/s/cm^2/A/arcsec^2":
return phot * exptime * self.area * dw * self.fiberarea
#- erg/s/cm^2 (not per A; flux delta functions at given wavelengths)
elif units == "erg/s/cm^2":
return phot * exptime * self.area
#- erg/s/cm^2/arcsec^2 (not per A; intensity delta functions)
elif units == "erg/s/cm^2/arcsec^2":
return phot * exptime * self.area * self.fiberarea
else:
raise ValueError("Unrecognized units {}".format(units))
def apply_throughput(self, wavelength, flux, objtype="STAR", airmass=1.0):
"""
Returns flux array with throughputs applied for given
objtype and airmass.
TODO: this is a simple throughput model that can be wrong if there
is meaningful structure smaller than the wavelength sampling.
"""
if flux.ndim == 1 or isinstance(objtype, str):
thru = self.thru(wavelength, objtype=objtype, airmass=airmass)
return flux * thru
else:
assert flux.ndim == 2
assert flux.shape[0] == len(objtype)
t = dict()
objtype = np.array(objtype)
outflux = flux.copy()
for xt in set(objtype):
thru = self.thru(wavelength, objtype=xt, airmass=airmass)
ii = np.where(objtype == xt)[0]
outflux[ii] *= thru
return outflux
@property
def wavemin(self):
"""Minimum wavelength [Angstroms] covered by this throughput model"""
return self._wave[0]
@property
def wavemax(self):
"""Maximum wavelength [Angstroms] covered by this throughput model"""
return self._wave[-1]