PO.DAAC, Jet Propulsion Laboratory, California Institute of Technology
Author: Ayush Nag
Background
Kerchunk is a library that provides a unified way to represent a variety of chunked, compressed data formats (e.g. NetCDF/HDF5, GRIB2, TIFF, …), allowing efficient access to the data from traditional file systems or cloud object storage. It also provides a flexible way to create virtual datasets from multiple files. It does this by extracting the byte ranges, compression information and other information about the data and storing this metadata in a new, separate object. This means that you can create a virtual aggregate dataset over potentially many source files, for efficient, parallel and cloud-friendly in-situ access without having to copy or translate the originals. It is a gateway to in-the-cloud massive data processing for legacy data archival formats. (description from Kerchunk documentation)
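Concretely, a Kerchunk reference set is just JSON: Zarr metadata keys are inlined as strings, and each chunk key maps to a [url, offset, length] byte range in the original file. A minimal, purely illustrative sketch (the variable name, offsets, and lengths below are made up):

# Hypothetical, simplified version-1 reference set
reference = {
    "version": 1,
    "refs": {
        ".zgroup": '{"zarr_format": 2}',  # inlined Zarr group metadata
        "ssh_karin/.zarray": '{"chunks": [9866, 71], "...": "..."}',  # per-variable Zarr metadata
        # chunk key -> [source url, byte offset, byte length] in the original netCDF
        "ssh_karin/0.0": ["s3://podaac-ops-cumulus-protected/<collection>/<granule>.nc", 30125, 48212],
    },
}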
## Objectives

- Generate a Kerchunk JSON file for a PODAAC SWOT collection
- Create individual JSONs for each netCDF, then combine them into one file using MultiZarrToZarr
- Allows data to be read from PODAAC s3 as a combined Zarr data store
- Output file: SWOT_SIMULATED_L2_KARIN_SSH_GLORYS_SCIENCE_V1_kerchunk_DEMO.json
import os
import re
import glob
import warnings  # used to flag failed single-file conversions

import s3fs
import zarr
import dask
import ujson
import fsspec
import requests
import numpy as np
import xarray as xr
import xml.etree.ElementTree as ET
from tqdm.notebook import tqdm
from dask import delayed, compute
from dask.distributed import Client
# from kerchunk.combine import auto_dask
from kerchunk.hdf import SingleHdf5ToZarr
from kerchunk.combine import MultiZarrToZarr
Start Dask cluster
Data can be read via s3 in parallel and used by Kerchunk
NOTE: Performance improves greatly with more CPUs/threads: 4 hours can become 1 hour, 90 minutes can become 20 minutes, etc.
For over 10,000 granules, at least 4 threads are recommended.
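The cell below is a minimal sketch of starting a local cluster; the worker and thread counts are examples only and should be matched to your machine.

# Minimal local Dask cluster (adjust n_workers/threads_per_worker to your hardware)
client = Client(n_workers=4, threads_per_worker=2)
client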
CPU times: user 205 ms, sys: 31.1 ms, total: 236 ms
Wall time: 4.98 s
Option 1: Get list of remote netCDF files by collection (s3 endpoints)
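The creds dictionary and collection short name used in the cells below are assumed to come from a setup step like this sketch; the PO.DAAC s3credentials endpoint requires Earthdata Login (e.g., via a ~/.netrc file) and returns temporary keys valid for about an hour.

# Assumed setup: temporary Earthdata S3 credentials and the collection short name
creds = requests.get("https://archive.podaac.earthdata.nasa.gov/s3credentials").json()
collection = "SWOT_SIMULATED_L2_KARIN_SSH_GLORYS_SCIENCE_V1"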
%%time
s3 = s3fs.S3FileSystem(anon=False, key=creds["accessKeyId"], secret=creds["secretAccessKey"], token=creds["sessionToken"])
s3path = f"s3://podaac-ops-cumulus-protected/{collection}/*.nc"
remote_urls = s3.glob(s3path)
remote_urls = ['s3://' + f for f in remote_urls]
print(len(remote_urls))
17564
CPU times: user 6.94 s, sys: 214 ms, total: 7.15 s
Wall time: 9.96 s
Option 2: Find and filter s3 urls using NASA CMR Search
If you would like a certain cycle number, datetime range, etc., you can choose the granules to cloud optimize using the powerful CMR search tool.
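As a sketch (the temporal filter and page size are placeholders, and large result sets need CMR's paging via the CMR-Search-After header), a granule search might look like:

# Hypothetical CMR query: filter granules by time range, then keep direct-access s3 links
cmr_url = "https://cmr.earthdata.nasa.gov/search/granules.umm_json"
params = {
    "short_name": collection,
    "temporal": "2014-07-01T00:00:00Z,2014-08-01T00:00:00Z",  # example filter
    "page_size": 2000,
}
response = requests.get(cmr_url, params=params).json()
remote_urls = [
    url["URL"]
    for item in response["items"]
    for url in item["umm"]["RelatedUrls"]
    if url["Type"] == "GET DATA VIA DIRECT ACCESS" and url["URL"].startswith("s3://")
]
print(len(remote_urls))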
## Large amount of s3 netCDFs to individual JSONs (dask)

- The PODAAC SWOT_SIMULATED_L2_KARIN_SSH_GLORYS_SCIENCE_V1 collection has 17564 granules!
- Recommended to test with a smaller dataset such as remote_urls[:20]
- Single threaded
  - 625 granules = ~25 mins
  - 13680 granules = ~540 mins
- 4 threads/workers
  - 625 granules = ~8 mins
  - 13680 granules = ~180 mins
def remaining_jsons(remote_urls: list, directory: str):
    """
    Find the set difference between remote_urls and the JSONs already made in directory.
    Extracts the granule name from each remote url path and JSON file name.
    """
    # Extract granule names from s3 endpoints list and jsons list
    remote_granules = np.asarray([path.split('/')[-1] for path in remote_urls])
    done_granules = np.asarray([path[:-5] for path in os.listdir(directory)])
    # Find remaining files (set difference: s3 endpoints - json directory)
    remaining_indices = np.where(~np.isin(remote_granules, done_granules))[0]
    new_remote_urls = [remote_urls[idx] for idx in remaining_indices]
    print(f"{len(new_remote_urls)}/{len(remote_urls)} files remaining")
    return new_remote_urls
%%time
out_dir = f"{collection}_kerchunk_DEMO"
if not os.path.exists(out_dir):
    os.makedirs(out_dir)

# If session expired, only re-run on remote_urls we don't have a JSON for
remaining_urls = remaining_jsons(remote_urls, out_dir)

@dask.delayed
def gen_json(u: str):
    "Generate JSON reference file for one netCDF"
    with fsspec.open(u, mode="rb", anon=False, key=creds['accessKeyId'], secret=creds['secretAccessKey'], token=creds["sessionToken"]) as inf:
        single = SingleHdf5ToZarr(inf, u, inline_threshold=0)
        # Extract granule name from s3 URL
        granule = re.search(r'[^/]+$', u).group(0)
        full_filename = f'{out_dir}/{granule}.json'
        # Generate single kerchunk reference
        s = single.translate()
        if len(s) == 0:
            warnings.warn(f"{granule} JSON generation failed")
        # Write JSON to folder
        with open(full_filename, 'w') as f:
            ujson.dump(s, f)
        return full_filename

# Define delayed list of jsons (gen_json is already delayed via the decorator)
jsons = [gen_json(u) for u in remaining_urls]
# Run kerchunk single netCDF to JSON conversion in parallel
full_filenames = dask.compute(jsons)
1066/1368 files remaining
CPU times: user 3min 33s, sys: 22.4 s, total: 3min 55s
Wall time: 19min 36s
Create combined kerchunk
%%time
out_dir = f"{collection}_kerchunk_DEMO"
json_files = os.listdir(out_dir)
json_files = [f"{out_dir}/{f}" for f in json_files if f.endswith(".json")]
mzz = MultiZarrToZarr(
    json_files,
    remote_protocol='s3',
    remote_options={"anon": False, "key": creds['accessKeyId'], "secret": creds['secretAccessKey'], "token": creds["sessionToken"]},
    coo_map={"cycle_num": "attr:cycle_number", "pass_num": "attr:pass_number"},
    concat_dims=["cycle_num", "pass_num"],
)
out = mzz.translate()
CPU times: user 1min 50s, sys: 14.3 s, total: 2min 4s
Wall time: 15min 47s
The args passed to SingleHdf5ToZarr and MultiZarrToZarr define how the data is read and concatenated. These parameters will need to be modified depending on how you want to concatenate the datasets (e.g., by time or by cycle). The documentation for those functions is linked above. Linked in this cell are more resources for understanding and using Kerchunk.
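For example, to combine along the time coordinate instead of cycle/pass, a coo_map/concat_dims pair like the following could be used (a sketch only; mzz_time and out_time are hypothetical names, and a time variable must exist in each granule):

# Sketch: concatenate along decoded CF time instead of cycle_num/pass_num
mzz_time = MultiZarrToZarr(
    json_files,
    remote_protocol="s3",
    remote_options={"anon": False, "key": creds["accessKeyId"],
                    "secret": creds["secretAccessKey"], "token": creds["sessionToken"]},
    coo_map={"time": "cf:time"},   # decode each granule's time variable as the coordinate
    concat_dims=["time"],
)
out_time = mzz_time.translate()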
%%time
json_file = f'{collection}_kerchunk_DEMO.json'
with open(json_file, 'wb') as f:
    f.write(ujson.dumps(out).encode())
CPU times: user 141 ms, sys: 44.4 ms, total: 185 ms
Wall time: 181 ms
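To sanity-check the result, the combined reference can be opened lazily as a virtual Zarr store; this sketch mirrors the credentials used above.

# Open the combined Kerchunk reference as a virtual Zarr store backed by PO.DAAC s3
fs = fsspec.filesystem(
    "reference",
    fo=json_file,
    remote_protocol="s3",
    remote_options={"anon": False, "key": creds["accessKeyId"],
                    "secret": creds["secretAccessKey"], "token": creds["sessionToken"]},
)
ds = xr.open_dataset(fs.get_mapper(""), engine="zarr", backend_kwargs={"consolidated": False})
ds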
(Output: xarray representation of the combined dataset, listing the SWOT Level 2 KaRIn SSH variables and their attributes, e.g. ssh_karin/ssha_karin, inv_bar_cor, dac, height_cor_xover, geoid, mean_sea_surface, ocean/load/pole tide heights, dry and wet troposphere corrections, sea state bias, sig0_karin and its atmospheric corrections, latitude, longitude, time, and spacecraft attitude/heading; plus global attributes including title "Level 2 Low Rate Sea Surface Height Data Product - Expert SSH with Wind and Wave", time_coverage_start 2014-07-24T09:26:52Z, time_coverage_end 2014-07-24T10:18:18Z, and the reference Gaultier, L., C. Ubelmann, and L.-L. Fu, 2016, J. Atmos. Oceanic Technol., 33, 119-126, doi:10.1175/JTECH-D-15-0160.1.)
Alternate: Large amount of s3 netCDFs to combined kerchunk JSON (auto_dask)
%%time
# Create combined kerchunk/zarr reference. Reads cycle_number and pass_number from attributes of each netCDF
# Concats along new dimensions cycle_number and pass_number
out_dir = f"{collection}_kerchunk_DEMO"
json_files = os.listdir(out_dir)
json_files = [f"{out_dir}/{f}" for f in json_files if f.endswith(".json")]
auto_dask(
    urls=json_files,
    n_batches=4,
    single_driver=JustLoad,
    single_kwargs={},
    mzz_kwargs={"coo_map": {"cycle_num": "attr:cycle_number", "pass_num": "attr:pass_number"},
                "concat_dims": ["cycle_num", "pass_num"]},
    remote_protocol="s3",
    remote_options={"anon": False, "key": creds['accessKeyId'], "secret": creds['secretAccessKey'], "token": creds["sessionToken"]},
    filename=f'{collection}_kerchunk_CYCLE_3.json',
)
Appendix/Extras
Find granules with matching dimensions (SWOT_SIMULATED granules have uneven dimensions: num_lines could be 9864, 9865, 9866, etc.)
New: Read DMR++ metadata paired with each netCDF for dimensions (see the sketch below)
Old: Open each dataset with xarray and check dimensions
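A sketch of the DMR++ route, assuming each granule in the bucket has a .dmrpp sidecar file (num_lines_from_dmrpp is a hypothetical helper; the element layout follows the DAP4 DMR schema):

# Hypothetical helper: read a granule's .dmrpp sidecar and return its num_lines size
def num_lines_from_dmrpp(nc_url: str) -> int:
    dap_ns = "{http://xml.opendap.org/ns/DAP/4.0#}"
    with s3.open(nc_url + ".dmrpp", mode="rt") as f:
        root = ET.fromstring(f.read())
    # Dimensions appear as e.g. <Dimension name="num_lines" size="9866"/>
    for dim in root.iter(f"{dap_ns}Dimension"):
        if dim.attrib.get("name") == "num_lines":
            return int(dim.attrib["size"])
    return -1

matches = [u for u in remote_urls if num_lines_from_dmrpp(u) == 9866]
print(f"{len(matches)} matches found")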
Loop/auto-dask implementations of creating Kerchunks
import dask.bag
import dask.diagnostics

# Define a function to check the condition and return the file path if the condition is met
def is_match(f):
    ds = xr.open_dataset(f, engine="h5netcdf")
    if ds['simulated_true_ssh_karin'].encoding['chunksizes'] == (9866, 71):
        return f.path
    return None

# Create a Dask bag from the fileset
fileset_bag = dask.bag.from_sequence(fileset)

# Use Dask to parallelize the processing and filter matches
with dask.diagnostics.ProgressBar():
    matches_bag = fileset_bag.map(is_match).filter(lambda x: x is not None)
    matches = matches_bag.compute()
print(f"{len(matches)} matches found")
file = open('SWOT_SIMULATED_L2_KARIN_SSH_GLORYS_SCIENCE_V1_9866_paths.txt', 'w')
for item in tqdm(flist):
    file.write(item + "\n")
file.close()
Read file
f = open("SWOT_SIMULATED_L2_KARIN_SSH_GLORYS_SCIENCE_V1_9866_same_passes_paths.txt", "r")
remote_urls = f.read().splitlines()
# remote_urls = remote_urls.split("\n")
# remote_urls.pop()  # remove extra '' for EOF
f.close()
print(len(remote_urls))
remote_urls[0]
CPU times: user 1.35 s, sys: 139 ms, total: 1.49 s
Wall time: 8.25 s
s3 NetCDFs to Kerchunk (loop)
%%time
singles = []
for i, u in enumerate(tqdm(remote_urls[:20])):
    with fsspec.open(u, mode="rb", anon=False, key=creds['accessKeyId'], secret=creds['secretAccessKey'], token=creds["sessionToken"]) as inf:
        single = SingleHdf5ToZarr(inf, u, inline_threshold=0)
        filename = re.sub(r'.*/', '', u)
        singles.append(single.translate())
## kerchunk auto_dask definition

- Note: this is built into kerchunk as of v0.1.0 via from kerchunk.combine import auto_dask
# Author: Martin Durant
# https://fsspec.github.io/kerchunk/_modules/kerchunk/combine.html#auto_dask
from typing import List


def auto_dask(
    urls: List[str],
    single_driver: str,
    single_kwargs: dict,
    mzz_kwargs: dict,
    n_batches: int,
    remote_protocol=None,
    remote_options=None,
    filename=None,
    output_options=None,
):
    """Batched tree combine using dask.

    If you wish to run on a distributed cluster (recommended), create
    a client before calling this function.

    Parameters
    ----------
    urls: list[str]
        input dataset URLs
    single_driver: class
        class with ``translate()`` method
    single_kwargs: to pass to single-input driver
    mzz_kwargs: passed to ``MultiZarrToZarr`` for each batch
    n_batches: int
        Number of MZZ instances in the first combine stage. Maybe set equal
        to the number of dask workers, or a multiple thereof.
    remote_protocol: str | None
    remote_options: dict
        To fsspec for opening the remote files
    filename: str | None
        Output filename, if writing
    output_options
        If ``filename`` is not None, open it with these options

    Returns
    -------
    reference set
    """
    import dask

    # make delayed functions
    single_task = dask.delayed(lambda x: single_driver(x, **single_kwargs).translate())
    post = mzz_kwargs.pop("postprocess", None)
    inline = mzz_kwargs.pop("inline_threshold", None)
    # TODO: if single files produce list of reference sets (e.g., grib2)
    batch_task = dask.delayed(
        lambda u, x: MultiZarrToZarr(
            u,
            indicts=x,
            remote_protocol=remote_protocol,
            remote_options=remote_options,
            **mzz_kwargs,
        ).translate()
    )

    # sort out kwargs
    dims = mzz_kwargs.get("concat_dims", [])
    dims += [k for k in mzz_kwargs.get("coo_map", []) if k not in dims]
    kwargs = {"concat_dims": dims}
    if post:
        kwargs["postprocess"] = post
    if inline:
        kwargs["inline_threshold"] = inline
    for field in ["remote_protocol", "remote_options", "coo_dtypes", "identical_dims"]:
        if field in mzz_kwargs:
            kwargs[field] = mzz_kwargs[field]
    final_task = dask.delayed(
        lambda x: MultiZarrToZarr(
            x, remote_options=remote_options, remote_protocol=remote_protocol, **kwargs
        ).translate(filename, output_options)
    )

    # make delayed calls
    tasks = [single_task(u) for u in urls]
    tasks_per_batch = -(-len(tasks) // n_batches)
    tasks2 = []
    for batch in range(n_batches):
        in_tasks = tasks[batch * tasks_per_batch : (batch + 1) * tasks_per_batch]
        u = urls[batch * tasks_per_batch : (batch + 1) * tasks_per_batch]
        if in_tasks:
            # skip if on last iteration and no remaining tasks
            tasks2.append(batch_task(u, in_tasks))
    return dask.compute(final_task(tasks2))[0]


class JustLoad:
    """For auto_dask, in the case that single file references already exist"""

    def __init__(self, url, storage_options=None):
        self.url = url
        self.storage_options = storage_options or {}

    def translate(self):
        with fsspec.open(self.url, mode="rt", **self.storage_options) as f:
            return ujson.load(f)