Source code for grizli.grismconf

"""
Demonstrate aXe trace polynomials.

Initial code taken from `(Brammer, Pirzkal, & Ryan 2014) <https://github.com/WFC3Grism/CodeDescription>`_, which contains a detailed
explanation how the grism configuration parameters and coefficients are defined and evaluated.
"""

import os
from collections import OrderedDict

import numpy as np

from . import GRIZLI_PATH, utils
from .jwst_utils import crds_reffiles

DEFAULT_CRDS_CONTEXT = 'jwst_1123.pmap'

NIRCAM_CONF_VERSION = 'V8.5'

if os.getenv('NIRCAM_CONF_VERSION') is not None:
    NIRCAM_CONF_VERSION = os.getenv('NIRCAM_CONF_VERSION')
    print('Use NIRCAM_CONF_VERSION={NIRCAM_CONF_VERSION}')

[docs]def show_available_nircam_versions(filter='F444W', module='B', grism='R', verbose=True): """ Show all available versions of the NIRCAM Grism config files """ import glob files = glob.glob(os.path.join(GRIZLI_PATH, f'CONF/GRISM_NIRCAM/*/NIRCAM_{filter}_mod{module}_{grism}.conf')) files.sort() versions = [] for file in files: versions.append(file.split('/')[-2]) if verbose: print(f'{versions[-1]:>8} {file}') return versions
[docs]class aXeConf(): def __init__(self, conf_file='WFC3.IR.G141.V2.5.conf'): """Read an aXe-compatible configuration file Parameters ---------- conf_file: str Filename of the configuration file to read """ if conf_file is not None: self.conf = self.read_conf_file(conf_file) self.conf_dict = self.conf self.conf_file = conf_file self.count_beam_orders() # Global XOFF/YOFF offsets if 'XOFF' in self.conf.keys(): self.xoff = float(self.conf['XOFF']) else: self.xoff = 0. if 'YOFF' in self.conf.keys(): self.yoff = float(self.conf['YOFF']) else: self.yoff = 0.
[docs] def read_conf_file(self, conf_file='WFC3.IR.G141.V2.5.conf'): """Read an aXe config file, convert floats and arrays Parameters ---------- conf_file: str Filename of the configuration file to read. Parameters are stored in an OrderedDict in `self.conf`. """ conf = OrderedDict() fp = open(conf_file) lines = fp.readlines() fp.close() for line in lines: # empty / commented lines if (line.startswith('#')) | (line.strip() == '') | ('"' in line): continue # split the line, taking out ; and # comments spl = line.split(';')[0].split('#')[0].split() param = spl[0] if len(spl) > 2: value = np.cast[float](spl[1:]) else: try: value = float(spl[1]) except: value = spl[1] conf[param] = value return conf
[docs] def count_beam_orders(self): """Get the maximum polynomial order in DYDX or DLDP for each beam """ self.orders = {} for beam in ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J']: order = 0 while 'DYDX_{0:s}_{1:d}'.format(beam, order) in self.conf.keys(): order += 1 while 'DLDP_{0:s}_{1:d}'.format(beam, order) in self.conf.keys(): order += 1 self.orders[beam] = order-1
[docs] def get_beams(self): """Get beam parameters and read sensitivity curves """ import os from astropy.table import Table, Column self.dxlam = OrderedDict() self.nx = OrderedDict() self.sens = OrderedDict() self.beams = [] for beam in self.orders: if self.orders[beam] > 0: self.beams.append(beam) self.dxlam[beam] = np.arange(self.conf['BEAM{0}'.format(beam)].min(), self.conf['BEAM{0}'.format(beam)].max(), dtype=int) self.nx[beam] = int(self.dxlam[beam].max()-self.dxlam[beam].min())+1 self.sens[beam] = Table.read('{0}/{1}'.format(os.path.dirname(self.conf_file), self.conf['SENSITIVITY_{0}'.format(beam)])) #self.sens[beam].wave = np.cast[np.double](self.sens[beam]['WAVELENGTH']) #self.sens[beam].sens = np.cast[np.double](self.sens[beam]['SENSITIVITY']) # Need doubles for interpolating functions for col in self.sens[beam].colnames: data = np.cast[np.double](self.sens[beam][col]) self.sens[beam].remove_column(col) self.sens[beam].add_column(Column(data=data, name=col)) # Scale BEAM F if (beam == 'F') & ('G141' in self.conf_file): self.sens[beam]['SENSITIVITY'] *= 0.35 if (beam == 'B') & ('G141' in self.conf_file): if self.conf['SENSITIVITY_B'] == 'WFC3.IR.G141.0th.sens.1.fits': self.sens[beam]['SENSITIVITY'] *= 2 # wave = np.cast[np.double](self.sens[beam]['WAVELENGTH']) # sens = np.cast[np.double](self.sens[beam]['SENSITIVITY'] # self.sens[beam]['WAVELENGTH'] = np.cast[np.double](self.sens[beam]['WAVELENGTH']) # self.sens[beam]['SENSITIVITY'] = ) self.beams.sort()
[docs] def remove_beam(self, beam): """ Remove a beam definition """ if beam in self.beams: ix = self.beams.index(beam) _ = self.beams.pop(ix)
[docs] def reset_beam_list(self): """ Reset full beam list, perhaps after removing some with ``remove_beam`` """ for beam in self.orders: if self.orders[beam] > 0: self.beams.append(beam)
[docs] def field_dependent(self, xi, yi, coeffs): """aXe field-dependent coefficients See the `aXe manual <http://axe.stsci.edu/axe/manual/html/node7.html#SECTION00721200000000000000>`_ for a description of how the field-dependent coefficients are specified. Parameters ---------- xi, yi : float or array-like Coordinate to evaluate the field dependent coefficients, where `xi = x-REFX` and `yi = y-REFY`. coeffs : array-like Field-dependency coefficients Returns ------- a : float or array-like Evaluated field-dependent coefficients """ # number of coefficients for a given polynomial order # 1:1, 2:3, 3:6, 4:10, order:order*(order+1)/2 if hasattr(coeffs, '__len__'): order = int(-1+np.sqrt(1+8*len(coeffs))) // 2 else: order = 1 # Build polynomial terms array # $a = a_0+a_1x_i+a_2y_i+a_3x_i^2+a_4x_iy_i+a_5yi^2+$ ... xy = [] for _p in range(order): for _py in range(_p+1): # print 'x**%d y**%d' %(p-py, px) xy.append(xi**(_p - _py) * yi**(_py)) # Evaluate the polynomial, allowing for N-dimensional inputs a = np.sum((np.array(xy).T*coeffs).T, axis=0) return a
[docs] def evaluate_dp(self, dx, dydx): """Evalate arc length along the trace given trace polynomial coefficients Parameters ---------- dx : array-like x pixel to evaluate dydx : array-like Coefficients of the trace polynomial Returns ------- dp : array-like Arc length along the trace at position `dx`. For `dydx` polynomial orders 0, 1 or 2, integrate analytically. Higher orders must be integrated numerically. **Constant:** .. math:: dp = dx **Linear:** .. math:: dp = \sqrt{1+\mathrm{DYDX}[1]}\cdot dx **Quadratic:** .. math:: u = \mathrm{DYDX}[1] + 2\ \mathrm{DYDX}[2]\cdot dx .. math:: dp = (u \sqrt{1+u^2} + \mathrm{arcsinh}\ u) / (4\cdot \mathrm{DYDX}[2]) """ # dp is the arc length along the trace # $\lambda = dldp_0 + dldp_1 dp + dldp_2 dp^2$ ... poly_order = len(dydx)-1 if (poly_order == 2): if np.abs(np.unique(dydx[2])).max() == 0: poly_order = 1 if poly_order == 0: # dy=0 dp = dx elif poly_order == 1: # constant dy/dx dp = np.sqrt(1+dydx[1]**2)*(dx) elif poly_order == 2: # quadratic trace u0 = dydx[1]+2*dydx[2]*(0) dp0 = (u0*np.sqrt(1+u0**2)+np.arcsinh(u0))/(4*dydx[2]) u = dydx[1]+2*dydx[2]*(dx) dp = (u*np.sqrt(1+u**2)+np.arcsinh(u))/(4*dydx[2])-dp0 else: # high order shape, numerical integration along trace # (this can be slow) xmin = np.minimum((dx).min(), 0) xmax = np.maximum((dx).max(), 0) xfull = np.arange(xmin, xmax) dyfull = 0 for i in range(1, poly_order): dyfull += i*dydx[i]*(xfull-0.5)**(i-1) # Integrate from 0 to dx / -dx dpfull = xfull*0. lt0 = xfull < 0 if lt0.sum() > 1: dpfull[lt0] = np.cumsum(np.sqrt(1+dyfull[lt0][::-1]**2))[::-1] dpfull[lt0] *= -1 # gt0 = xfull > 0 if gt0.sum() > 0: dpfull[gt0] = np.cumsum(np.sqrt(1+dyfull[gt0]**2)) dp = np.interp(dx, xfull, dpfull) if dp[-1] == dp[-2]: dp[-1] = dp[-2]+np.diff(dp)[-2] return dp
[docs] def get_beam_trace(self, x=507, y=507, dx=0., beam='A', fwcpos=None): """Get an aXe beam trace for an input reference pixel and list of output x pixels `dx` Parameters ---------- x, y : float or array-like Evaluate trace definition at detector coordinates `x` and `y`. dx : float or array-like Offset in x pixels from `(x,y)` where to compute trace offset and effective wavelength beam : str Beam name (i.e., spectral order) to compute. By aXe convention, `beam='A'` is the first order, 'B' is the zeroth order and additional beams are the higher positive and negative orders. fwcpos : None or float For NIRISS, specify the filter wheel position to compute the trace rotation Returns ------- dy : float or array-like Center of the trace in y pixels offset from `(x,y)` evaluated at `dx`. lam : float or array-like Effective wavelength along the trace evaluated at `dx`. """ NORDER = self.orders[beam]+1 xi, yi = x-self.xoff, y-self.yoff xoff_beam = self.field_dependent(xi, yi, self.conf['XOFF_{0}'.format(beam)]) yoff_beam = self.field_dependent(xi, yi, self.conf['YOFF_{0}'.format(beam)]) # y offset of trace (DYDX) dydx = np.zeros(NORDER) # 0 #+1.e-80 dydx = [0]*NORDER for i in range(NORDER): if 'DYDX_{0:s}_{1:d}'.format(beam, i) in self.conf.keys(): coeffs = self.conf['DYDX_{0:s}_{1:d}'.format(beam, i)] dydx[i] = self.field_dependent(xi, yi, coeffs) # $dy = dydx_0+dydx_1 dx+dydx_2 dx^2+$ ... dy = yoff_beam for i in range(NORDER): dy += dydx[i]*(dx-xoff_beam)**i # wavelength solution dldp = np.zeros(NORDER) dldp = [0]*NORDER for i in range(NORDER): if 'DLDP_{0:s}_{1:d}'.format(beam, i) in self.conf.keys(): coeffs = self.conf['DLDP_{0:s}_{1:d}'.format(beam, i)] dldp[i] = self.field_dependent(xi, yi, coeffs) self.eval_input = {'x': x, 'y': y, 'beam': beam, 'dx': dx, 'fwcpos': fwcpos} self.eval_output = {'xi': xi, 'yi': yi, 'dldp': dldp, 'dydx': dydx, 'xoff_beam': xoff_beam, 'yoff_beam': yoff_beam, 'dy': dy} dp = self.evaluate_dp(dx-xoff_beam, dydx) # ## dp is the arc length along the trace # ## $\lambda = dldp_0 + dldp_1 dp + dldp_2 dp^2$ ... # if self.conf['DYDX_ORDER_%s' %(beam)] == 0: ## dy=0 # dp = dx-xoff_beam # elif self.conf['DYDX_ORDER_%s' %(beam)] == 1: ## constant dy/dx # dp = np.sqrt(1+dydx[1]**2)*(dx-xoff_beam) # elif self.conf['DYDX_ORDER_%s' %(beam)] == 2: ## quadratic trace # u0 = dydx[1]+2*dydx[2]*(0) # dp0 = (u0*np.sqrt(1+u0**2)+np.arcsinh(u0))/(4*dydx[2]) # u = dydx[1]+2*dydx[2]*(dx-xoff_beam) # dp = (u*np.sqrt(1+u**2)+np.arcsinh(u))/(4*dydx[2])-dp0 # else: # ## high order shape, numerical integration along trace # ## (this can be slow) # xmin = np.minimum((dx-xoff_beam).min(), 0) # xmax = np.maximum((dx-xoff_beam).max(), 0) # xfull = np.arange(xmin, xmax) # dyfull = 0 # for i in range(1, NORDER): # dyfull += i*dydx[i]*(xfull-0.5)**(i-1) # # ## Integrate from 0 to dx / -dx # dpfull = xfull*0. # lt0 = xfull <= 0 # if lt0.sum() > 1: # dpfull[lt0] = np.cumsum(np.sqrt(1+dyfull[lt0][::-1]**2))[::-1] # dpfull[lt0] *= -1 # # # gt0 = xfull >= 0 # if gt0.sum() > 0: # dpfull[gt0] = np.cumsum(np.sqrt(1+dyfull[gt0]**2)) # # dp = np.interp(dx-xoff_beam, xfull, dpfull) # Evaluate dldp lam = dp*0. for i in range(NORDER): lam += dldp[i]*dp**i # NIRISS rotation? if fwcpos is not None: if 'FWCPOS_REF' not in self.conf.keys(): print('Parameter fwcpos={0} supplied but no FWCPOS_REF in {1:s}'.format(fwcpos, self.conf_file)) return dy, lam order = int(self.conf['DYDX_ORDER_{0}'.format(beam)]) if order > 2: print(f'ORDER={order} > 2 not supported for NIRISS rotation') return dy, lam theta = (fwcpos - self.conf['FWCPOS_REF'])/180*np.pi*1 theta *= -1 # DMS rotation # print('DMS') if theta == 0: return dy, lam # For the convention of swapping/inverting axes for GR150C # if 'GR150C' in self.conf_file: # theta = -theta # If theta is small, use a small angle approximation. # Otherwise, 1./tan(theta) blows up and results in numerical # noise. xp = (dx-xoff_beam)/np.cos(theta) if (1-np.cos(theta) < 5.e-8) | (np.abs(dydx[2]) < 1.e-8) | (order < 2): #print('Approximate!', xoff_beam, np.tan(theta)) dy = dy + (dx-xoff_beam)*np.tan(theta) delta = 0. # print('Approx') else: # Full transformed trace coordinates c = dydx #print('Not approx') beta = c[1]+2*c[2]*xp-1/np.tan(theta) chi = c[0]+c[1]*xp+c[2]*xp**2 if theta < 0: psi = (-beta+np.sqrt(beta**2-4*c[2]*chi)) psi *= 1./2/c[2]/np.tan(theta) delta = psi*np.tan(theta) dy = dx*np.tan(theta) + psi/np.cos(theta) else: psi = (-beta-np.sqrt(beta**2-4*c[2]*chi)) psi *= 1./2/c[2]/np.tan(theta) delta = psi*np.tan(theta) dy = dx*np.tan(theta) + psi/np.cos(theta) # Evaluate wavelength at 'prime position along the trace dp = self.evaluate_dp(xp+delta, dydx) lam = dp*0. for i in range(NORDER): lam += dldp[i]*dp**i return dy, lam
[docs] def show_beams(self, xy=None, beams=['E', 'D', 'C', 'B', 'A']): """ Make a demo plot of the beams of a given configuration file """ import matplotlib.pyplot as plt x0, x1 = 507, 507 dx = np.arange(-800, 1200) if 'WFC3.UV' in self.conf_file: x0, x1 = 2073, 250 dx = np.arange(-1200, 1200) if 'G800L' in self.conf_file: x0, x1 = 2124, 1024 dx = np.arange(-1200, 1200) if xy is not None: x0, x1 = xy s = 200 # marker size fig, ax = plt.subplots(figsize=[10, 3]) ax.scatter(0, 0, marker='s', s=s, color='black', edgecolor='0.8', label='Direct') for beam in beams: if 'XOFF_{0}'.format(beam) not in self.conf.keys(): continue xoff = self.field_dependent(x0, x1, self.conf['XOFF_{0}'.format(beam)]) dy, lam = self.get_beam_trace(x0, x1, dx=dx, beam=beam) xlim = self.conf['BEAM{0}'.format(beam)] ok = (dx >= xlim[0]) & (dx <= xlim[1]) sc = ax.scatter(dx[ok]+xoff, dy[ok], c=lam[ok]/1.e4, marker='s', s=s, alpha=0.5, edgecolor='None') ax.text(np.median(dx[ok]), np.median(dy[ok])+1, beam, ha='center', va='center', fontsize=14) print('Beam {0}, lambda=({1:.1f} - {2:.1f})'.format(beam, lam[ok].min(), lam[ok].max())) ax.grid() ax.set_xlabel(r'$\Delta x$' + f' (x0={x0})') ax.set_ylabel(r'$\Delta y$' + f' (y0={x1})') cb = plt.colorbar(sc, pad=0.01, fraction=0.05) cb.set_label(r'$\lambda\,(\mu\mathrm{m})$') ax.set_title(self.conf_file) fig.tight_layout(pad=0.1) #plt.savefig('{0}.pdf'.format(self.conf_file)) return fig
[docs] def load_nircam_sensitivity_curve(self, verbose=True, **kwargs): """ Replace +1 NIRCam sensitivity curves with Nov 10, 2023 updates Files generated with the calibration data of P330E from program CAL-1538 (K. Gordon) Download the FITS files from the link below and put them in ``$GRIZLI/CONF/GRISM_NIRCAM/``. https://s3.amazonaws.com/grizli-v2/JWSTGrism/NircamSensitivity/index.html """ if 'NIRCAM' not in self.conf_file: return None pars = os.path.basename(self.conf_file).split('_') path = os.path.join(GRIZLI_PATH, 'CONF', 'GRISM_NIRCAM') sens_base = 'nircam_wfss_sensitivity_{filter}_{pupil}_{module}.10nov23.fits' sens_file = sens_base.format(filter=pars[1], pupil='GRISM'+pars[3][0], module=pars[2][-1]) sens_file = os.path.join(path, sens_file) # print('xx', sens_file, os.path.exists(sens_file)) if os.path.exists(sens_file): msg = "grismconf.aXeConf: replace sensitivity curve with " msg += f"{sens_file}" utils.log_comment(utils.LOGFILE, msg, verbose=verbose) si = utils.read_catalog(sens_file).copy() si['ERROR'] = si['SENSITIVITY'].max()*0.01 self.SENS['A'] = si
[docs]def coeffs_from_astropy_polynomial(p): """ Get field-dependent coefficients in aXe format from an `astropy.modeling.polynomial.Polynomial2D` model Parameters ---------- p : `astropy.modeling.polynomial.Polynomial2D` Polynomial model Returns ------- coeffs : array-like Reordered array of coefficients """ coeffs = [] for _p in range(p.degree+1): for _py in range(_p+1): # print 'x**%d y**%d' %(_p-_py, _py) _px = _p - _py pname = f'c{_px}_{_py}' if pname in p.param_names: pix = p.param_names.index(pname) coeffs.append(p.parameters[pix]) else: coeffs.append(0.0) return np.array(coeffs)
[docs]def get_config_filename(instrume='WFC3', filter='F140W', grism='G141', pupil=None, module=None, chip=1, use_jwst_crds=False, crds_context=DEFAULT_CRDS_CONTEXT): """Generate a config filename based on the instrument, filter & grism combination. Config files assumed to be found the directory specified by the `$GRIZLI` environment variable, i.e., `${GRIZLI}/CONF`. Parameters ---------- instrume : {'ACS', 'WFC3', 'NIRISS', 'NIRCam', 'WFIRST'} Instrument used filter : str Direct image filter. This is only used for WFC3/IR, where the grism configuration files have been determined for each direct+grism combination separately based on the filter wedge offsets of the filters. grism : str Grism name. Valid combinations are the following: ACS : G800L (assumed) WFC3 : G102, G141 NIRISS : GR150R, GR150C NIRCam : F322W2, F356W, F430M, F444W, F460M WFIRST : (basic assumptions about the WFI grism) chip : int For ACS/WFC and UVIS, specifies the chip to use. Note that this is switched with respect to the header EXTNAME extensions: EXTVER = 1 is extension 1 / (SCI,1) of the flt/flc files but corresponds to CCDCHIP = 2 and the ACS.WFC3.CHIP2 config files. and EXTVER = 2 is extension 4 / (SCI,2) of the flt/flc files but corresponds to CCDCHIP = 1 and the ACS.WFC3.CHIP1 config files. use_jwst_crds : bool Use CRDS ``specwcs`` reference files for JWST instruments Returns ------- conf_file : str String path of the configuration file. """ if instrume == 'ACS': conf_file = os.path.join(GRIZLI_PATH, 'CONF/ACS.WFC.CHIP{0:d}.Stars.conf'.format(chip)) if not os.path.exists(conf_file): conf_file = os.path.join(GRIZLI_PATH, 'CONF/ACS.WFC.CHIP{0:d}.Cycle13.5.conf'.format(chip)) if instrume == 'WFC3': if grism == 'G280': conf_file = os.path.join(GRIZLI_PATH, 'CONF/G280/', 'WFC3.UVIS.G280.cal/WFC3.UVIS.G280.CHIP{0:d}.V2.0.conf'.format(chip)) return conf_file conf_file = os.path.join(GRIZLI_PATH, 'CONF/{0}.{1}.V4.32.conf'.format(grism, filter)) # When direct + grism combination not found for WFC3 assume F140W if not os.path.exists(conf_file): conf_file = os.path.join(GRIZLI_PATH, 'CONF/{0}.{1}.V4.32.conf'.format(grism, 'F140W')) if instrume == 'NIRISS': conf_files = [] conf_files.append(os.path.join(GRIZLI_PATH, 'CONF/{0}.{1}.221215.conf'.format(grism, filter))) conf_files.append(os.path.join(GRIZLI_PATH, 'CONF/{0}.{1}.220725.conf'.format(grism, filter))) conf_files.append(os.path.join(GRIZLI_PATH, 'CONF/{0}.{1}.conf'.format(grism, filter))) conf_files.append(os.path.join(GRIZLI_PATH, 'CONF/NIRISS.{0}.conf'.format(filter))) for conf_file in conf_files: if os.path.exists(conf_file): #print(f'NIRISS: {conf_file}') break else: #print(f'skip NIRISS: {conf_file}') pass # if not os.path.exists(conf_file): # print('CONF/{0}.{1}.conf'.format(grism, filter)) # conf_file = os.path.join(GRIZLI_PATH, # 'CONF/NIRISS.{0}.conf'.format(filter)) # if instrume == 'NIRCam': # conf_file = os.path.join(GRIZLI_PATH, # 'CONF/aXeSIM_NC_2016May/CONF/NIRCam_LWAR_{0}.conf'.format(grism)) if instrume in ['NIRCAM']: #conf_file = os.path.join(GRIZLI_PATH, # f'CONF/NIRCam.A.{filter}.{grism}.conf') fi = grism gr = filter[-1] # R, C # conf_file = os.path.join(GRIZLI_PATH, # f'CONF/GRISM_NIRCAM/gNIRCAM.{fi}.mod{module}.{gr}.conf') # # conf_file = os.path.join(GRIZLI_PATH, # f'CONF/GRISM_NIRCAM/V2/NIRCAM_{fi}_mod{module}_{gr}.conf') # NIRCam preference: 8.5 > 8 > 4 conf_file_base = os.path.join(GRIZLI_PATH, f'CONF/GRISM_NIRCAM/[[NIRCAM_VERSION]]/NIRCAM_{fi}_mod{module}_{gr}.conf') _conf_versions = [NIRCAM_CONF_VERSION, 'V8.5', 'V8', 'V4', 'V6'] conf_file = None for NIRCAM_VERSION in _conf_versions: conf_file = conf_file_base.replace('[[NIRCAM_VERSION]]', NIRCAM_VERSION) if os.path.exists(conf_file): break if conf_file is None: raise ValueError elif instrume == 'NIRCAMA': fi = grism gr = filter[-1] # R, C conf_file = os.path.join(GRIZLI_PATH, f'CONF/GRISM_NIRCAM/gNIRCAM.{fi}.modA.{gr}.conf') #conf_file = os.path.join(GRIZLI_PATH, # f'CONF/NIRCam.B.{filter}.{grism}.conf') elif instrume == 'NIRCAMB': fi = grism gr = filter[-1] # R, C conf_file = os.path.join(GRIZLI_PATH, f'CONF/GRISM_NIRCAM/gNIRCAM.{fi}.modB.{gr}.conf') #conf_file = os.path.join(GRIZLI_PATH, # f'CONF/NIRCam.B.{filter}.{grism}.conf') if (instrume in ['NIRCAM','NIRISS']) & use_jwst_crds: if instrume == 'NIRCAM': _pupil = filter _filter = grism else: _pupil = filter _filter = grism refs = crds_reffiles(instrument=instrume, filter=_filter, pupil=_pupil, module=module, date=None, reftypes=('photom', 'specwcs'), header=None, context=crds_context) conf_file = refs['specwcs'] print('get_conf: xxx', conf_file, grism, filter, pupil, module, instrume) if instrume == 'WFIRST': conf_file = os.path.join(GRIZLI_PATH, 'CONF/WFIRST.conf') if instrume == 'WFI': conf_file = os.path.join(GRIZLI_PATH, 'CONF/Roman.G150.conf') if instrume == 'SYN': conf_file = os.path.join(GRIZLI_PATH, 'CONF/syn.conf') # Euclid NISP, config files @ # http://www.astrodeep.eu/euclid-spectroscopic-simulations/ if instrume == 'NISP': if grism == 'BLUE': conf_file = os.path.join(GRIZLI_PATH, 'CONF/Euclid.Gblue.0.conf') else: conf_file = os.path.join(GRIZLI_PATH, 'CONF/Euclid.Gred.0.conf') return conf_file
[docs]class JwstDispersionTransform(object): """ Rotate NIRISS and NIRCam coordinates such that slitless dispersion has wavelength increasing towards +x. Also works for HST, but does nothing. """ def __init__(self, instrument='NIRCAM', module='A', grism='R', conf_file=None, header=None): self.instrument = instrument self.module = 'A' if module is None else module self.grism = grism if conf_file is not None: self.base = os.path.basename(conf_file.split('.conf')[0]) else: self.base = None if conf_file is not None: if 'NIRISS' in conf_file: # NIRISS_F200W_GR150R.conf self.instrument = 'NIRISS' self.grism = self.base.split('_')[2][-1] self.module = 'A' elif 'NIRCAM' in conf_file: # NIRCAM_F444W_modA_R.conf self.instrument = 'NIRCAM' self.grism = self.base[-1] self.module = self.base[-3] else: # NIRCAM_F444W_modA_R.conf self.instrument = 'HST' self.grism = 'G141' self.module = 'A' elif header is not None: if 'INSTRUME' in header: self.instrument = header['INSTRUME'] if 'MODULE' in header: self.module = module if self.instrument == 'NIRCAM': if 'PUPIL' in header: self.grism = header['PUPIL'] else: if 'FILTER' in header: self.grism = header['FILTER'] @property def array_center(self): """ Center of rotation Maybe this is 1020 for NIRISS? """ if self.instrument == 'HST': return np.array([507.5, 507.5]) else: return np.array([1024.5, 1024.5]) @property def rotation(self): """ Clockwise rotation (degrees) from detector to wavelength increasing towards +x direction """ if self.instrument == 'NIRCAM': if self.module == 'A': if self.grism in ['R', 'GRISMR']: rotation = 0. elif self.grism in ['C', 'GRISMC']: rotation = 90. else: raise ValueError(f'NIRCAM {self.grism} must be GRISMR/C') elif self.module == 'B': if self.grism in ['R', 'GRISMR']: rotation = 180. elif self.grism in ['C', 'GRISMC']: rotation = 90. else: raise ValueError(f'NIRCAM {self.grism} must be GRISMR/C') else: raise ValueError(f'NIRCAM {self.module} must be A/B') elif self.instrument == 'NIRISS': if self.grism in ['GR150R','R']: rotation = 270. elif self.grism in ['GR150C','C']: rotation = 180. else: raise ValueError(f'NIRISS {self.grism} must be GR150R/C') else: # e.g., WFC3, ACS rotation = 0. return rotation @property def rot90(self): """ Rotations are all multiples of 90 for now, so compute values that can be passed to `numpy.rot90` for rotating 2D image arrays """ return int(np.round(self.rotation/90)) @property def trace_axis(self): """Which detector axis corresponds to increasing wavelength """ if self.instrument == 'NIRCAM': if self.module == 'A': if self.grism == 'R': axis = '+x' else: axis = '+y' else: if self.grism == 'R': axis = '-x' else: axis = '+y' elif self.instrument == 'NIRISS': if self.grism == 'R': axis = '-y' else: axis = '-x' else: # e.g., WFC3, ACS axis = '+x' return axis
[docs] @staticmethod def rotate_coordinates(x, y, theta, center): """ Rotate cartesian coordinates ``x`` and ``y`` by angle ``theta`` (radians) about ``center`` """ _mat = np.array([[np.cos(theta), -np.sin(theta)], [np.sin(theta), np.cos(theta)]]) x1 = np.atleast_1d(x) y1 = np.atleast_1d(y) return ((np.array([x1, y1]).T-center).dot(_mat)+center).T
[docs] def forward(self, x, y): """ Forward transform, detector to +x """ theta = self.rotation/180*np.pi return self.rotate_coordinates(x, y, theta, self.array_center)
[docs] def reverse(self, x, y): """ Reverse transform, +x to detector """ theta = -self.rotation/180*np.pi return self.rotate_coordinates(x, y, theta, self.array_center)
[docs]class TransformGrismconf(object): """ Transform GRISMCONF-format configuration files to grizli convention of wavelength increasing towards +x See https://github.com/npirzkal/GRISMCONF and config files at, e.g., https://github.com/npirzkal/GRISM_NIRCAM. """ def __init__(self, conf_file=''): """ Parameters ---------- conf_file : str Configuration filename """ import grismconf self.conf_file = conf_file if 'specwcs' in conf_file: self.conf = CRDSGrismConf(conf_file, get_photom=True) kws = {'instrument': self.conf.instrument, 'module': self.conf.module, 'grism': self.conf.grism[-1], } self.transform = JwstDispersionTransform(**kws) else: self.conf = grismconf.Config(conf_file) self.transform = JwstDispersionTransform(conf_file=conf_file) self.order_names = {'A':'+1', 'B':'0', 'C':'+2', 'D':'+3', 'E':'-1', 'F':'+4'} self.beam_names = {} for k in self.order_names: self.beam_names[self.order_names[k]] = k self.dxlam = OrderedDict() self.nx = OrderedDict() self.sens = OrderedDict() self.xoff = 0.0 self.yoff = 0.0 self.conf_dict = {} @property def orders(self): """ GRISMCONF order names, like '+1', '0', '+2', etc. """ return self.conf.orders @property def beams(self): """ aXe beam names like 'A','B','C', etc. """ beams = [self.beam_names[k] for k in self.orders] return beams
[docs] def remove_beam(self, beam, verbose=True): """ Remove a beam from the orders list """ order_name = None for k in self.beam_names: if self.beam_names[k] == beam: order_name = k if order_name is None: return False if hasattr(self.conf, 'dm_orders'): olist = self.conf.dm_orders order_name = int(order_name) else: olist = self.conf.orders if order_name in olist: ix = olist.index(order_name) _ = olist.pop(ix) return True else: return False
[docs] def get_beam_trace(self, x=1024, y=1024, dx=0., beam='A', fwcpos=None): """ Function analogous to `grizli.grismconf.aXeConf.get_beam_trace` but that accounts for the different dispersion axes of JWST grisms Parameters ---------- x, y : float Reference position in the rotated frame dx : array-like Offset in pixels along the trace beam : str Grism order, translated from +1, 0, +2, +3, -1 = A, B, C, D, E fwcpos : float NIRISS rotation *(not implemented)* Returns ------- dy : float or array-like Center of the trace in y pixels offset from `(x,y)` evaluated at `dx`. lam : float or array-like Effective wavelength along the trace evaluated at `dx`. """ from astropy.modeling.models import Polynomial2D x0 = np.squeeze(self.transform.reverse(x, y)) if self.transform.trace_axis == '+x': t_func = self.conf.INVDISPX trace_func = self.conf.DISPY delta = 1*dx elif self.transform.trace_axis == '-x': t_func = self.conf.INVDISPX trace_func = self.conf.DISPY delta = -1*dx elif self.transform.trace_axis == '+y': t_func = self.conf.INVDISPY trace_func = self.conf.DISPX delta = 1*dx else: # -y t_func = self.conf.INVDISPY trace_func = self.conf.DISPX delta = -1*dx #print('xref: ', self.conf_file, self.transform.trace_axis) #print(x0, t_func) t = t_func(self.order_names[beam], *x0, delta) tdx = self.conf.DISPX(self.order_names[beam], *x0, t) tdy = self.conf.DISPY(self.order_names[beam], *x0, t) rev = self.transform.forward(x0[0]+tdx, x0[1]+tdy) trace_dy = rev[1,:] - y #trace_dy = y - rev[1,:] # Trace offsets for NIRCam if 'V4/NIRCAM_F444W_modB_R.conf' in self.conf_file: trace_dy += -0.5 # Shifts derived from FRESCO coeffs = {'c0_0': 0.3723992993620532, 'c1_0': -0.00011461411413576305, 'c2_0': -8.575199405062535e-08, 'c0_1': -0.0011862122093603026, 'c0_2': 4.1403439215806165e-07, 'c1_1': 1.6558275336712723e-07 } poly = Polynomial2D(degree=2, **coeffs) trace_dy += poly(x, y) # print(f'polynomial offset: {poly(x,y):.3f}') elif 'V4/NIRCAM_F444W_modA_R.conf' in self.conf_file: trace_dy += -2.5 # Shifts derived from FRESCO coeffs = {'c0_0': 0.34191256988768415, 'c1_0': -0.0003378232293429956, 'c2_0': -9.238111910134196e-09, 'c0_1': -7.063720696711682e-05, 'c0_2': 2.5217177632321527e-08, 'c1_1': -1.4345820074275903e-07 } poly = Polynomial2D(degree=2, **coeffs) trace_dy += poly(x, y) # print(f'polynomial offset: {poly(x,y):.3f}') elif ('V8/NIRCAM' in self.conf_file) | ('V8.5/NIRCAM' in self.conf_file) | ('V9/NIRCAM' in self.conf_file): #print('V8: do nothing') pass elif os.path.basename(self.conf_file) == 'NIRCAM_F444W_modA_R.conf': trace_dy += -2.5 elif os.path.basename(self.conf_file) == 'NIRCAM_F444W_modA_C.conf': trace_dy += -0.1 wave = self.conf.DISPL(self.order_names[beam], *x0, t) if self.transform.instrument != 'HST': wave *= 1.e4 return trace_dy, wave
[docs] def get_beams(self, nt=512, min_sens=1.e-3): """ Get beam parameters and read sensitivity curves Parameters ---------- nt : int Number of points to sample the GRISMCONF `t` parameter Returns ------- sets `dxlam`, `nx`, `sens`, attributes """ import os from astropy.table import Table, Column t = np.linspace(0, 1, nt) for beam in self.beams: order = self.order_names[beam] # Define t from sensitivity if self.conf.SENS is None: _swave, _ssens = self.conf.SENS_data[order] _swave = _swave[_ssens > min_sens*np.nanmax(_ssens)] _ssens = _ssens[_ssens > min_sens*np.nanmax(_ssens)] _dw = _swave.max() - _swave.min() _wgrid = np.linspace(_swave.min()-0.02*_dw, _swave.max()+_dw*0.02, nt) t = self.conf.INVDISPL(order, *self.transform.array_center, _wgrid) dx = self.conf.DISPX(order, *self.transform.array_center, t) dy = self.conf.DISPY(order, *self.transform.array_center, t) lam = self.conf.DISPL(order, *self.transform.array_center, t) trace_axis = self.transform.trace_axis if trace_axis == '+x': xarr = 1*dx elif trace_axis == '-x': xarr = -1*dx elif trace_axis == '+y': xarr = 1*dy elif trace_axis == '-y': xarr = -1*dy xarr = np.cast[int](np.round(xarr)) #self.beams.append(beam) self.dxlam[beam] = np.arange(xarr.min(), xarr.max(), dtype=int) self.nx[beam] = xarr.max() - xarr.min() +1 # Do we need to force zeroth order to be between first and second? if beam == 'B': bm1 = self.beam_names['-1'] bp1 = self.beam_names['+1'] if (self.nx[beam] > 500) & (bm1 in self.nx) & (bp1 in self.nx): _xmin = self.dxlam[bm1].max() _xmax = self.dxlam[bp1].min() self.dxlam[beam] = np.arange(_xmin, _xmax, dtype=int) self.nx[beam] = _xmax - _xmin +1 sens = Table() sens['WAVELENGTH'] = lam.astype(np.double) if self.conf.SENS is None: # specwcs _sens = np.interp(lam, *self.conf.SENS_data[order], left=0., right=0.) sens['SENSITIVITY'] = _sens.astype(np.double) else: sens['SENSITIVITY'] = self.conf.SENS[order](lam).astype(np.double) if lam.max() < 100: sens['WAVELENGTH'] *= 1.e4 self.sens[beam] = sens self.conf_dict[f'BEAM{beam}'] = np.array([xarr.min(), xarr.max()]) self.conf_dict[f'MMAG_EXTRACT_{beam}'] = 29 # Read updated sensitivity files if 'V4/NIRCAM_F444W' in self.conf_file: sens_file = self.conf_file.replace('.conf', '_ext_sensitivity.fits') if os.path.exists(sens_file): # print(f'Replace sensitivity: {sens_file}') new = utils.read_catalog(sens_file) _tab = utils.GTable() _tab['WAVELENGTH'] = new['WAVELENGTH'].astype(float) _tab['SENSITIVITY'] = new['SENSITIVITY'].astype(float) _tab['ERROR'] = new['ERROR'].astype(float) self.sens['A'] = _tab self.load_nircam_sensitivity_curve()
[docs] def load_nircam_sensitivity_curve(self, verbose=True, **kwargs): """ Replace +1 NIRCam sensitivity curves with Nov 10, 2023 updates Files generated with the calibration data of P330E from program CAL-1538 (K. Gordon) Download the FITS files from the link below and put them in ``$GRIZLI/CONF/GRISM_NIRCAM/``. https://s3.amazonaws.com/grizli-v2/JWSTGrism/NircamSensitivity/index.html """ if 'NIRCAM' not in self.conf_file: return None pars = os.path.basename(self.conf_file).split('_') path = os.path.join(GRIZLI_PATH, 'CONF', 'GRISM_NIRCAM') sens_base = 'nircam_wfss_sensitivity_{filter}_{pupil}_{module}.10nov23.fits' sens_file = sens_base.format(filter=pars[1], pupil='GRISM'+pars[3][0], module=pars[2][-1]) sens_file = os.path.join(path, sens_file) # print('xx', sens_file, os.path.exists(sens_file)) if os.path.exists(sens_file): msg = "grismconf.aXeConf: replace sensitivity curve with " msg += f"{sens_file}" utils.log_comment(utils.LOGFILE, msg, verbose=verbose) si = utils.read_catalog(sens_file).copy() si['ERROR'] = si['SENSITIVITY'].max()*0.01 _tab = utils.GTable() _tab['WAVELENGTH'] = si['WAVELENGTH'].astype(float) _tab['SENSITIVITY'] = si['SENSITIVITY'].astype(float) _tab['ERROR'] = si['ERROR'].astype(float) self.sens['A'] = _tab
[docs]def load_grism_config(conf_file, warnings=True): """Load parameters from an aXe configuration file Parameters ---------- conf_file : str Filename of the configuration file Returns ------- conf : `~grizli.grismconf.aXeConf` Configuration file object. Runs `conf.get_beams()` to read the sensitivity curves. """ if 'V3/NIRCAM' in conf_file: conf = TransformGrismconf(conf_file) conf.get_beams() elif 'V2/NIRCAM' in conf_file: conf = TransformGrismconf(conf_file) conf.get_beams() elif 'V4/NIRCAM' in conf_file: conf = TransformGrismconf(conf_file) conf.get_beams() elif 'V8/NIRCAM' in conf_file: conf = TransformGrismconf(conf_file) conf.get_beams() elif 'V8.5/NIRCAM' in conf_file: conf = TransformGrismconf(conf_file) conf.get_beams() elif 'V9/NIRCAM' in conf_file: conf = TransformGrismconf(conf_file) conf.get_beams() elif 'specwcs' in conf_file: conf = TransformGrismconf(conf_file) conf.get_beams() else: conf = aXeConf(conf_file) conf.get_beams() # Preliminary hacks for on-sky NIRISS if 'GR150' in conf_file: if 0: hack_niriss = 1./1.8 * 1.1 msg = f""" ! Scale NIRISS sensitivity by {hack_niriss:.3f} to hack gain correction ! and match GLASS MIRAGE simulations. Sensitivity will be updated when ! on-sky data available """ msg = f' ! Scale NIRISS sensitivity by {hack_niriss:.3f} prelim flux correction' utils.log_comment(utils.LOGFILE, msg, verbose=warnings) elif 'F090W' in conf_file: # hack_niriss = 0.8 hack_niriss = 1.0 else: hack_niriss = 1.0 for b in conf.sens: conf.sens[b]['SENSITIVITY'] *= hack_niriss if 'ERROR' in conf.sens[b].colnames: conf.sens[b]['ERROR'] *= hack_niriss if ('F115W' in conf_file) | ('.2212' in conf_file): pass # msg = f""" !! Shift F115W along dispersion""" # utils.log_comment(utils.LOGFILE, msg, verbose=warnings) # for b in conf.beams: # #conf.conf[f'DYDX_{b}_0'][0] += 0.25 # conf.conf[f'DLDP_{b}_0'] -= conf.conf[f'DLDP_{b}_1']*0.5 elif ('F090W' in conf_file) | ('.2212' in conf_file): pass elif isinstance(conf, TransformGrismconf): # Don't shift new format files pass else: msg = f""" !! Shift {os.path.basename(conf_file)} along dispersion""" utils.log_comment(utils.LOGFILE, msg, verbose=warnings) for b in conf.beams: #conf.conf[f'DYDX_{b}_0'][0] += 0.25 conf.conf[f'DLDP_{b}_0'] += conf.conf[f'DLDP_{b}_1']*0.5 # For red galaxy conf.conf[f'DLDP_{b}_0'] += conf.conf[f'DLDP_{b}_1']*0.5 # if 'F200W' in conf_file: # conf.conf[f'DLDP_{b}_0'] += conf.conf[f'DLDP_{b}_1']*0.5 # _w = conf.sens['A']['WAVELENGTH'] # _w0 = (_w*conf.sens['A']['SENSITIVITY']).sum() # _w0 /= conf.sens['A']['SENSITIVITY'].sum() # slope = 1.05 + 0.2 * (_w - _w0)/3000 # # print('xxx', conf_file, _w0) # conf.sens['A']['SENSITIVITY'] *= slope if ('F150W' in conf_file) & (hack_niriss > 1.01): conf.sens['A']['SENSITIVITY'] *= 1.08 # Scale 0th orders in F150W if ('F150W' in conf_file): # | ('F200W' in conf_file): msg = f""" ! Scale 0th order (B) by an additional x 1.5""" utils.log_comment(utils.LOGFILE, msg, verbose=warnings) conf.sens['B']['SENSITIVITY'] *= 1.5 if 'ERROR' in conf.sens['B'].colnames: conf.sens['B']['ERROR'] *= 1.5 # Another shift from 0723, 2744 # if ('GR150C.F200W' in conf_file): # msg = f""" !! Extra shift for GR150C.F200W""" # utils.log_comment(utils.LOGFILE, msg, verbose=warnings) # for b in conf.beams: # pass # #conf.conf[f'DYDX_{b}_0'][0] += 0.25 # # conf.conf[f'DLDP_{b}_0'] -= conf.conf[f'DLDP_{b}_1']*0.5 # Shift x by 1 px # msg = f""" ! Shift NIRISS by 0.5 pix along dispersion direction""" # utils.log_comment(utils.LOGFILE, msg, verbose=warnings) # # for b in conf.beams: # conf.conf[f'DLDP_{b}_0'] -= conf.conf[f'DLDP_{b}_1']*0.5 return conf
[docs]def download_jwst_crds_references(instruments=['NIRCAM','NIRISS'], filters=['F090W','F115W','F150W','F200W','F277W','F356W','F410M','F444W','F460M','F480M'], grisms=['GRISMR','GRISMC','GR150R','GR150C'], modules=['A','B'], context=DEFAULT_CRDS_CONTEXT, verbose=True): """ Run `~grizli.jwst_utils.crds_reffiles` with filter and grism combinations to prefetch a bunch of reference files """ for instrument in instruments: for f in filters: for g in grisms: if instrument == 'NIRCAM': if (f < 'F270') | g.startswith('GR150'): continue for m in modules: _ = crds_reffiles(instrument='NIRCAM', filter=f, pupil=g, module=m, reftypes=('photom', 'specwcs'), header=None, context=context, verbose=verbose) else: if (f > 'F270') | g.startswith('GRISM'): continue _ = crds_reffiles(instrument='NIRISS', filter=g, pupil=f, module='A', reftypes=('photom', 'specwcs'), header=None, context=context, verbose=verbose)
[docs]class CRDSGrismConf(): def __init__(self, file='references/jwst/nircam/jwst_nircam_specwcs_0136.asdf', get_photom=True, context=DEFAULT_CRDS_CONTEXT, **kwargs): """ Helper object to replicate `grismconf` config files from CRDS products Parameters ---------- file : str Filename of a CRDS ``specwcs`` file get_photom : bool Get sensitivity curves from the ``photom`` reference file context : str Explicit CRDS_CONTEXT Attributes ---------- dm : `jwst.datamodels.NIRCAMGrismModel` DataModel of the ``specwcs`` reference meta : dict Metadata dictionary from `jwst.datamodels.NIRCAMGrismModel.meta.instance` dm_orders : list List of orders from `dm.orders` crds_parameters : dict CRDS parameter dictionary from `dm.get_crds_parameters()` dispx, dispy, displ : list, list, list Parameter polynomials from the reference datamodel SENS_data : dict Sensitivity data like ``{order: (wave_microns, sensitity)}`` """ from . import jwst_utils if context is not None: jwst_utils.CRDS_CONTEXT = context jwst_utils.set_crds_context(verbose=False, override_environ=True) self.file = file if file.startswith('references'): full_path = os.path.join(os.environ['CRDS_PATH'], file) elif os.path.exists(file): full_path = file elif '/references' in file: # Replace CRDS_PATH in absolute path full_path = os.path.join(os.environ['CRDS_PATH'], 'references', file.split('references/')[1]) if not os.path.exists(full_path): # Try to download all of the JWST references download_jwst_crds_references(context=context) self.full_path = full_path self.initialize_from_datamodel() self.SENS = None self.SENS_data = None if get_photom: self.get_photom(**kwargs) self.load_new_sensitivity_curve(**kwargs)
[docs] def initialize_from_datamodel(self): """ Initialize polynomial objects from a `jwst.datamodel` """ import copy import jwst.datamodels full_path = self.full_path if 'nircam' in full_path: dm = jwst.datamodels.NIRCAMGrismModel(full_path) else: dm = jwst.datamodels.NIRISSGrismModel(full_path) self.meta = copy.deepcopy(dm.meta.instance) self.dm_orders = copy.deepcopy(dm.orders) self.crds_parameters = dm.get_crds_parameters() self.dispx = copy.deepcopy(dm.dispx) self.dispy = copy.deepcopy(dm.dispy) self.displ = copy.deepcopy(dm.displ)
# @property # def meta(self): # """metadata dictionary""" # return self.dm.meta.instance @property def module(self): if self.instrument == 'NIRCAM': return self.meta['instrument']['module'] else: return None @property def filter(self): return self.meta['instrument']['filter'] @property def pupil(self): return self.meta['instrument']['pupil'] @property def grism(self): """ Return filter for NIRISS, pupil for NIRCAM, filter for HST """ if self.instrument in ('NIRCAM','NIRISS'): if self.instrument == 'NIRCAM': return self.pupil else: return self.filter else: # e.g., HST return self.filter @property def instrument(self): return self.meta['instrument']['name'] @property def filter_grism(self): """Return combination of blocking filter + dispering element""" if self.instrument == 'NIRISS': return (self.pupil, self.filter) else: return (self.filter, self.pupil) @property def instrument_setup(self): """Return combination of blocking filter, dispering element, (module)""" if self.instrument == 'NIRISS': return (self.pupil, self.filter, None) else: return (self.filter, self.pupil, self.module) @property def orders(self): """String version of orders, like '+1', '+2', '0', '-1'""" orders = [] for o in self.dm_orders: if o > 0: orders.append(f'+{o}') else: orders.append(f'{o}') return orders
[docs] def DISPX(self, order, x0, y0, t): """Replicate grismconf.DISPX""" io = self.orders.index(order) dispx = self._eval_model(self.dispx[io], x0, y0, t) return dispx
[docs] def DDISPX(self, order, x0, y0, t, dt=0.01): """Replicate grismconf.DDISPX""" io = self.orders.index(order) v0 = self._eval_model(self.dispx[io], x0, y0, t) v1 = self._eval_model(self.dispx[io], x0, y0, t+dt) return (v1-v0)/dt
[docs] def DISPY(self, order, x0, y0, t): """Replicate grismconf.DISPY""" io = self.orders.index(order) dispy = self._eval_model(self.dispy[io], x0, y0, t) return dispy
[docs] def DDISPY(self, order, x0, y0, t, dt=0.01): """Replicate grismconf.DDISPY""" io = self.orders.index(order) v0 = self._eval_model(self.dispy[io], x0, y0, t) v1 = self._eval_model(self.dispy[io], x0, y0, t+dt) return (v1-v0)/dt
[docs] def DISPXY(self, order, x0, y0, t): """Replicate grismconf.DISPXY (combination of DISPX, DISPY)""" io = self.orders.index(order) dispx = self._eval_model(self.dispx[io], x0, y0, t) dispy = self._eval_model(self.dispy[io], x0, y0, t) return dispx, dispy
[docs] def DISPL(self, order, x0, y0, t): """Replicate grismconf.DISPL""" io = self.orders.index(order) displ = self._eval_model(self.displ[io], x0, y0, t) return displ
[docs] def DDISPL(self, order, x0, y0, t, dt=0.01): """Replicate grismconf.DDISPL""" io = self.orders.index(order) v0 = self._eval_model(self.displ[io], x0, y0, t) v1 = self._eval_model(self.displ[io], x0, y0, t+dt) return (v1-v0)/dt
[docs] def INVDISPX(self, order, x0, y0, dx, t0=np.linspace(-1, 2, 128), from_root=False): """ Inverse DISPX """ if from_root: func = self._root_inverse_model else: func = self._inverse_model io = self.orders.index(order) t = func(self.dispx[io], x0, y0, dx, t0=t0) return t
[docs] def INVDISPY(self, order, x0, y0, dx, t0=np.linspace(-1, 2, 128), from_root=False): """ Inverse DISPY """ if from_root: func = self._root_inverse_model else: func = self._inverse_model io = self.orders.index(order) t = func(self.dispy[io], x0, y0, dx, t0=t0) return t
[docs] def INVDISPL(self, order, x0, y0, dx, t0=np.linspace(-1, 2, 128), from_root=False): """ Inverse DISPL """ if from_root: func = self._root_inverse_model else: func = self._inverse_model io = self.orders.index(order) t = func(self.displ[io], x0, y0, dx, t0=t0) return t
def _eval_model(self, model, x0, y0, t, get_coeffs=False): """ General function for evaluating model polynomials """ import numpy as np if hasattr(model, 'n_inputs'): # model is a Polynomial if model.n_inputs == 1: if get_coeffs: value = model.parameters[::-1] else: value = model(t) else: value = model(x0, y0) elif len(model) == 1: # model is a single-element list, probably Polynomial1D if model[0].n_inputs == 1: if get_coeffs: value = model[0].parameters[::-1] else: value = model[0](t) else: value = model[0](x0, y0) else: # model is a list _c = [] for m in model: if m.n_inputs == 1: _c.append(m(t)) else: _c.append(m(x0, y0)) if get_coeffs: value = _c[::-1] else: value = np.polyval(_c[::-1], t) return value def _root_inverse_model(self, model, x0, y0, dx, **kwargs): """Inverse values interpolated from the forward model using polynomial roots""" coeffs = self._eval_model(model, x0, y0, 0, get_coeffs=True) if hasattr(coeffs, '__len__'): coeffs[-1] -= dx value = np.roots(coeffs)[-1] else: value = coeffs return value def _inverse_model(self, model, x0, y0, dx, t0=np.linspace(-1, 2, 128)): """Inverse values interpolated from the forward model """ values = self._eval_model(model, x0, y0, t0) if values[0] > values[1]: return np.interp(dx, values[::-1], t0[::-1]) else: return np.interp(dx, values, t0)
[docs] def get_photom(self, xyt=(1024, 1024, 0.5), date=None, photom_file=None, verbose=False, **kwargs): """ Load photom reference from CRDS and scale to grismconf / aXe convention Parameters ---------- xyt : (float, float, float) Coordinate (x0, y0, t) where to evaluate the grism dispersion DLDP date : str, None Observation date in ISO format, e.g., '2023-01-01 00:00:00'. If not specified, defaults to "now" photom_file : str Explicit filename of a CRDS ``photom`` reference file verbose : bool Print status message Returns ------- SENS_data : dict Dict of ``{'order': (wave, sens)}``. Also sets ``SENS_data``, ``SENS_dldp`` and ``SENS_xyt`` attributes. """ import astropy.time import astropy.table import astropy.units as u import crds import jwst.datamodels if photom_file is None: cpars = self.crds_parameters if self.instrument == 'NIRISS': cpars['meta.instrument.detector'] = 'NIS' cpars['meta.exposure.type'] = 'NIS_WFSS' else: cpars['meta.instrument.detector'] = f'NRC{self.module}LONG' cpars['meta.exposure.type'] = 'NRC_WFSS' if date is None: date = astropy.time.Time.now().iso cpars['meta.observation.date'] = date.split()[0] cpars['meta.observation.time'] = date.split()[1] refs = crds.getreferences(cpars, reftypes=('photom',)) if verbose: msg = f"Read photometry reference {refs['photom']} (date = '{date}')" print(msg) photom_file = refs['photom'] if self.instrument == 'NIRCAM': ph = jwst.datamodels.NrcWfssPhotomModel(photom_file) else: ph = jwst.datamodels.NisWfssPhotomModel(photom_file) phot = astropy.table.Table(ph.phot_table) pixel_area = ph.meta.photometry.pixelarea_steradians self.sens_ref_file = refs['photom'] self.SENS_xyt = xyt self.SENS_dldp = {} self.SENS_data = {} for i, order in enumerate(self.orders): ix = phot['filter'] == self.filter ix &= phot['pupil'] == self.pupil ix &= phot['order'] == self.dm_orders[i] if ix.sum() == 0: msg = f"Order {order} = {self.dm_orders[i]} not found in " msg += f"{refs['photom']} for {self.filter} {self.pupil}" print(msg) continue row = phot[ix] wave = np.squeeze(row['wavelength'].data) sens_fnu = np.squeeze(row['relresponse'].data) sens_fnu *= pixel_area*row['photmjsr'] # Dispersion, DLAM/DPIX dl = self.DDISPL(order, *xyt) dx = self.DDISPX(order, *xyt) dy = self.DDISPY(order, *xyt) dldp = dl/np.sqrt(dx**2+dy**2) self.SENS_dldp[order] = dldp mask = (wave > 0) & (sens_fnu > 0) _flam_unit = u.erg/u.second/u.cm**2/u.micron sens_flam = (sens_fnu[mask]*dldp*u.megaJansky).to(_flam_unit, equivalencies=u.spectral_density(wave[mask]*u.micron)) self.SENS_data[order] = [wave[mask], 1./sens_flam.value] return self.SENS_data
[docs] def load_new_sensitivity_curve(self, verbose=True, **kwargs): """ Replace +1 NIRCam sensitivity curves with Nov 10, 2023 updates Files generated with the calibration data of P330E from program CAL-1538 (K. Gordon) Download the FITS files from the link below and put them in ``$GRIZLI/CONF/GRISM_NIRCAM/``. https://s3.amazonaws.com/grizli-v2/JWSTGrism/NircamSensitivity/index.html """ path = os.path.join(GRIZLI_PATH, 'CONF', 'GRISM_NIRCAM') meta = self.crds_parameters if meta['meta.instrument.name'] != 'NIRCAM': if verbose: msg = 'load_new_sensitivity_curve: only defined for NIRCAM ({0})' print(msg.format(meta['meta.instrument.name'])) return None sens_base = 'nircam_wfss_sensitivity_{filter}_{pupil}_{module}.10nov23.fits' sens_file = sens_base.format(filter=meta['meta.instrument.filter'], pupil=meta['meta.instrument.pupil'], module=meta['meta.instrument.module']) sens_file = os.path.join(path, sens_file) # print('xx', sens_file, os.path.exists(sens_file)) if os.path.exists(sens_file): msg = "grismconf.CRDSGrismConf: replace sensitivity curve with " msg += f"{sens_file}" utils.log_comment(utils.LOGFILE, msg, verbose=verbose) si = utils.read_catalog(sens_file) self.SENS_data['+1'] = [si['WAVELENGTH']/1.e4, si['SENSITIVITY']]