From the PO.DAAC Cookbook, to access the GitHub version of the notebook, follow this link.

Hydrocron API: SWOT Time Series Examples

Authors: Nikki Tebaldi and Cassandra Nickles, NASA PO.DAAC

Summary:

Hydrocron is an API that repackages the SWOT_L2_HR_RIVERSP_2.0 dataset into CSV or GeoJSON formats, making time series analysis easier. This notebook highlights how to use Hydrocron and convert its output into a readable database of observations from multiple SWOT reaches identified via the SWORD database.

Requirements:

Any compute environment, local or in the cloud.

Learning Objectives:

  • Obtain a list of SWORD IDs for a region of interest
  • Access SWOT river vector product attributes for multiple reach IDs via the Hydrocron API
  • Convert the accessed time series data into a readable database
  • Plot one time series variable from multiple reaches

Cite the Hydrocron API via the following:

Frank Greguska, Nikki Tebaldi, Victoria McDonald, Vanesa Gomez Gonzalez, & Cassie Nickles. (2024). podaac/hydrocron: 1.2.0 (1.2.0). Zenodo. https://doi.org/10.5281/zenodo.11176234

Import Packages

import dask
import dask.dataframe as dd
from dask.distributed import Client
import hvplot.dask
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pprint
import requests

import datetime
from io import StringIO

Set up Dask Workers for Parallelization

A note on Dask

We use Dask in this notebook because it aggregates the data into a Dask DataFrame, which can then be used in a variety of ways to explore, visualize, and analyze the data, and because it optimizes these operations by performing various tasks in parallel. See the Dask documentation for more information.

We also recognize that Dask may be a bit complex for smaller queries, and in that case recommend setting the number of workers, n_workers, to 1 in the cell below, which will run the operations on a single thread instead of in parallel.

client = Client(n_workers=4)    # Set n_workers to the number of CPUs or parallel processes, or to 1 to run on a single thread
client

Client

Cluster

  • Workers: 4
  • Cores: 16
  • Memory: 34.16 GB

Obtain SWORD reach IDs

In this section, we query the Feature Translation Service (FTS) to give us a list of river IDs from the SWOT River Database (SWORD). A river reach ID has the format CBBBBBRRRRT and a node ID has the format CBBBBBRRRRNNNT, where C stands for continent, B for basin, R for reach, N for node, and T for type. The first 6 digits of the ID (CBBBBB) are the HydroBASINS Pfafstetter level 6 basin code, where the first digit represents one of nine continental regions (1 = Africa, 2 = Europe, 3 = Siberia, 4 = Asia, 5 = Oceania, 6 = South America, 7 = North America, 8 = Arctic, 9 = Greenland), and the remaining digits are nested basin levels 2–6. In this example, we search by the beginning of a SWORD ID (basin code). For example, ID 732520 represents multiple reaches along the Savannah River and its tributaries in Georgia, USA.
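As a quick sanity check, the ID structure described above can be decoded with simple string slicing. This is a minimal sketch (the function and dictionary names are illustrative, not part of Hydrocron or FTS):

```python
# Illustrative sketch: decode the parts of a SWORD reach ID (format CBBBBBRRRRT).
CONTINENTS = {
    "1": "Africa", "2": "Europe", "3": "Siberia", "4": "Asia", "5": "Oceania",
    "6": "South America", "7": "North America", "8": "Arctic", "9": "Greenland",
}

def decode_reach_id(reach_id: str) -> dict:
    """Split an 11-digit SWORD reach ID into continent, basin, reach, and type."""
    return {
        "continent": CONTINENTS[reach_id[0]],  # first digit: continental region
        "basin": reach_id[:6],                 # HydroBASINS Pfafstetter level-6 code
        "reach": reach_id[6:10],               # reach number within the basin
        "type": reach_id[10],                  # type digit
    }

print(decode_reach_id("73252000433"))
# {'continent': 'North America', 'basin': '732520', 'reach': '0043', 'type': '3'}
```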

The query_fts function can query the Feature Translation Service for reach identifiers by river name, reach identifier, HUC, or region. In this notebook, the function is used to query the FTS for reach identifiers associated with the basin identifier, and can be altered to search via river name. You can query the FTS and use a combination of page_size and page_number parameters to retrieve all reach identifier results for a query.

# Assign URLs to Variables for the APIs we use, FTS and Hydrocron
FTS_URL = "https://fts.podaac.earthdata.nasa.gov/v1"  
HYDROCRON_URL = "https://soto.podaac.earthdatacloud.nasa.gov/hydrocron/v1/timeseries"

# BASIN or RIVER to query FTS for
BASIN_IDENTIFIER = "732520" # to search via basin ID, find within SWORD database
RIVER_NAME = "Rhine" # to search via river name
def query_fts(query_url, params):
    """Query Feature Translation Service (FTS) for reach identifers using the query_url parameter.

    Parameters
    ----------
    query_url: str - URL to use to query FTS
    params: dict - Dictionary of parameters to pass to query

    Returns
    -------
    dict of results: hits, page_size, page_number, reach_ids
    """

    reaches = requests.get(query_url, params=params)
    reaches_json = reaches.json()

    hits = reaches_json['hits']
    if 'search on' in reaches_json.keys():
        page_size = reaches_json['search on']['page_size']
        page_number = reaches_json['search on']['page_number']
    else:
        page_size = 0
        page_number = 0

    return {
        "hits": hits,
        "page_size": page_size,
        "page_number": page_number,
        "reach_ids": [ item['reach_id'] for item in reaches_json['results'] ]
    }
# Search by basin code
query_url = f"{FTS_URL}/rivers/reach/{BASIN_IDENTIFIER}"
print(f"Searching by basin ...{query_url}")

# Search by river name
# query_url = f"{FTS_URL}/rivers/{RIVER_NAME}" #if searching via river name instead
# print(f"Searching by river name ...{query_url}")

page_size = 100    # Set FTS to retrieve 100 results at a time
page_number = 1    # Set FTS to retrieve the first page of results
hits = 1           # Set hits to an initial value to start the while loop
reach_ids = []
while (page_size * page_number) != 0 and len(reach_ids) < hits:
    params = { "page_size": page_size, "page_number": page_number }
    results = query_fts(query_url, params)
    
    hits = results['hits']
    page_size = results['page_size']
    page_number = results['page_number'] + 1
    reach_ids.extend(results['reach_ids'])

    print("page_size: ", page_size, ", page_number: ", page_number - 1, ", hits: ", hits, ", # reach_ids: ", len(reach_ids))
    
print("Total number of reaches: ", len(reach_ids))
reach_ids = list(set(reach_ids))    # Remove duplicates
print("Total number of non-duplicate reaches: ", len(reach_ids))
Searching by basin ...https://fts.podaac.earthdata.nasa.gov/v1/rivers/reach/732520
page_size:  100 , page_number:  1 , hits:  190 , # reach_ids:  100
page_size:  100 , page_number:  2 , hits:  190 , # reach_ids:  190
Total number of reaches:  190
Total number of non-duplicate reaches:  190

Once you have a list of reach identifiers, you can query Hydrocron for SWOT time series data. For now, we will just pull the first 10 reaches we discovered.

reach_ids = reach_ids[:10]
len(reach_ids)
10

Query Hydrocron for time series data

Dask Delayed Function to Query in Parallel

We define a function that will query Hydrocron and return a pandas DataFrame of the time series data for the reaches identified. This is defined as a Dask delayed function with the @dask.delayed decorator so that we can query reaches in parallel and return a Dask DataFrame to use in our visualizations.

@dask.delayed
def query_hydrocron(query_url, reach_id, start_time, end_time, fields, empty_df):
    """Query Hydrocron for reach-level time series data.

    Parameters
    ----------
    query_url: str - URL to use to query Hydrocron
    reach_id: str - String SWORD reach identifier
    start_time: str - String time to start query
    end_time: str - String time to end query
    fields: list - List of fields to return in query response
    empty_df: pandas.DataFrame that contains empty query results

    Returns
    -------
    pandas.DataFrame that contains query results
    """

    params = {
        "feature": "Reach",
        "feature_id": reach_id,
        "output": "csv",
        "start_time": start_time,
        "end_time": end_time,
        "fields": fields
    }
    results = requests.get(query_url, params=params)
    if "results" in results.json().keys():
        results_csv = results.json()["results"]["csv"]
        df = pd.read_csv(StringIO(results_csv))
    else:
        df = empty_df

    return df

Create the query for Hydrocron!

All parameter options are listed in the Hydrocron documentation: https://podaac.github.io/hydrocron/timeseries.html

%%time
# Create queries that return Pandas.DataFrame objects
start_time = "2023-07-28T00:00:00Z"
end_time = "2024-04-16T00:00:00Z"
fields = "reach_id,time_str,wse"
results = []
for reach in reach_ids:
    # Create an empty dataframe for cases where no data is returned for a reach identifier
    empty_df = pd.DataFrame({
        "reach_id": np.int64(reach),
        "time_str": datetime.datetime(1900, 1, 1).strftime("%Y-%m-%dT%H:%M:%S"),
        "wse": -999999999999.0,
        "wse_units": "m"
    }, index=[0])
    results.append(query_hydrocron(HYDROCRON_URL, reach, start_time, end_time, fields, empty_df))

# Load DataFrame results into dask.dataframe
ddf = dd.from_delayed(results)
ddf.head(n=20, npartitions=len(reach_ids))
CPU times: user 444 ms, sys: 131 ms, total: 575 ms
Wall time: 10.7 s
reach_id time_str wse wse_units
0 73252000433 2024-02-09T04:28:31Z 1.004017e+02 m
1 73252000433 2024-02-09T15:17:26Z 9.985260e+01 m
2 73252000433 2024-03-01T01:13:37Z 1.272824e+02 m
3 73252000433 2024-03-01T12:02:32Z 1.005151e+02 m
4 73252000433 2024-03-21T21:58:40Z 1.103624e+02 m
5 73252000433 2024-04-11T18:43:45Z 1.142618e+02 m
6 73252000433 2024-04-12T05:32:40Z 1.006057e+02 m
0 73252001533 2024-02-09T04:28:41Z 2.006382e+02 m
1 73252001533 2024-02-09T15:17:16Z 2.007256e+02 m
2 73252001533 2024-03-01T01:13:47Z 2.008445e+02 m
3 73252001533 2024-03-01T12:02:22Z 2.016775e+02 m
4 73252001533 2024-03-21T21:58:50Z 2.009672e+02 m
5 73252001533 2024-04-11T18:43:55Z 2.008128e+02 m
6 73252001533 2024-04-12T05:32:30Z 2.008465e+02 m
0 73252001404 1900-01-01T00:00:00 -1.000000e+12 m
0 73252001434 2024-01-30T06:06:26Z -1.000000e+12 m
1 73252001434 no_data -1.000000e+12 m
2 73252001434 no_data -1.000000e+12 m
3 73252001434 2024-02-20T02:51:31Z -1.000000e+12 m
4 73252001434 no_data -1.000000e+12 m
# Remove fill values for missing observations
ddf = ddf.loc[(ddf["wse"] != -999999999999.0)]

# Convert time_str to datetime format
ddf.time_str = dd.to_datetime(ddf.time_str)

ddf.head(n=20, npartitions=len(reach_ids))
reach_id time_str wse wse_units
0 73252000433 2024-02-09 04:28:31+00:00 100.4017 m
1 73252000433 2024-02-09 15:17:26+00:00 99.8526 m
2 73252000433 2024-03-01 01:13:37+00:00 127.2824 m
3 73252000433 2024-03-01 12:02:32+00:00 100.5151 m
4 73252000433 2024-03-21 21:58:40+00:00 110.3624 m
5 73252000433 2024-04-11 18:43:45+00:00 114.2618 m
6 73252000433 2024-04-12 05:32:40+00:00 100.6057 m
0 73252001533 2024-02-09 04:28:41+00:00 200.6382 m
1 73252001533 2024-02-09 15:17:16+00:00 200.7256 m
2 73252001533 2024-03-01 01:13:47+00:00 200.8445 m
3 73252001533 2024-03-01 12:02:22+00:00 201.6775 m
4 73252001533 2024-03-21 21:58:50+00:00 200.9672 m
5 73252001533 2024-04-11 18:43:55+00:00 200.8128 m
6 73252001533 2024-04-12 05:32:30+00:00 200.8465 m
0 73252001695 2024-01-29 06:05:14+00:00 -0.2095 m
3 73252001695 2024-02-19 02:50:19+00:00 -0.1801 m
4 73252001695 2024-02-19 13:40:15+00:00 0.1047 m
6 73252001695 2024-03-10 23:35:22+00:00 -0.0288 m
7 73252001695 2024-03-11 10:25:17+00:00 0.2217 m
9 73252001695 2024-03-31 20:20:26+00:00 -0.0407 m
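Before plotting, it can also help to reduce the cleaned time series to per-reach summary statistics. Because a Dask DataFrame mirrors the pandas groupby API (with a final .compute() call on the Dask side), the reduction is sketched below with plain pandas on a small hand-made frame; the same groupby/agg chain applies to ddf:

```python
import pandas as pd

# Small stand-in frame mimicking the cleaned Hydrocron results above.
df = pd.DataFrame({
    "reach_id": [73252000433] * 3 + [73252001533] * 3,
    "wse": [100.4017, 99.8526, 127.2824, 200.6382, 200.7256, 200.8445],
})

# Per-reach mean, minimum, and maximum water surface elevation.
# On the Dask DataFrame: ddf.groupby("reach_id")["wse"].agg(["mean", "min", "max"]).compute()
stats = df.groupby("reach_id")["wse"].agg(["mean", "min", "max"])
print(stats)
```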
# Plot results
line_plot = ddf.hvplot(x="time_str", y="wse", by="reach_id", kind="line", persist=True)
line_plot.opts(xrotation=90)

scatter_plot = ddf.hvplot(x="time_str", y="wse", by="reach_id", kind="scatter", persist=True)
line_plot * scatter_plot
client.close()