Source code for liger_iris_pipeline.pipeline.preprocess_flatfield
#!/usr/bin/env python
from collections import defaultdict
import os.path as op
from jwst import datamodels
from jwst.associations.load_as_asn import LoadAsLevel2Asn
from jwst.stpipe import Pipeline
from ..dark_current import dark_current_step
from ..normalize import normalize_step
# Public API of this module: only the pipeline class is exported by
# ``from <module> import *``.
__all__ = ["ProcessFlatfieldL2"]
[docs]
class ProcessFlatfieldL2(Pipeline):
    """
    ProcessFlatfieldL2: Remove dark and normalize exposure to create
    a flat field to be later added to the CRDS.

    Included steps are:
    dark_current, normalize
    """

    # Define alias to steps
    step_defs = {
        "dark_current": dark_current_step.DarkCurrentStep,
        "normalize": normalize_step.NormalizeStep,
    }

    def process(self, input):
        """Process every product of a Level 2 association into a flat field.

        Parameters
        ----------
        input
            Passed unchanged to ``LoadAsLevel2Asn.load``, together with
            ``self.output_file`` as the ``basename``.

        Returns
        -------
        list
            One entry per product in the association, in association order.
            Each entry is the value returned by
            ``process_exposure_product`` with ``meta.filename`` set to
            ``self.make_output_path(suffix="flat")``.
        """
        self.log.info("Starting preprocess flatfield ...")

        # Retrieve the input(s)
        asn = LoadAsLevel2Asn.load(input, basename=self.output_file)

        # The loaded association may carry no ``filename`` attribute; give it
        # a placeholder so a table name can always be recorded. This does
        # not depend on the product, so it is done once, before the loop.
        if not hasattr(asn, "filename"):
            asn.filename = "singleton"

        suffix = "flat"

        # Each exposure is a product in the association.
        # Process each exposure.
        results = []
        for product in asn["products"]:
            self.log.info("Processing product %s", product["name"])
            if self.save_results:
                # Must be set before make_output_path() is called below.
                self.output_file = product["name"]

            result = self.process_exposure_product(
                product, asn["asn_pool"], op.basename(asn.filename)
            )

            # Record the output file name on the result itself.
            result.meta.filename = self.make_output_path(suffix=suffix)
            results.append(result)

        self.log.info("... ending preprocess flatfield")

        # NOTE(review): presumably these tell stpipe to save each returned
        # model under its own ``meta.filename`` without appending another
        # suffix -- confirm against the stpipe ``Step`` documentation.
        self.output_use_model = True
        self.suffix = False
        return results

    # Process each exposure
    def process_exposure_product(self, exp_product, pool_name=" ", asn_file=" "):
        """Process an exposure found in the association product

        Parameters
        ----------
        exp_product: dict
            A Level2b association product.
        pool_name: str
            The pool file name. Used for recording purposes only.
        asn_file: str
            The name of the association file.
            Used for recording purposes only.

        Returns
        -------
        The object returned by the ``normalize`` step, which is run on the
        output of the ``dark_current`` step.

        Raises
        ------
        IndexError
            If the product contains no member with exposure type
            ``science``.
        """
        # Find all the member types in the product
        members_by_type = defaultdict(list)
        for member in exp_product["members"]:
            members_by_type[member["exptype"].lower()].append(member["expname"])

        # Get the science member. Technically there should only be
        # one. We'll just get the first one found.
        science_members = members_by_type["science"]
        if not science_members:
            # Without this guard the code would warn "Using only first one."
            # and then fail with a bare "list index out of range".
            # IndexError is kept as the exception type so existing callers
            # are unaffected.
            raise IndexError(
                "No science member found in product {}".format(exp_product["name"])
            )
        if len(science_members) > 1:
            self.log.warning(
                "Wrong number of science files found in %s", exp_product["name"]
            )
            self.log.warning(" Using only first one.")
        science = science_members[0]

        self.log.info("Working on input %s ...", science)
        # The member may already be an open data model or something that
        # ``datamodels.open`` can load.
        if isinstance(science, datamodels.JwstDataModel):
            model = science
        else:
            model = datamodels.open(science)

        # Record ASN pool and table names in output
        model.meta.asn.pool_name = pool_name
        model.meta.asn.table_name = asn_file

        model = self.dark_current(model)
        model = self.normalize(model)

        self.log.info("Finished processing product %s", exp_product["name"])
        return model