From the PO.DAAC Cookbook, to access the GitHub version of the notebook, follow this link.

Hydrocron API: SWOT Time Series Examples

Summary:
Hydrocron is an API that repackages the SWOT_L2_HR_RIVERSP_2.0 dataset into CSV or GeoJSON formats that make time series analysis easier. This notebook highlights how to use Hydrocron and convert its output into a readable Dask DataFrame of data from multiple SWOT reaches identified from the SWORD database.

Requirements:
Any compute environment, local or in the cloud.

Learning Objectives:
- Obtain a list of SWORD IDs for a region of interest
- Access SWOT river vector product attributes for multiple reach IDs via the Hydrocron API
- Convert the accessed time series data into a readable DataFrame
- Plot one time series variable from multiple reaches

Cite the Hydrocron API via the following:
Frank Greguska, Nikki Tebaldi, Victoria McDonald, Vanesa Gomez Gonzalez, & Cassie Nickles. (2024). podaac/hydrocron: 1.2.0 (1.2.0). Zenodo. https://doi.org/10.5281/zenodo.11176234

Import Packages

```python
import datetime
import pprint
from io import StringIO

import dask
import dask.dataframe as dd
from dask.distributed import Client
import hvplot.dask
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
```
Set up Dask Workers for Parallelization
A note on Dask
We use Dask in this notebook because it aggregates the data into a Dask DataFrame, which can then be used in a variety of ways to explore, visualize, and analyze the data, and because it optimizes these operations by performing tasks in parallel. See the Dask Documentation for more information.
We also recognize that Dask may be a bit complex for smaller queries, so we offer the recommendation below to set the number of workers, n_workers, to 1, which will run the operations on a single thread instead of in parallel.
```python
client = Client(n_workers=4)  # Set to the number of CPUs or parallel processes, or set to 1 worker to run things on a single thread
client
```
Obtain SWORD reach IDs
In this section, we query the Feature Translation Service (FTS) for a list of river reach IDs from the SWOT River Database (SWORD). A river reach ID has the format CBBBBBRRRRT and a node ID has the format CBBBBBRRRRNNNT, where C stands for continent, B for basin, R for reach, N for node, and T for type. The first 6 digits of the ID (CBBBBB) are the HydroBASINS Pfafstetter level 6 basin code, where the first digit represents one of nine continental regions (1 = Africa, 2 = Europe, 3 = Siberia, 4 = Asia, 5 = Oceania, 6 = South America, 7 = North America, 8 = Arctic, 9 = Greenland) and the remaining digits are nested basin levels 2–6. In this example, we search by the beginning of a SWORD ID (the basin code). For example, ID 732520 represents multiple reaches along the Savannah River and its tributaries in Georgia, USA.
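As a small illustration of this layout, a hypothetical helper (not part of the original notebook) can split a reach ID into the components described above:

```python
# Hypothetical helper: decode a SWORD reach ID of the form CBBBBBRRRRT
def parse_reach_id(reach_id: str) -> dict:
    return {
        "continent": reach_id[0],   # C: continental region (7 = North America)
        "basin": reach_id[:6],      # CBBBBB: HydroBASINS Pfafstetter level 6 basin code
        "reach": reach_id[6:10],    # RRRR: reach number within the basin
        "type": reach_id[10],       # T: type flag
    }

parse_reach_id("73252000433")
# {'continent': '7', 'basin': '732520', 'reach': '0043', 'type': '3'}
```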
The query_fts function can query the Feature Translation Service for reach identifiers by river name, reach identifier, HUC, or region. In this notebook, the function queries the FTS for reach identifiers associated with the basin identifier, and it can be altered to search via river name. You can use a combination of the page_size and page_number parameters to retrieve all reach identifier results for a query.
```python
# Assign URLs to variables for the APIs we use, FTS and Hydrocron
FTS_URL = "https://fts.podaac.earthdata.nasa.gov/v1"
HYDROCRON_URL = "https://soto.podaac.earthdatacloud.nasa.gov/hydrocron/v1/timeseries"

# Basin or river to query FTS for
BASIN_IDENTIFIER = "732520"  # to search via basin ID, found within the SWORD database
RIVER_NAME = "Rhine"         # to search via river name
```
```python
def query_fts(query_url, params):
    """Query Feature Translation Service (FTS) for reach identifiers using the query_url parameter.

    Parameters
    ----------
    query_url: str - URL to use to query FTS
    params: dict - Dictionary of parameters to pass to query

    Returns
    -------
    dict of results: hits, page_size, page_number, reach_ids
    """
    reaches = requests.get(query_url, params=params)
    reaches_json = reaches.json()

    hits = reaches_json['hits']
    if 'search on' in reaches_json.keys():
        page_size = reaches_json['search on']['page_size']
        page_number = reaches_json['search on']['page_number']
    else:
        page_size = 0
        page_number = 0

    return {
        "hits": hits,
        "page_size": page_size,
        "page_number": page_number,
        "reach_ids": [item['reach_id'] for item in reaches_json['results']]
    }
```
```python
# Search by basin code
query_url = f"{FTS_URL}/rivers/reach/{BASIN_IDENTIFIER}"
print(f"Searching by basin ...{query_url}")

# Search by river name
# query_url = f"{FTS_URL}/rivers/{RIVER_NAME}"  # if searching via river name instead
# print(f"Searching by river name ...{query_url}")

page_size = 100    # Set FTS to retrieve 100 results at a time
page_number = 1    # Set FTS to retrieve the first page of results
hits = 1           # Set hits to an initial value to start the while loop
reach_ids = []
while (page_size * page_number) != 0 and len(reach_ids) < hits:
    params = {"page_size": page_size, "page_number": page_number}
    results = query_fts(query_url, params)

    hits = results['hits']
    page_size = results['page_size']
    page_number = results['page_number'] + 1
    reach_ids.extend(results['reach_ids'])

    print("page_size: ", page_size, ", page_number: ", page_number - 1, ", hits: ", hits, ", # reach_ids: ", len(reach_ids))

print("Total number of reaches: ", len(reach_ids))
reach_ids = list(set(reach_ids))  # Remove duplicates
print("Total number of non-duplicate reaches: ", len(reach_ids))
```
Searching by basin ...https://fts.podaac.earthdata.nasa.gov/v1/rivers/reach/732520
page_size: 100 , page_number: 1 , hits: 190 , # reach_ids: 100
page_size: 100 , page_number: 2 , hits: 190 , # reach_ids: 190
Total number of reaches: 190
Total number of non-duplicate reaches: 190
Once you have a list of reach identifiers, you can query Hydrocron for SWOT time series data. For now, we will just pull the first 10 reaches we discovered.
```python
reach_ids = reach_ids[:10]
len(reach_ids)
```
10
Query Hydrocron for time series data
Dask Delayed Function to Query in Parallel
We define a function that queries Hydrocron and returns a DataFrame of the time series data for the identified reaches. It is defined as a Dask delayed function with the @dask.delayed decorator so that we can query reaches in parallel and return a Dask DataFrame to use in our visualizations.
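As a minimal illustration of what the decorator buys us (this demo is separate from the workflow; make_row is a hypothetical helper), calling a delayed function returns a lazy Delayed object, and no work happens until the result is computed:

```python
@dask.delayed
def make_row(i):
    # Build a one-row DataFrame; in the real workflow this would be an HTTP query
    return pd.DataFrame({"reach_id": [i], "wse": [float(i)]})

lazy_rows = [make_row(i) for i in range(3)]  # Delayed objects; nothing has run yet
demo_ddf = dd.from_delayed(lazy_rows)        # still lazy; one partition per Delayed object
print(demo_ddf.compute())                    # .compute() triggers execution in parallel
```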
```python
@dask.delayed
def query_hydrocron(query_url, reach_id, start_time, end_time, fields, empty_df):
    """Query Hydrocron for reach-level time series data.

    Parameters
    ----------
    query_url: str - URL to use to query Hydrocron
    reach_id: str - String SWORD reach identifier
    start_time: str - String time to start query
    end_time: str - String time to end query
    fields: str - Comma-separated list of fields to return in query response
    empty_df: pandas.DataFrame that contains empty query results

    Returns
    -------
    pandas.DataFrame that contains query results
    """
    params = {
        "feature": "Reach",
        "feature_id": reach_id,
        "output": "csv",
        "start_time": start_time,
        "end_time": end_time,
        "fields": fields
    }
    results = requests.get(query_url, params=params)
    if "results" in results.json().keys():
        results_csv = results.json()["results"]["csv"]
        df = pd.read_csv(StringIO(results_csv))
    else:
        df = empty_df
    return df
```
Create the query for Hydrocron!
All parameter options are listed in the Hydrocron documentation: https://podaac.github.io/hydrocron/timeseries.html
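Before batching the queries, you can sanity-check a single reach with a one-off eager request. This sketch mirrors the delayed function above; the indexing of reach_ids and the time window are just examples:

```python
# One-off Hydrocron request for a single reach (eager, for sanity checking)
params = {
    "feature": "Reach",
    "feature_id": reach_ids[0],           # e.g. one reach from the basin above
    "output": "csv",
    "start_time": "2023-07-28T00:00:00Z",
    "end_time": "2024-04-16T00:00:00Z",
    "fields": "reach_id,time_str,wse",
}
response = requests.get(HYDROCRON_URL, params=params)
print(pd.read_csv(StringIO(response.json()["results"]["csv"])).head())
```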
```python
%%time
# Create queries that return pandas.DataFrame objects
start_time = "2023-07-28T00:00:00Z"
end_time = "2024-04-16T00:00:00Z"
fields = "reach_id,time_str,wse"
results = []
for reach in reach_ids:
    # Create an empty DataFrame for cases where no data is returned for a reach identifier
    empty_df = pd.DataFrame({
        "reach_id": np.int64(reach),
        "time_str": datetime.datetime(1900, 1, 1).strftime("%Y-%m-%dT%H:%M:%S"),
        "wse": -999999999999.0,
        "wse_units": "m"
    }, index=[0])
    results.append(query_hydrocron(HYDROCRON_URL, reach, start_time, end_time, fields, empty_df))

# Load DataFrame results into a Dask DataFrame
ddf = dd.from_delayed(results)
ddf.head(n=20, npartitions=len(reach_ids))
```
CPU times: user 444 ms, sys: 131 ms, total: 575 ms
Wall time: 10.7 s
| | reach_id | time_str | wse | wse_units |
|---|---|---|---|---|
| 0 | 73252000433 | 2024-02-09T04:28:31Z | 1.004017e+02 | m |
| 1 | 73252000433 | 2024-02-09T15:17:26Z | 9.985260e+01 | m |
| 2 | 73252000433 | 2024-03-01T01:13:37Z | 1.272824e+02 | m |
| 3 | 73252000433 | 2024-03-01T12:02:32Z | 1.005151e+02 | m |
| 4 | 73252000433 | 2024-03-21T21:58:40Z | 1.103624e+02 | m |
| 5 | 73252000433 | 2024-04-11T18:43:45Z | 1.142618e+02 | m |
| 6 | 73252000433 | 2024-04-12T05:32:40Z | 1.006057e+02 | m |
| 0 | 73252001533 | 2024-02-09T04:28:41Z | 2.006382e+02 | m |
| 1 | 73252001533 | 2024-02-09T15:17:16Z | 2.007256e+02 | m |
| 2 | 73252001533 | 2024-03-01T01:13:47Z | 2.008445e+02 | m |
| 3 | 73252001533 | 2024-03-01T12:02:22Z | 2.016775e+02 | m |
| 4 | 73252001533 | 2024-03-21T21:58:50Z | 2.009672e+02 | m |
| 5 | 73252001533 | 2024-04-11T18:43:55Z | 2.008128e+02 | m |
| 6 | 73252001533 | 2024-04-12T05:32:30Z | 2.008465e+02 | m |
| 0 | 73252001404 | 1900-01-01T00:00:00 | -1.000000e+12 | m |
| 0 | 73252001434 | 2024-01-30T06:06:26Z | -1.000000e+12 | m |
| 1 | 73252001434 | no_data | -1.000000e+12 | m |
| 2 | 73252001434 | no_data | -1.000000e+12 | m |
| 3 | 73252001434 | 2024-02-20T02:51:31Z | -1.000000e+12 | m |
| 4 | 73252001434 | no_data | -1.000000e+12 | m |
```python
# Remove fill values for missing observations
ddf = ddf.loc[(ddf["wse"] != -999999999999.0)]

# Convert time_str to datetime format
ddf.time_str = dd.to_datetime(ddf.time_str)

ddf.head(n=20, npartitions=len(reach_ids))
```
| | reach_id | time_str | wse | wse_units |
|---|---|---|---|---|
| 0 | 73252000433 | 2024-02-09 04:28:31+00:00 | 100.4017 | m |
| 1 | 73252000433 | 2024-02-09 15:17:26+00:00 | 99.8526 | m |
| 2 | 73252000433 | 2024-03-01 01:13:37+00:00 | 127.2824 | m |
| 3 | 73252000433 | 2024-03-01 12:02:32+00:00 | 100.5151 | m |
| 4 | 73252000433 | 2024-03-21 21:58:40+00:00 | 110.3624 | m |
| 5 | 73252000433 | 2024-04-11 18:43:45+00:00 | 114.2618 | m |
| 6 | 73252000433 | 2024-04-12 05:32:40+00:00 | 100.6057 | m |
| 0 | 73252001533 | 2024-02-09 04:28:41+00:00 | 200.6382 | m |
| 1 | 73252001533 | 2024-02-09 15:17:16+00:00 | 200.7256 | m |
| 2 | 73252001533 | 2024-03-01 01:13:47+00:00 | 200.8445 | m |
| 3 | 73252001533 | 2024-03-01 12:02:22+00:00 | 201.6775 | m |
| 4 | 73252001533 | 2024-03-21 21:58:50+00:00 | 200.9672 | m |
| 5 | 73252001533 | 2024-04-11 18:43:55+00:00 | 200.8128 | m |
| 6 | 73252001533 | 2024-04-12 05:32:30+00:00 | 200.8465 | m |
| 0 | 73252001695 | 2024-01-29 06:05:14+00:00 | -0.2095 | m |
| 3 | 73252001695 | 2024-02-19 02:50:19+00:00 | -0.1801 | m |
| 4 | 73252001695 | 2024-02-19 13:40:15+00:00 | 0.1047 | m |
| 6 | 73252001695 | 2024-03-10 23:35:22+00:00 | -0.0288 | m |
| 7 | 73252001695 | 2024-03-11 10:25:17+00:00 | 0.2217 | m |
| 9 | 73252001695 | 2024-03-31 20:20:26+00:00 | -0.0407 | m |
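Because ddf is a Dask DataFrame, standard pandas-style aggregations run in parallel across its partitions. For example, a quick sketch of the mean water surface elevation per reach:

```python
# Mean wse (meters) per reach; .compute() runs the parallel
# aggregation and returns an ordinary pandas Series
mean_wse = ddf.groupby("reach_id")["wse"].mean().compute()
print(mean_wse)
```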
```python
# Plot results
line_plot = ddf.hvplot(x="time_str", y="wse", by="reach_id", kind="line", persist=True)
line_plot.opts(xrotation=90)

scatter_plot = ddf.hvplot(x="time_str", y="wse", by="reach_id", kind="scatter", persist=True)
line_plot * scatter_plot
```
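If you want to keep the interactive figure, hvplot can write it out as a standalone HTML file; the filename here is just an example:

```python
import hvplot

# Save the combined overlay to a self-contained HTML file (example filename)
hvplot.save(line_plot * scatter_plot, "swot_wse_timeseries.html")
```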
```python
client.close()
```