Source code for merger.harmony.service

"""A Harmony service wrapper around the Concise module"""

from datetime import datetime, timezone
from pathlib import Path
from tempfile import TemporaryDirectory
from shutil import copyfile
from urllib.parse import urlsplit
from uuid import uuid4

from harmony.adapter import BaseHarmonyAdapter
from harmony.util import bbox_to_geometry, stage
from pystac import Catalog, Item
from pystac.item import Asset

from podaac.merger.merge import merge_netcdf_files
from podaac.merger.harmony.download_worker import multi_core_download
from podaac.merger.harmony.util import get_bbox, get_datetime, get_granule_url


NETCDF4_MIME = 'application/x-netcdf4'  # pylint: disable=invalid-name


[docs]class ConciseService(BaseHarmonyAdapter): """ A harmony-service-lib wrapper around the Concise module. This wrapper does not support Harmony calls that do not have STAC catalogs as support for this behavior is being depreciated in harmony-service-lib """
[docs] def invoke(self): """ Primary entrypoint into the service wrapper. Overrides BaseHarmonyAdapter.invoke """ if not self.catalog: # Message-only support is being depreciated in Harmony so we should expect to # only see requests with catalogs when invoked with a newer Harmony instance # https://github.com/nasa/harmony-service-lib-py/blob/21bcfbda17caf626fb14d2ac4f8673be9726b549/harmony/adapter.py#L71 raise RuntimeError('Invoking CONCISE without a STAC catalog is not supported') return (self.message, self.process_catalog(self.catalog))
[docs] def process_catalog(self, catalog: Catalog): """ Recursively process a catalog and all its children. Adapted from BaseHarmonyAdapter._process_catalog_recursive to specfifically support our particular use case for many-to-one Parameters ---------- catalog : pystac.Catalog or pystac.Collection a catalog/collection to process for merging Returns ------- pystac.Catalog A new catalog containing the results from the merge """ result = catalog.clone() result.id = str(uuid4()) result.clear_children() # Get all the items from the catalog, including from child or linked catalogs items = list(self.get_all_catalog_items(catalog)) # Quick return if catalog contains no items if len(items) == 0: return result # -- Process metadata -- bbox = [] granule_urls = [] datetimes = [ datetime.max.replace(tzinfo=timezone.utc), # start datetime.min.replace(tzinfo=timezone.utc) # end ] for item in items: get_bbox(item, bbox) get_granule_url(item, granule_urls) get_datetime(item, datetimes) # Items did not have a bbox; valid under spec if len(bbox) == 0: bbox = None # -- Perform merging -- collection = self._get_item_source(items[0]).collection filename = f'{collection}_merged.nc4' with TemporaryDirectory() as temp_dir: self.logger.info('Starting granule downloads') input_files = multi_core_download(granule_urls, temp_dir, self.message.accessToken, self.config) self.logger.info('Finished granule downloads') output_path = Path(temp_dir).joinpath(filename).resolve() merge_netcdf_files(input_files, output_path, logger=self.logger) staged_url = self._stage(str(output_path), filename, NETCDF4_MIME) # -- Output to STAC catalog -- result.clear_items() properties = dict( start_datetime=datetimes[0].isoformat(), end_datetime=datetimes[1].isoformat() ) item = Item(str(uuid4()), bbox_to_geometry(bbox), bbox, None, properties) asset = Asset(staged_url, title=filename, media_type=NETCDF4_MIME, roles=['data']) item.add_asset('data', asset) result.add_item(item) return result
def _stage(self, local_filename, remote_filename, mime): """ Stages a local file to either to S3 (utilizing harmony.util.stage) or to the local filesystem by performing a file copy. Staging location is determined by message.stagingLocation or the --harmony-data-location CLI argument override Parameters ---------- local_filename : string A path and filename to the local file that should be staged remote_filename : string The basename to give to the remote file mime : string The mime type to apply to the staged file for use when it is served, e.g. "application/x-netcdf4" Returns ------- url : string A URL to the staged file """ url_components = urlsplit(self.message.stagingLocation) scheme = url_components.scheme if scheme == 'file': dest_path = Path(url_components.path).joinpath(remote_filename) self.logger.info('Staging to local filesystem: \'%s\'', str(dest_path)) copyfile(local_filename, dest_path) return dest_path.as_uri() return stage(local_filename, remote_filename, mime, logger=self.logger, location=self.message.stagingLocation, cfg=self.config )