"""
Demonstrate aXe trace polynomials.
Initial code taken from `(Brammer, Pirzkal, & Ryan 2014) <https://github.com/WFC3Grism/CodeDescription>`_, which contains a detailed
explanation how the grism configuration parameters and coefficients are defined and evaluated.
"""
import os
from collections import OrderedDict
import numpy as np
from . import GRIZLI_PATH, utils
from .jwst_utils import crds_reffiles
DEFAULT_CRDS_CONTEXT = 'jwst_1123.pmap'
NIRCAM_CONF_VERSION = 'V8.5'
if os.getenv('NIRCAM_CONF_VERSION') is not None:
NIRCAM_CONF_VERSION = os.getenv('NIRCAM_CONF_VERSION')
print('Use NIRCAM_CONF_VERSION={NIRCAM_CONF_VERSION}')
[docs]def show_available_nircam_versions(filter='F444W', module='B', grism='R', verbose=True):
"""
Show all available versions of the NIRCAM Grism config files
"""
import glob
files = glob.glob(os.path.join(GRIZLI_PATH,
f'CONF/GRISM_NIRCAM/*/NIRCAM_{filter}_mod{module}_{grism}.conf'))
files.sort()
versions = []
for file in files:
versions.append(file.split('/')[-2])
if verbose:
print(f'{versions[-1]:>8} {file}')
return versions
[docs]class aXeConf():
def __init__(self, conf_file='WFC3.IR.G141.V2.5.conf'):
"""Read an aXe-compatible configuration file
Parameters
----------
conf_file: str
Filename of the configuration file to read
"""
if conf_file is not None:
self.conf = self.read_conf_file(conf_file)
self.conf_dict = self.conf
self.conf_file = conf_file
self.count_beam_orders()
# Global XOFF/YOFF offsets
if 'XOFF' in self.conf.keys():
self.xoff = float(self.conf['XOFF'])
else:
self.xoff = 0.
if 'YOFF' in self.conf.keys():
self.yoff = float(self.conf['YOFF'])
else:
self.yoff = 0.
[docs] def read_conf_file(self, conf_file='WFC3.IR.G141.V2.5.conf'):
"""Read an aXe config file, convert floats and arrays
Parameters
----------
conf_file: str
Filename of the configuration file to read.
Parameters are stored in an OrderedDict in `self.conf`.
"""
conf = OrderedDict()
fp = open(conf_file)
lines = fp.readlines()
fp.close()
for line in lines:
# empty / commented lines
if (line.startswith('#')) | (line.strip() == '') | ('"' in line):
continue
# split the line, taking out ; and # comments
spl = line.split(';')[0].split('#')[0].split()
param = spl[0]
if len(spl) > 2:
value = np.cast[float](spl[1:])
else:
try:
value = float(spl[1])
except:
value = spl[1]
conf[param] = value
return conf
[docs] def count_beam_orders(self):
"""Get the maximum polynomial order in DYDX or DLDP for each beam
"""
self.orders = {}
for beam in ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J']:
order = 0
while 'DYDX_{0:s}_{1:d}'.format(beam, order) in self.conf.keys():
order += 1
while 'DLDP_{0:s}_{1:d}'.format(beam, order) in self.conf.keys():
order += 1
self.orders[beam] = order-1
[docs] def get_beams(self):
"""Get beam parameters and read sensitivity curves
"""
import os
from astropy.table import Table, Column
self.dxlam = OrderedDict()
self.nx = OrderedDict()
self.sens = OrderedDict()
self.beams = []
for beam in self.orders:
if self.orders[beam] > 0:
self.beams.append(beam)
self.dxlam[beam] = np.arange(self.conf['BEAM{0}'.format(beam)].min(), self.conf['BEAM{0}'.format(beam)].max(), dtype=int)
self.nx[beam] = int(self.dxlam[beam].max()-self.dxlam[beam].min())+1
self.sens[beam] = Table.read('{0}/{1}'.format(os.path.dirname(self.conf_file), self.conf['SENSITIVITY_{0}'.format(beam)]))
#self.sens[beam].wave = np.cast[np.double](self.sens[beam]['WAVELENGTH'])
#self.sens[beam].sens = np.cast[np.double](self.sens[beam]['SENSITIVITY'])
# Need doubles for interpolating functions
for col in self.sens[beam].colnames:
data = np.cast[np.double](self.sens[beam][col])
self.sens[beam].remove_column(col)
self.sens[beam].add_column(Column(data=data, name=col))
# Scale BEAM F
if (beam == 'F') & ('G141' in self.conf_file):
self.sens[beam]['SENSITIVITY'] *= 0.35
if (beam == 'B') & ('G141' in self.conf_file):
if self.conf['SENSITIVITY_B'] == 'WFC3.IR.G141.0th.sens.1.fits':
self.sens[beam]['SENSITIVITY'] *= 2
# wave = np.cast[np.double](self.sens[beam]['WAVELENGTH'])
# sens = np.cast[np.double](self.sens[beam]['SENSITIVITY']
# self.sens[beam]['WAVELENGTH'] = np.cast[np.double](self.sens[beam]['WAVELENGTH'])
# self.sens[beam]['SENSITIVITY'] = )
self.beams.sort()
[docs] def remove_beam(self, beam):
"""
Remove a beam definition
"""
if beam in self.beams:
ix = self.beams.index(beam)
_ = self.beams.pop(ix)
[docs] def reset_beam_list(self):
"""
Reset full beam list, perhaps after removing some with ``remove_beam``
"""
for beam in self.orders:
if self.orders[beam] > 0:
self.beams.append(beam)
[docs] def field_dependent(self, xi, yi, coeffs):
"""aXe field-dependent coefficients
See the `aXe manual <http://axe.stsci.edu/axe/manual/html/node7.html#SECTION00721200000000000000>`_ for a description of how the field-dependent coefficients are specified.
Parameters
----------
xi, yi : float or array-like
Coordinate to evaluate the field dependent coefficients, where
`xi = x-REFX` and `yi = y-REFY`.
coeffs : array-like
Field-dependency coefficients
Returns
-------
a : float or array-like
Evaluated field-dependent coefficients
"""
# number of coefficients for a given polynomial order
# 1:1, 2:3, 3:6, 4:10, order:order*(order+1)/2
if hasattr(coeffs, '__len__'):
order = int(-1+np.sqrt(1+8*len(coeffs))) // 2
else:
order = 1
# Build polynomial terms array
# $a = a_0+a_1x_i+a_2y_i+a_3x_i^2+a_4x_iy_i+a_5yi^2+$ ...
xy = []
for _p in range(order):
for _py in range(_p+1):
# print 'x**%d y**%d' %(p-py, px)
xy.append(xi**(_p - _py) * yi**(_py))
# Evaluate the polynomial, allowing for N-dimensional inputs
a = np.sum((np.array(xy).T*coeffs).T, axis=0)
return a
[docs] def evaluate_dp(self, dx, dydx):
"""Evalate arc length along the trace given trace polynomial coefficients
Parameters
----------
dx : array-like
x pixel to evaluate
dydx : array-like
Coefficients of the trace polynomial
Returns
-------
dp : array-like
Arc length along the trace at position `dx`.
For `dydx` polynomial orders 0, 1 or 2, integrate analytically.
Higher orders must be integrated numerically.
**Constant:**
.. math:: dp = dx
**Linear:**
.. math:: dp = \sqrt{1+\mathrm{DYDX}[1]}\cdot dx
**Quadratic:**
.. math:: u = \mathrm{DYDX}[1] + 2\ \mathrm{DYDX}[2]\cdot dx
.. math:: dp = (u \sqrt{1+u^2} + \mathrm{arcsinh}\ u) / (4\cdot \mathrm{DYDX}[2])
"""
# dp is the arc length along the trace
# $\lambda = dldp_0 + dldp_1 dp + dldp_2 dp^2$ ...
poly_order = len(dydx)-1
if (poly_order == 2):
if np.abs(np.unique(dydx[2])).max() == 0:
poly_order = 1
if poly_order == 0: # dy=0
dp = dx
elif poly_order == 1: # constant dy/dx
dp = np.sqrt(1+dydx[1]**2)*(dx)
elif poly_order == 2: # quadratic trace
u0 = dydx[1]+2*dydx[2]*(0)
dp0 = (u0*np.sqrt(1+u0**2)+np.arcsinh(u0))/(4*dydx[2])
u = dydx[1]+2*dydx[2]*(dx)
dp = (u*np.sqrt(1+u**2)+np.arcsinh(u))/(4*dydx[2])-dp0
else:
# high order shape, numerical integration along trace
# (this can be slow)
xmin = np.minimum((dx).min(), 0)
xmax = np.maximum((dx).max(), 0)
xfull = np.arange(xmin, xmax)
dyfull = 0
for i in range(1, poly_order):
dyfull += i*dydx[i]*(xfull-0.5)**(i-1)
# Integrate from 0 to dx / -dx
dpfull = xfull*0.
lt0 = xfull < 0
if lt0.sum() > 1:
dpfull[lt0] = np.cumsum(np.sqrt(1+dyfull[lt0][::-1]**2))[::-1]
dpfull[lt0] *= -1
#
gt0 = xfull > 0
if gt0.sum() > 0:
dpfull[gt0] = np.cumsum(np.sqrt(1+dyfull[gt0]**2))
dp = np.interp(dx, xfull, dpfull)
if dp[-1] == dp[-2]:
dp[-1] = dp[-2]+np.diff(dp)[-2]
return dp
[docs] def get_beam_trace(self, x=507, y=507, dx=0., beam='A', fwcpos=None):
"""Get an aXe beam trace for an input reference pixel and list of output x pixels `dx`
Parameters
----------
x, y : float or array-like
Evaluate trace definition at detector coordinates `x` and `y`.
dx : float or array-like
Offset in x pixels from `(x,y)` where to compute trace offset and
effective wavelength
beam : str
Beam name (i.e., spectral order) to compute. By aXe convention,
`beam='A'` is the first order, 'B' is the zeroth order and
additional beams are the higher positive and negative orders.
fwcpos : None or float
For NIRISS, specify the filter wheel position to compute the
trace rotation
Returns
-------
dy : float or array-like
Center of the trace in y pixels offset from `(x,y)` evaluated at
`dx`.
lam : float or array-like
Effective wavelength along the trace evaluated at `dx`.
"""
NORDER = self.orders[beam]+1
xi, yi = x-self.xoff, y-self.yoff
xoff_beam = self.field_dependent(xi, yi,
self.conf['XOFF_{0}'.format(beam)])
yoff_beam = self.field_dependent(xi, yi,
self.conf['YOFF_{0}'.format(beam)])
# y offset of trace (DYDX)
dydx = np.zeros(NORDER) # 0 #+1.e-80
dydx = [0]*NORDER
for i in range(NORDER):
if 'DYDX_{0:s}_{1:d}'.format(beam, i) in self.conf.keys():
coeffs = self.conf['DYDX_{0:s}_{1:d}'.format(beam, i)]
dydx[i] = self.field_dependent(xi, yi, coeffs)
# $dy = dydx_0+dydx_1 dx+dydx_2 dx^2+$ ...
dy = yoff_beam
for i in range(NORDER):
dy += dydx[i]*(dx-xoff_beam)**i
# wavelength solution
dldp = np.zeros(NORDER)
dldp = [0]*NORDER
for i in range(NORDER):
if 'DLDP_{0:s}_{1:d}'.format(beam, i) in self.conf.keys():
coeffs = self.conf['DLDP_{0:s}_{1:d}'.format(beam, i)]
dldp[i] = self.field_dependent(xi, yi, coeffs)
self.eval_input = {'x': x, 'y': y, 'beam': beam, 'dx': dx,
'fwcpos': fwcpos}
self.eval_output = {'xi': xi, 'yi': yi, 'dldp': dldp, 'dydx': dydx,
'xoff_beam': xoff_beam, 'yoff_beam': yoff_beam,
'dy': dy}
dp = self.evaluate_dp(dx-xoff_beam, dydx)
# ## dp is the arc length along the trace
# ## $\lambda = dldp_0 + dldp_1 dp + dldp_2 dp^2$ ...
# if self.conf['DYDX_ORDER_%s' %(beam)] == 0: ## dy=0
# dp = dx-xoff_beam
# elif self.conf['DYDX_ORDER_%s' %(beam)] == 1: ## constant dy/dx
# dp = np.sqrt(1+dydx[1]**2)*(dx-xoff_beam)
# elif self.conf['DYDX_ORDER_%s' %(beam)] == 2: ## quadratic trace
# u0 = dydx[1]+2*dydx[2]*(0)
# dp0 = (u0*np.sqrt(1+u0**2)+np.arcsinh(u0))/(4*dydx[2])
# u = dydx[1]+2*dydx[2]*(dx-xoff_beam)
# dp = (u*np.sqrt(1+u**2)+np.arcsinh(u))/(4*dydx[2])-dp0
# else:
# ## high order shape, numerical integration along trace
# ## (this can be slow)
# xmin = np.minimum((dx-xoff_beam).min(), 0)
# xmax = np.maximum((dx-xoff_beam).max(), 0)
# xfull = np.arange(xmin, xmax)
# dyfull = 0
# for i in range(1, NORDER):
# dyfull += i*dydx[i]*(xfull-0.5)**(i-1)
#
# ## Integrate from 0 to dx / -dx
# dpfull = xfull*0.
# lt0 = xfull <= 0
# if lt0.sum() > 1:
# dpfull[lt0] = np.cumsum(np.sqrt(1+dyfull[lt0][::-1]**2))[::-1]
# dpfull[lt0] *= -1
# #
# gt0 = xfull >= 0
# if gt0.sum() > 0:
# dpfull[gt0] = np.cumsum(np.sqrt(1+dyfull[gt0]**2))
#
# dp = np.interp(dx-xoff_beam, xfull, dpfull)
# Evaluate dldp
lam = dp*0.
for i in range(NORDER):
lam += dldp[i]*dp**i
# NIRISS rotation?
if fwcpos is not None:
if 'FWCPOS_REF' not in self.conf.keys():
print('Parameter fwcpos={0} supplied but no FWCPOS_REF in {1:s}'.format(fwcpos, self.conf_file))
return dy, lam
order = self.conf['DYDX_ORDER_{0}'.format(beam)]
if order != 2:
print('ORDER={0:d} not supported for NIRISS rotation'.format(order))
return dy, lam
theta = (fwcpos - self.conf['FWCPOS_REF'])/180*np.pi*1
theta *= -1 # DMS rotation
# print('DMS')
if theta == 0:
return dy, lam
# For the convention of swapping/inverting axes for GR150C
# if 'GR150C' in self.conf_file:
# theta = -theta
# If theta is small, use a small angle approximation.
# Otherwise, 1./tan(theta) blows up and results in numerical
# noise.
xp = (dx-xoff_beam)/np.cos(theta)
if (1-np.cos(theta) < 5.e-8):
#print('Approximate!', xoff_beam, np.tan(theta))
dy = dy + (dx-xoff_beam)*np.tan(theta)
delta = 0.
# print('Approx')
else:
# Full transformed trace coordinates
c = dydx
#print('Not approx')
beta = c[1]+2*c[2]*xp-1/np.tan(theta)
chi = c[0]+c[1]*xp+c[2]*xp**2
if theta < 0:
psi = (-beta+np.sqrt(beta**2-4*c[2]*chi))
psi *= 1./2/c[2]/np.tan(theta)
delta = psi*np.tan(theta)
dy = dx*np.tan(theta) + psi/np.cos(theta)
else:
psi = (-beta-np.sqrt(beta**2-4*c[2]*chi))
psi *= 1./2/c[2]/np.tan(theta)
delta = psi*np.tan(theta)
dy = dx*np.tan(theta) + psi/np.cos(theta)
# Evaluate wavelength at 'prime position along the trace
dp = self.evaluate_dp(xp+delta, dydx)
lam = dp*0.
for i in range(NORDER):
lam += dldp[i]*dp**i
return dy, lam
[docs] def show_beams(self, xy=None, beams=['E', 'D', 'C', 'B', 'A']):
"""
Make a demo plot of the beams of a given configuration file
"""
import matplotlib.pyplot as plt
x0, x1 = 507, 507
dx = np.arange(-800, 1200)
if 'WFC3.UV' in self.conf_file:
x0, x1 = 2073, 250
dx = np.arange(-1200, 1200)
if 'G800L' in self.conf_file:
x0, x1 = 2124, 1024
dx = np.arange(-1200, 1200)
if xy is not None:
x0, x1 = xy
s = 200 # marker size
fig, ax = plt.subplots(figsize=[10, 3])
ax.scatter(0, 0, marker='s', s=s, color='black', edgecolor='0.8',
label='Direct')
for beam in beams:
if 'XOFF_{0}'.format(beam) not in self.conf.keys():
continue
xoff = self.field_dependent(x0, x1, self.conf['XOFF_{0}'.format(beam)])
dy, lam = self.get_beam_trace(x0, x1, dx=dx, beam=beam)
xlim = self.conf['BEAM{0}'.format(beam)]
ok = (dx >= xlim[0]) & (dx <= xlim[1])
sc = ax.scatter(dx[ok]+xoff, dy[ok], c=lam[ok]/1.e4, marker='s', s=s,
alpha=0.5, edgecolor='None')
ax.text(np.median(dx[ok]), np.median(dy[ok])+1, beam,
ha='center', va='center', fontsize=14)
print('Beam {0}, lambda=({1:.1f} - {2:.1f})'.format(beam, lam[ok].min(), lam[ok].max()))
ax.grid()
ax.set_xlabel(r'$\Delta x$' + f' (x0={x0})')
ax.set_ylabel(r'$\Delta y$' + f' (y0={x1})')
cb = plt.colorbar(sc, pad=0.01, fraction=0.05)
cb.set_label(r'$\lambda\,(\mu\mathrm{m})$')
ax.set_title(self.conf_file)
fig.tight_layout(pad=0.1)
#plt.savefig('{0}.pdf'.format(self.conf_file))
return fig
[docs]def coeffs_from_astropy_polynomial(p):
"""
Get field-dependent coefficients in aXe format from an
`astropy.modeling.polynomial.Polynomial2D` model
Parameters
----------
p : `astropy.modeling.polynomial.Polynomial2D`
Polynomial model
Returns
-------
coeffs : array-like
Reordered array of coefficients
"""
coeffs = []
for _p in range(p.degree+1):
for _py in range(_p+1):
# print 'x**%d y**%d' %(_p-_py, _py)
_px = _p - _py
pname = f'c{_px}_{_py}'
if pname in p.param_names:
pix = p.param_names.index(pname)
coeffs.append(p.parameters[pix])
else:
coeffs.append(0.0)
return np.array(coeffs)
[docs]def get_config_filename(instrume='WFC3', filter='F140W',
grism='G141', pupil=None, module=None, chip=1,
use_jwst_crds=False, crds_context=DEFAULT_CRDS_CONTEXT):
"""Generate a config filename based on the instrument, filter & grism combination.
Config files assumed to be found the directory specified by the `$GRIZLI`
environment variable, i.e., `${GRIZLI}/CONF`.
Parameters
----------
instrume : {'ACS', 'WFC3', 'NIRISS', 'NIRCam', 'WFIRST'}
Instrument used
filter : str
Direct image filter. This is only used for WFC3/IR, where the grism
configuration files have been determined for each direct+grism
combination separately based on the filter wedge offsets of the
filters.
grism : str
Grism name. Valid combinations are the following:
ACS : G800L (assumed)
WFC3 : G102, G141
NIRISS : GR150R, GR150C
NIRCam : F322W2, F356W, F430M, F444W, F460M
WFIRST : (basic assumptions about the WFI grism)
chip : int
For ACS/WFC and UVIS, specifies the chip to use. Note that this
is switched with respect to the header EXTNAME extensions:
EXTVER = 1 is extension 1 / (SCI,1) of the flt/flc files but
corresponds to CCDCHIP = 2 and the ACS.WFC3.CHIP2 config files.
and
EXTVER = 2 is extension 4 / (SCI,2) of the flt/flc files but
corresponds to CCDCHIP = 1 and the ACS.WFC3.CHIP1 config files.
use_jwst_crds : bool
Use CRDS ``specwcs`` reference files for JWST instruments
Returns
-------
conf_file : str
String path of the configuration file.
"""
if instrume == 'ACS':
conf_file = os.path.join(GRIZLI_PATH,
'CONF/ACS.WFC.CHIP{0:d}.Stars.conf'.format(chip))
if not os.path.exists(conf_file):
conf_file = os.path.join(GRIZLI_PATH,
'CONF/ACS.WFC.CHIP{0:d}.Cycle13.5.conf'.format(chip))
if instrume == 'WFC3':
if grism == 'G280':
conf_file = os.path.join(GRIZLI_PATH, 'CONF/G280/',
'WFC3.UVIS.G280.cal/WFC3.UVIS.G280.CHIP{0:d}.V2.0.conf'.format(chip))
return conf_file
conf_file = os.path.join(GRIZLI_PATH,
'CONF/{0}.{1}.V4.32.conf'.format(grism, filter))
# When direct + grism combination not found for WFC3 assume F140W
if not os.path.exists(conf_file):
conf_file = os.path.join(GRIZLI_PATH,
'CONF/{0}.{1}.V4.32.conf'.format(grism, 'F140W'))
if instrume == 'NIRISS':
conf_files = []
conf_files.append(os.path.join(GRIZLI_PATH,
'CONF/{0}.{1}.221215.conf'.format(grism, filter)))
conf_files.append(os.path.join(GRIZLI_PATH,
'CONF/{0}.{1}.220725.conf'.format(grism, filter)))
conf_files.append(os.path.join(GRIZLI_PATH,
'CONF/{0}.{1}.conf'.format(grism, filter)))
conf_files.append(os.path.join(GRIZLI_PATH,
'CONF/NIRISS.{0}.conf'.format(filter)))
for conf_file in conf_files:
if os.path.exists(conf_file):
#print(f'NIRISS: {conf_file}')
break
else:
#print(f'skip NIRISS: {conf_file}')
pass
# if not os.path.exists(conf_file):
# print('CONF/{0}.{1}.conf'.format(grism, filter))
# conf_file = os.path.join(GRIZLI_PATH,
# 'CONF/NIRISS.{0}.conf'.format(filter))
# if instrume == 'NIRCam':
# conf_file = os.path.join(GRIZLI_PATH,
# 'CONF/aXeSIM_NC_2016May/CONF/NIRCam_LWAR_{0}.conf'.format(grism))
if instrume in ['NIRCAM']:
#conf_file = os.path.join(GRIZLI_PATH,
# f'CONF/NIRCam.A.{filter}.{grism}.conf')
fi = grism
gr = filter[-1] # R, C
# conf_file = os.path.join(GRIZLI_PATH,
# f'CONF/GRISM_NIRCAM/gNIRCAM.{fi}.mod{module}.{gr}.conf')
#
# conf_file = os.path.join(GRIZLI_PATH,
# f'CONF/GRISM_NIRCAM/V2/NIRCAM_{fi}_mod{module}_{gr}.conf')
# NIRCam preference: 8.5 > 8 > 4
conf_file_base = os.path.join(GRIZLI_PATH,
f'CONF/GRISM_NIRCAM/[[NIRCAM_VERSION]]/NIRCAM_{fi}_mod{module}_{gr}.conf')
_conf_versions = [NIRCAM_CONF_VERSION, 'V8.5', 'V8', 'V4', 'V6']
conf_file = None
for NIRCAM_VERSION in _conf_versions:
conf_file = conf_file_base.replace('[[NIRCAM_VERSION]]', NIRCAM_VERSION)
if os.path.exists(conf_file):
break
if conf_file is None:
raise ValueError
elif instrume == 'NIRCAMA':
fi = grism
gr = filter[-1] # R, C
conf_file = os.path.join(GRIZLI_PATH,
f'CONF/GRISM_NIRCAM/gNIRCAM.{fi}.modA.{gr}.conf')
#conf_file = os.path.join(GRIZLI_PATH,
# f'CONF/NIRCam.B.{filter}.{grism}.conf')
elif instrume == 'NIRCAMB':
fi = grism
gr = filter[-1] # R, C
conf_file = os.path.join(GRIZLI_PATH,
f'CONF/GRISM_NIRCAM/gNIRCAM.{fi}.modB.{gr}.conf')
#conf_file = os.path.join(GRIZLI_PATH,
# f'CONF/NIRCam.B.{filter}.{grism}.conf')
if (instrume in ['NIRCAM','NIRISS']) & use_jwst_crds:
if instrume == 'NIRCAM':
_pupil = filter
_filter = grism
else:
_pupil = filter
_filter = grism
refs = crds_reffiles(instrument=instrume,
filter=_filter,
pupil=_pupil,
module=module,
date=None,
reftypes=('photom', 'specwcs'),
header=None,
context=crds_context)
conf_file = refs['specwcs']
print('get_conf: xxx', conf_file, grism, filter, pupil, module, instrume)
if instrume == 'WFIRST':
conf_file = os.path.join(GRIZLI_PATH, 'CONF/WFIRST.conf')
if instrume == 'WFI':
conf_file = os.path.join(GRIZLI_PATH, 'CONF/Roman.G150.conf')
if instrume == 'SYN':
conf_file = os.path.join(GRIZLI_PATH, 'CONF/syn.conf')
# Euclid NISP, config files @
# http://www.astrodeep.eu/euclid-spectroscopic-simulations/
if instrume == 'NISP':
if grism == 'BLUE':
conf_file = os.path.join(GRIZLI_PATH, 'CONF/Euclid.Gblue.0.conf')
else:
conf_file = os.path.join(GRIZLI_PATH, 'CONF/Euclid.Gred.0.conf')
return conf_file
[docs]def load_grism_config(conf_file, warnings=True):
"""Load parameters from an aXe configuration file
Parameters
----------
conf_file : str
Filename of the configuration file
Returns
-------
conf : `~grizli.grismconf.aXeConf`
Configuration file object. Runs `conf.get_beams()` to read the
sensitivity curves.
"""
if 'V3/NIRCAM' in conf_file:
conf = TransformGrismconf(conf_file)
conf.get_beams()
elif 'V2/NIRCAM' in conf_file:
conf = TransformGrismconf(conf_file)
conf.get_beams()
elif 'V4/NIRCAM' in conf_file:
conf = TransformGrismconf(conf_file)
conf.get_beams()
elif 'V8/NIRCAM' in conf_file:
conf = TransformGrismconf(conf_file)
conf.get_beams()
elif 'V8.5/NIRCAM' in conf_file:
conf = TransformGrismconf(conf_file)
conf.get_beams()
elif 'V9/NIRCAM' in conf_file:
conf = TransformGrismconf(conf_file)
conf.get_beams()
elif 'specwcs' in conf_file:
conf = TransformGrismconf(conf_file)
conf.get_beams()
else:
conf = aXeConf(conf_file)
conf.get_beams()
# Preliminary hacks for on-sky NIRISS
if 'GR150' in conf_file:
if 0:
hack_niriss = 1./1.8 * 1.1
msg = f"""
! Scale NIRISS sensitivity by {hack_niriss:.3f} to hack gain correction
! and match GLASS MIRAGE simulations. Sensitivity will be updated when
! on-sky data available
"""
msg = f' ! Scale NIRISS sensitivity by {hack_niriss:.3f} prelim flux correction'
utils.log_comment(utils.LOGFILE, msg, verbose=warnings)
else:
hack_niriss = 1.0
for b in conf.sens:
conf.sens[b]['SENSITIVITY'] *= hack_niriss
if 'ERROR' in conf.sens[b].colnames:
conf.sens[b]['ERROR'] *= hack_niriss
if ('F115W' in conf_file) | ('.2212' in conf_file):
pass
# msg = f""" !! Shift F115W along dispersion"""
# utils.log_comment(utils.LOGFILE, msg, verbose=warnings)
# for b in conf.beams:
# #conf.conf[f'DYDX_{b}_0'][0] += 0.25
# conf.conf[f'DLDP_{b}_0'] -= conf.conf[f'DLDP_{b}_1']*0.5
elif isinstance(conf, TransformGrismconf):
# Don't shift new format files
pass
else:
msg = f""" !! Shift {os.path.basename(conf_file)} along dispersion"""
utils.log_comment(utils.LOGFILE, msg, verbose=warnings)
for b in conf.beams:
#conf.conf[f'DYDX_{b}_0'][0] += 0.25
conf.conf[f'DLDP_{b}_0'] += conf.conf[f'DLDP_{b}_1']*0.5
# For red galaxy
conf.conf[f'DLDP_{b}_0'] += conf.conf[f'DLDP_{b}_1']*0.5
# if 'F200W' in conf_file:
# conf.conf[f'DLDP_{b}_0'] += conf.conf[f'DLDP_{b}_1']*0.5
# _w = conf.sens['A']['WAVELENGTH']
# _w0 = (_w*conf.sens['A']['SENSITIVITY']).sum()
# _w0 /= conf.sens['A']['SENSITIVITY'].sum()
# slope = 1.05 + 0.2 * (_w - _w0)/3000
# # print('xxx', conf_file, _w0)
# conf.sens['A']['SENSITIVITY'] *= slope
if ('F150W' in conf_file) & (hack_niriss > 1.01):
conf.sens['A']['SENSITIVITY'] *= 1.08
# Scale 0th orders in F150W
if ('F150W' in conf_file): # | ('F200W' in conf_file):
msg = f""" ! Scale 0th order (B) by an additional x 1.5"""
utils.log_comment(utils.LOGFILE, msg, verbose=warnings)
conf.sens['B']['SENSITIVITY'] *= 1.5
if 'ERROR' in conf.sens['B'].colnames:
conf.sens['B']['ERROR'] *= 1.5
# Another shift from 0723, 2744
# if ('GR150C.F200W' in conf_file):
# msg = f""" !! Extra shift for GR150C.F200W"""
# utils.log_comment(utils.LOGFILE, msg, verbose=warnings)
# for b in conf.beams:
# pass
# #conf.conf[f'DYDX_{b}_0'][0] += 0.25
# # conf.conf[f'DLDP_{b}_0'] -= conf.conf[f'DLDP_{b}_1']*0.5
# Shift x by 1 px
# msg = f""" ! Shift NIRISS by 0.5 pix along dispersion direction"""
# utils.log_comment(utils.LOGFILE, msg, verbose=warnings)
#
# for b in conf.beams:
# conf.conf[f'DLDP_{b}_0'] -= conf.conf[f'DLDP_{b}_1']*0.5
return conf
[docs]def download_jwst_crds_references(instruments=['NIRCAM','NIRISS'], filters=['F090W','F115W','F150W','F200W','F277W','F356W','F410M','F444W','F460M','F480M'], grisms=['GRISMR','GRISMC','GR150R','GR150C'], modules=['A','B'], context=DEFAULT_CRDS_CONTEXT, verbose=True):
"""
Run `~grizli.jwst_utils.crds_reffiles` with filter and grism combinations to
prefetch a bunch of reference files
"""
for instrument in instruments:
for f in filters:
for g in grisms:
if instrument == 'NIRCAM':
if (f < 'F270') | g.startswith('GR150'):
continue
for m in modules:
_ = crds_reffiles(instrument='NIRCAM',
filter=f, pupil=g,
module=m,
reftypes=('photom', 'specwcs'),
header=None,
context=context,
verbose=verbose)
else:
if (f > 'F270') | g.startswith('GRISM'):
continue
_ = crds_reffiles(instrument='NIRISS',
filter=g, pupil=f,
module='A',
reftypes=('photom', 'specwcs'),
header=None,
context=context,
verbose=verbose)
[docs]class CRDSGrismConf():
def __init__(self, file='references/jwst/nircam/jwst_nircam_specwcs_0136.asdf', get_photom=True, context=DEFAULT_CRDS_CONTEXT, **kwargs):
"""
Helper object to replicate `grismconf` config files from CRDS products
Parameters
----------
file : str
Filename of a CRDS ``specwcs`` file
get_photom : bool
Get sensitivity curves from the ``photom`` reference file
context : str
Explicit CRDS_CONTEXT
Attributes
----------
dm : `jwst.datamodels.NIRCAMGrismModel`
DataModel of the ``specwcs`` reference
meta : dict
Metadata dictionary from `jwst.datamodels.NIRCAMGrismModel.meta.instance`
dm_orders : list
List of orders from `dm.orders`
crds_parameters : dict
CRDS parameter dictionary from `dm.get_crds_parameters()`
dispx, dispy, displ : list, list, list
Parameter polynomials from the reference datamodel
SENS_data : dict
Sensitivity data like ``{order: (wave_microns, sensitity)}``
"""
from . import jwst_utils
if context is not None:
jwst_utils.CRDS_CONTEXT = context
jwst_utils.set_crds_context(verbose=False, override_environ=True)
self.file = file
if file.startswith('references'):
full_path = os.path.join(os.environ['CRDS_PATH'], file)
elif os.path.exists(file):
full_path = file
elif '/references' in file:
# Replace CRDS_PATH in absolute path
full_path = os.path.join(os.environ['CRDS_PATH'], 'references',
file.split('references/')[1])
if not os.path.exists(full_path):
# Try to download all of the JWST references
download_jwst_crds_references(context=context)
self.full_path = full_path
self.initialize_from_datamodel()
self.SENS = None
self.SENS_data = None
if get_photom:
self.get_photom(**kwargs)
self.load_new_sensitivity_curve(**kwargs)
[docs] def initialize_from_datamodel(self):
"""
Initialize polynomial objects from a `jwst.datamodel`
"""
import copy
import jwst.datamodels
full_path = self.full_path
if 'nircam' in full_path:
dm = jwst.datamodels.NIRCAMGrismModel(full_path)
else:
dm = jwst.datamodels.NIRISSGrismModel(full_path)
self.meta = copy.deepcopy(dm.meta.instance)
self.dm_orders = copy.deepcopy(dm.orders)
self.crds_parameters = dm.get_crds_parameters()
self.dispx = copy.deepcopy(dm.dispx)
self.dispy = copy.deepcopy(dm.dispy)
self.displ = copy.deepcopy(dm.displ)
# @property
# def meta(self):
# """metadata dictionary"""
# return self.dm.meta.instance
@property
def module(self):
if self.instrument == 'NIRCAM':
return self.meta['instrument']['module']
else:
return None
@property
def filter(self):
return self.meta['instrument']['filter']
@property
def pupil(self):
return self.meta['instrument']['pupil']
@property
def grism(self):
"""
Return filter for NIRISS, pupil for NIRCAM, filter for HST
"""
if self.instrument in ('NIRCAM','NIRISS'):
if self.instrument == 'NIRCAM':
return self.pupil
else:
return self.filter
else:
# e.g., HST
return self.filter
@property
def instrument(self):
return self.meta['instrument']['name']
@property
def filter_grism(self):
"""Return combination of blocking filter + dispering element"""
if self.instrument == 'NIRISS':
return (self.pupil, self.filter)
else:
return (self.filter, self.pupil)
@property
def instrument_setup(self):
"""Return combination of blocking filter, dispering element, (module)"""
if self.instrument == 'NIRISS':
return (self.pupil, self.filter, None)
else:
return (self.filter, self.pupil, self.module)
@property
def orders(self):
"""String version of orders, like '+1', '+2', '0', '-1'"""
orders = []
for o in self.dm_orders:
if o > 0:
orders.append(f'+{o}')
else:
orders.append(f'{o}')
return orders
[docs] def DISPX(self, order, x0, y0, t):
"""Replicate grismconf.DISPX"""
io = self.orders.index(order)
dispx = self._eval_model(self.dispx[io], x0, y0, t)
return dispx
[docs] def DDISPX(self, order, x0, y0, t, dt=0.01):
"""Replicate grismconf.DDISPX"""
io = self.orders.index(order)
v0 = self._eval_model(self.dispx[io], x0, y0, t)
v1 = self._eval_model(self.dispx[io], x0, y0, t+dt)
return (v1-v0)/dt
[docs] def DISPY(self, order, x0, y0, t):
"""Replicate grismconf.DISPY"""
io = self.orders.index(order)
dispy = self._eval_model(self.dispy[io], x0, y0, t)
return dispy
[docs] def DDISPY(self, order, x0, y0, t, dt=0.01):
"""Replicate grismconf.DDISPY"""
io = self.orders.index(order)
v0 = self._eval_model(self.dispy[io], x0, y0, t)
v1 = self._eval_model(self.dispy[io], x0, y0, t+dt)
return (v1-v0)/dt
[docs] def DISPXY(self, order, x0, y0, t):
"""Replicate grismconf.DISPXY (combination of DISPX, DISPY)"""
io = self.orders.index(order)
dispx = self._eval_model(self.dispx[io], x0, y0, t)
dispy = self._eval_model(self.dispy[io], x0, y0, t)
return dispx, dispy
[docs] def DISPL(self, order, x0, y0, t):
"""Replicate grismconf.DISPL"""
io = self.orders.index(order)
displ = self._eval_model(self.displ[io], x0, y0, t)
return displ
[docs] def DDISPL(self, order, x0, y0, t, dt=0.01):
"""Replicate grismconf.DDISPL"""
io = self.orders.index(order)
v0 = self._eval_model(self.displ[io], x0, y0, t)
v1 = self._eval_model(self.displ[io], x0, y0, t+dt)
return (v1-v0)/dt
[docs] def INVDISPX(self, order, x0, y0, dx, t0=np.linspace(-1, 2, 128), from_root=False):
""" Inverse DISPX """
if from_root:
func = self._root_inverse_model
else:
func = self._inverse_model
io = self.orders.index(order)
t = func(self.dispx[io], x0, y0, dx, t0=t0)
return t
[docs] def INVDISPY(self, order, x0, y0, dx, t0=np.linspace(-1, 2, 128), from_root=False):
""" Inverse DISPY """
if from_root:
func = self._root_inverse_model
else:
func = self._inverse_model
io = self.orders.index(order)
t = func(self.dispy[io], x0, y0, dx, t0=t0)
return t
[docs] def INVDISPL(self, order, x0, y0, dx, t0=np.linspace(-1, 2, 128), from_root=False):
"""
Inverse DISPL
"""
if from_root:
func = self._root_inverse_model
else:
func = self._inverse_model
io = self.orders.index(order)
t = func(self.displ[io], x0, y0, dx, t0=t0)
return t
def _eval_model(self, model, x0, y0, t, get_coeffs=False):
"""
General function for evaluating model polynomials
"""
import numpy as np
if hasattr(model, 'n_inputs'):
# model is a Polynomial
if model.n_inputs == 1:
if get_coeffs:
value = model.parameters[::-1]
else:
value = model(t)
else:
value = model(x0, y0)
elif len(model) == 1:
# model is a single-element list, probably Polynomial1D
if model[0].n_inputs == 1:
if get_coeffs:
value = model[0].parameters[::-1]
else:
value = model[0](t)
else:
value = model[0](x0, y0)
else:
# model is a list
_c = []
for m in model:
if m.n_inputs == 1:
_c.append(m(t))
else:
_c.append(m(x0, y0))
if get_coeffs:
value = _c[::-1]
else:
value = np.polyval(_c[::-1], t)
return value
def _root_inverse_model(self, model, x0, y0, dx, **kwargs):
"""Inverse values interpolated from the forward model using polynomial roots"""
coeffs = self._eval_model(model, x0, y0, 0, get_coeffs=True)
if hasattr(coeffs, '__len__'):
coeffs[-1] -= dx
value = np.roots(coeffs)[-1]
else:
value = coeffs
return value
def _inverse_model(self, model, x0, y0, dx, t0=np.linspace(-1, 2, 128)):
"""Inverse values interpolated from the forward model """
values = self._eval_model(model, x0, y0, t0)
if values[0] > values[1]:
return np.interp(dx, values[::-1], t0[::-1])
else:
return np.interp(dx, values, t0)
[docs] def get_photom(self, xyt=(1024, 1024, 0.5), date=None, photom_file=None, verbose=False, **kwargs):
"""
Load photom reference from CRDS and scale to grismconf / aXe convention
Parameters
----------
xyt : (float, float, float)
Coordinate (x0, y0, t) where to evaluate the grism dispersion DLDP
date : str, None
Observation date in ISO format, e.g., '2023-01-01 00:00:00'. If not
specified, defaults to "now"
photom_file : str
Explicit filename of a CRDS ``photom`` reference file
verbose : bool
Print status message
Returns
-------
SENS_data : dict
Dict of ``{'order': (wave, sens)}``. Also sets ``SENS_data``,
``SENS_dldp`` and ``SENS_xyt`` attributes.
"""
import astropy.time
import astropy.table
import astropy.units as u
import crds
import jwst.datamodels
if photom_file is None:
cpars = self.crds_parameters
if self.instrument == 'NIRISS':
cpars['meta.instrument.detector'] = 'NIS'
cpars['meta.exposure.type'] = 'NIS_WFSS'
else:
cpars['meta.instrument.detector'] = f'NRC{self.module}LONG'
cpars['meta.exposure.type'] = 'NRC_WFSS'
if date is None:
date = astropy.time.Time.now().iso
cpars['meta.observation.date'] = date.split()[0]
cpars['meta.observation.time'] = date.split()[1]
refs = crds.getreferences(cpars, reftypes=('photom',))
if verbose:
msg = f"Read photometry reference {refs['photom']} (date = '{date}')"
print(msg)
photom_file = refs['photom']
if self.instrument == 'NIRCAM':
ph = jwst.datamodels.NrcWfssPhotomModel(photom_file)
else:
ph = jwst.datamodels.NisWfssPhotomModel(photom_file)
phot = astropy.table.Table(ph.phot_table)
pixel_area = ph.meta.photometry.pixelarea_steradians
self.sens_ref_file = refs['photom']
self.SENS_xyt = xyt
self.SENS_dldp = {}
self.SENS_data = {}
for i, order in enumerate(self.orders):
ix = phot['filter'] == self.filter
ix &= phot['pupil'] == self.pupil
ix &= phot['order'] == self.dm_orders[i]
if ix.sum() == 0:
msg = f"Order {order} = {self.dm_orders[i]} not found in "
msg += f"{refs['photom']} for {self.filter} {self.pupil}"
print(msg)
continue
row = phot[ix]
wave = np.squeeze(row['wavelength'].data)
sens_fnu = np.squeeze(row['relresponse'].data)
sens_fnu *= pixel_area*row['photmjsr']
# Dispersion, DLAM/DPIX
dl = self.DDISPL(order, *xyt)
dx = self.DDISPX(order, *xyt)
dy = self.DDISPY(order, *xyt)
dldp = dl/np.sqrt(dx**2+dy**2)
self.SENS_dldp[order] = dldp
mask = (wave > 0) & (sens_fnu > 0)
_flam_unit = u.erg/u.second/u.cm**2/u.micron
sens_flam = (sens_fnu[mask]*dldp*u.megaJansky).to(_flam_unit,
equivalencies=u.spectral_density(wave[mask]*u.micron))
self.SENS_data[order] = [wave[mask], 1./sens_flam.value]
return self.SENS_data
[docs] def load_new_sensitivity_curve(self, verbose=True, **kwargs):
"""
Replace +1 NIRCam sensitivity curves with Nov 10, 2023 updates
Files generated with the calibration data of P330E from program
CAL-1538 (K. Gordon)
Download the FITS files from the link below and put them in
``$GRIZLI/CONF/GRISM_NIRCAM/``.
https://s3.amazonaws.com/grizli-v2/JWSTGrism/NircamSensitivity/index.html
"""
path = os.path.join(GRIZLI_PATH, 'CONF', 'GRISM_NIRCAM')
meta = self.crds_parameters
if meta['meta.instrument.name'] != 'NIRCAM':
if verbose:
msg = 'load_new_sensitivity_curve: only defined for NIRCAM ({0})'
print(msg.format(meta['meta.instrument.name']))
return None
sens_base = 'nircam_wfss_sensitivity_{filter}_{pupil}_{module}.10nov23.fits'
sens_file = sens_base.format(filter=meta['meta.instrument.filter'],
pupil=meta['meta.instrument.pupil'],
module=meta['meta.instrument.module'])
sens_file = os.path.join(path, sens_file)
# print('xx', sens_file, os.path.exists(sens_file))
if os.path.exists(sens_file):
msg = "grismconf.CRDSGrismConf: replace sensitivity curve with "
msg += f"{sens_file}"
utils.log_comment(utils.LOGFILE, msg, verbose=verbose)
si = utils.read_catalog(sens_file)
self.SENS_data['+1'] = [si['WAVELENGTH']/1.e4, si['SENSITIVITY']]