Source code for podaac.subsetter.subset_harmony

# Copyright 2019, by the California Institute of Technology.
# ALL RIGHTS RESERVED. United States Government Sponsorship acknowledged.
# Any commercial use must be negotiated with the Office of Technology
# Transfer at the California Institute of Technology.
#
# This software may be subject to U.S. export control laws. By accepting
# this software, the user agrees to comply with all applicable U.S. export
# laws and regulations. User has the responsibility to obtain export
# licenses, or other export authority as may be required before exporting
# such information to foreign countries or providing access to foreign
# persons.

"""
=================
subset_harmony.py
=================

Implementation of harmony-service-lib that invokes the Level 2 subsetter.
"""
import argparse
import os
import subprocess
import shutil
from tempfile import mkdtemp
import traceback
from typing import List, Union
import sys

import pystac
from pystac import Asset

import harmony_service_lib
import numpy as np
from harmony_service_lib import BaseHarmonyAdapter
from harmony_service_lib.util import download, stage, generate_output_filename, bbox_to_geometry
from harmony_service_lib.exceptions import HarmonyException

from podaac.subsetter import subset
from podaac.subsetter.subset import SERVICE_NAME

DATA_DIRECTORY_ENV = "DATA_DIRECTORY"


[docs] class L2SSException(HarmonyException): """L2SS Exception class for custom error messages to see in harmony api calls."""
[docs] def __init__(self, original_exception): # Ensure we can extract traceback information if original_exception.__traceback__ is None: # Capture the current traceback if not already present try: raise original_exception except type(original_exception): original_exception.__traceback__ = sys.exc_info()[2] # Extract the last traceback entry (most recent call) for the error location tb = traceback.extract_tb(original_exception.__traceback__)[-1] # Get the error details: file, line, function, and message filename = tb.filename lineno = tb.lineno funcname = tb.name error_msg = str(original_exception) # Format the error message to be more readable readable_message = (f"Error in file '{filename}', line {lineno}, in function '{funcname}': " f"{error_msg}") # Call the parent class constructor with the formatted message and category super().__init__(readable_message, 'podaac/l2-subsetter') # Store the original exception for potential further investigation self.original_exception = original_exception
[docs] def podaac_to_harmony_bbox(bbox: np.ndarray) -> Union[np.ndarray, float]: """ Convert PO.DAAC bbox ((west, east), (south, north)) to Harmony bbox (west, south, east, north) Parameters ---------- bbox : np.array Podaac bbox Returns ------- array, int or float Harmony bbox TODO - fix this docstring type, type hint, and code to match (code currently returns a list) """ return_box = [bbox.item(0), bbox.item(2), bbox.item(1), bbox.item(3)] return return_box
[docs] def harmony_to_podaac_bbox(bbox: list) -> np.ndarray: """ Convert Harmony bbox (west, south, east, north) to PO.DAAC bbox ((west, east), (south, north)) Parameters ---------- bbox : list Harmony bbox Returns ------- np.array PO.DAAC bbox """ return np.array(((bbox[0], bbox[2]), (bbox[1], bbox[3])))
[docs] class L2SubsetterService(BaseHarmonyAdapter): """ See https://github.com/nasa/harmony-service-lib-py for documentation and examples. """
[docs] def __init__(self, message, catalog=None, config=None): super().__init__(message, catalog, config) self.data_dir = os.getenv(DATA_DIRECTORY_ENV, '/home/dockeruser/data')
[docs] def process_item(self, item: pystac.Item, source: harmony_service_lib.message.Source) -> pystac.Item: # pylint: disable=too-many-branches """ Performs variable and bounding box subsetting on the input STAC Item's data, returning an output STAC item Parameters ---------- item : pystac.Item the item that should be subset source : harmony.message.Source the input source defining the variables, if any, to subset from the item Returns ------- pystac.Item a STAC item describing the output of the subsetter """ result = item.clone() result.assets = {} # Create a temporary dir for processing we may do temp_dir = mkdtemp() output_dir = self.data_dir self.prepare_output_dir(output_dir) try: # Get the data file asset = next(v for k, v in item.assets.items() if 'data' in (v.roles or [])) input_filename = download(asset.href, temp_dir, logger=self.logger, access_token=self.message.accessToken, cfg=self.config) message = self.message # Dictionary of keywords params that will be passed into # the l2ss-py subset function subset_params = {} # Transform params to PO.DAAC subsetter arguments and invoke the subsetter harmony_bbox = [-180, -90, 180, 90] if message.subset and message.subset.bbox: harmony_bbox = message.subset.bbox if message.subset and message.subset.shape: subset_params['shapefile'] = download( message.subset.shape.href, temp_dir, logger=self.logger, access_token=self.message.accessToken, cfg=self.config ) if message.temporal: subset_params['min_time'] = message.temporal.start subset_params['max_time'] = message.temporal.end if message.pixelSubset: subset_params['pixel_subset'] = message.pixelSubset subset_params['bbox'] = harmony_to_podaac_bbox(harmony_bbox) try: harmony_cut = message.extraArgs['cut'] if harmony_cut is None: subset_params['cut'] = True else: subset_params['cut'] = harmony_cut except (KeyError, AttributeError, TypeError): subset_params['cut'] = True if source.variables: subset_params['variables'] = [variable.name for variable in source.process('variables')] if source.coordinateVariables: coordinate_variables = list( filter(lambda var: var.type and var.subtype, source.coordinateVariables) ) def filter_by_subtype(variables, subtype): return list(map(lambda var: var.name, filter( lambda var: var.subtype == subtype, variables ))) subset_params['lat_var_names'] = filter_by_subtype(coordinate_variables, 'LATITUDE') subset_params['lon_var_names'] = filter_by_subtype(coordinate_variables, 'LONGITUDE') subset_params['time_var_names'] = filter_by_subtype(coordinate_variables, 'TIME') subset_params['output_file'] = f'{output_dir}/{os.path.basename(input_filename)}' subset_params['file_to_subset'] = input_filename operations = { "variable_subset": subset_params.get('variables'), "is_subsetted": True } original_file_extension = os.path.splitext(input_filename)[1] staged_filename_true = generate_output_filename(asset.href, original_file_extension, **operations) operations = { "variable_subset": subset_params.get('variables'), "is_subsetted": False } staged_filename_false = generate_output_filename(asset.href, original_file_extension, **operations) subset_params['stage_file_name_subsetted_true'] = staged_filename_true subset_params['stage_file_name_subsetted_false'] = staged_filename_false self.logger.info('Calling l2ss-py subset with params %s', subset_params) result_bbox = subset.subset(**subset_params) # Stage the output file with a conventional filename mime = 'application/x-netcdf4' operations = { "variable_subset": subset_params.get('variables'), "is_subsetted": bool(result_bbox is not None) } staged_filename = generate_output_filename(asset.href, original_file_extension, **operations) url = stage(subset_params['output_file'], staged_filename, mime, location=message.stagingLocation, logger=self.logger, cfg=self.config) # Update the STAC record asset = Asset(url, title=staged_filename, media_type=mime, roles=['data']) result.assets['data'] = asset if result_bbox is not None: if message.subset: message.subset.process('bbox') bounding_box_array = np.array(podaac_to_harmony_bbox(result_bbox)) if not np.all(np.isnan(bounding_box_array)): result.bbox = podaac_to_harmony_bbox(result_bbox) result.geometry = bbox_to_geometry(result.bbox) # Return the STAC record return result except Exception as ex: raise L2SSException(ex) from ex finally: # Clean up any intermediate resources shutil.rmtree(temp_dir)
[docs] def prepare_output_dir(self, output_dir: str) -> None: """ Deletes (if present) and recreates the given output_dir, ensuring it exists and is empty Parameters ---------- output_dir : string the directory to delete and recreate """ self.cmd('rm', '-rf', output_dir) self.cmd('mkdir', '-p', output_dir)
[docs] def cmd(self, *args) -> List[str]: """ Logs and then runs command. Parameters ---------- args Command and args to run Returns ------- Command output """ self.logger.info("%s %s", args[0], " ".join(["'{}'".format(arg) for arg in args[1:]])) # pylint: disable=C0209 result_str = subprocess.check_output(args).decode("utf-8") return result_str.split("\n")
[docs] def main(config: harmony_service_lib.util.Config = None) -> None: """Parse command line arguments and invoke the service to respond to them. Parameters ---------- config : harmony.util.Config Returns ------- None """ parser = argparse.ArgumentParser(prog=SERVICE_NAME, description='Run the l2_subsetter service') harmony_service_lib.setup_cli(parser) args = parser.parse_args() if harmony_service_lib.is_harmony_cli(args): harmony_service_lib.run_cli(parser, args, L2SubsetterService, cfg=config) else: parser.error("Only --harmony CLIs are supported")
if __name__ == "__main__": main()