Source code for liger_iris_pipeline.parse_subarray_map.parse_subarray_map_step

import numpy as np

from jwst.stpipe import Step
from .. import datamodels
import stdatamodels

__all__ = ["ParseSubarrayMapStep"]

SUBARRAY_DQ_BIT = 4

# NOTE: xstart/ystart use 1-based indexing
def parse_subarray_map(subarray_map):
    subarray_metadata = []
    for subarray_id in range(1, 10 + 1):
        subarray_indices = np.where(subarray_map == subarray_id)
        if len(subarray_indices[0]) == 0:
            break
        subarray_metadata.append(
            {
                "xstart": int(subarray_indices[1][0] + 1),
                "ystart": int(subarray_indices[0][0] + 1),
                "xsize": int(subarray_indices[1][-1] - subarray_indices[1][0] + 1),
                "ysize": int(subarray_indices[0][-1] - subarray_indices[0][0] + 1),
            }
        )
    return subarray_metadata


[docs] class ParseSubarrayMapStep(Step): """ ParseSubarrayMapStep: Parse a subarray map extension, if available, and create header metadata and data quality flag accordingly """
[docs] def process(self, input): if isinstance(input, str): input_model = datamodels.open(input) else: input_model = input if "subarr_map" in input_model: self.log.info("Parsing the SUBARR_MAP extension") result = input_model.copy() # Create metadata from image ID map for each in parse_subarray_map(result["subarr_map"]): result.meta.subarray_map.append(each) # Indicate subarrays in dq flags result.dq[result["subarr_map"] != 0] = np.bitwise_or( result.dq[result["subarr_map"] != 0], 2 ** SUBARRAY_DQ_BIT ) else: self.log.info("No SUBARR_MAP extension found") result = input_model return result