Parallel Processing of Feature Detection with dask
This notebook demonstrates how to run tobac feature detection in parallel, using the dask library to distribute the work.
Imports and Dask Cluster Setup
[1]:
%matplotlib inline
[2]:
import tobac
import dask.bag as db
import xarray as xr
import s3fs
There are many ways to initialize a dask cluster. This is just one example, which starts two single-threaded worker processes on the local machine.
[3]:
from dask.distributed import Client

# Start a LocalCluster with two worker processes, one thread each
client = Client(n_workers=2, threads_per_worker=1)
client
[3]:
Client
Client-a81ef1fe-0073-11f1-88af-5ce91e89f225
| Connection method: Cluster object | Cluster type: distributed.LocalCluster |
| Dashboard: http://127.0.0.1:8787/status |
Cluster Info
LocalCluster
47c93e83
| Dashboard: http://127.0.0.1:8787/status | Workers: 2 |
| Total threads: 2 | Total memory: 64.00 GiB |
| Status: running | Using processes: True |
Read in Data
Here, we use the longwave infrared product of the NOAA Global Mosaic of Geostationary Satellite Imagery (GMGSI) as our input data, read directly from the public NOAA bucket on AWS S3.
[4]:
# Anonymous access works because the NOAA bucket is public
fs = s3fs.S3FileSystem(anon=True)
aws_urls = [
    "s3://noaa-gmgsi-pds/GMGSI_LW/2024/01/01/00/GLOBCOMPLIR_nc.2024010100",
    "s3://noaa-gmgsi-pds/GMGSI_LW/2024/01/01/01/GLOBCOMPLIR_nc.2024010101",
]
all_ds = []
for aws_url in aws_urls:
    # Open each remote file through a file-like object; xarray reads it lazily
    file_obj = fs.open(aws_url)
    all_ds.append(xr.open_dataset(file_obj, engine="h5netcdf"))
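The two object keys above follow a date-based layout. Assuming that layout holds for other hours (an assumption inferred only from these two URLs, not from NOAA documentation), a small helper such as the hypothetical `gmgsi_lw_urls` below could build the URL list for any run of hourly files instead of typing each one by hand:

```python
from datetime import datetime, timedelta


def gmgsi_lw_urls(start: datetime, n_hours: int) -> list[str]:
    """Build hourly GMGSI longwave URLs (hypothetical helper).

    Assumes the key layout seen in the two example URLs:
    GMGSI_LW/YYYY/MM/DD/HH/GLOBCOMPLIR_nc.YYYYMMDDHH
    """
    urls = []
    for i in range(n_hours):
        t = start + timedelta(hours=i)
        urls.append(
            "s3://noaa-gmgsi-pds/GMGSI_LW/"
            f"{t:%Y/%m/%d/%H}/GLOBCOMPLIR_nc.{t:%Y%m%d%H}"
        )
    return urls


# Reproduces the two URLs used above
aws_urls = gmgsi_lw_urls(datetime(2024, 1, 1, 0), 2)
```

Because every date component comes from the same `datetime`, ranges that cross midnight work too: `gmgsi_lw_urls(datetime(2023, 12, 31, 23), 2)` yields keys under `2023/12/31/23` and `2024/01/01/00`.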
We loaded two hourly files; now we use xarray to concatenate them along the `time` dimension.
[5]:
combined_ds = xr.concat(all_ds, dim="time")
[6]:
combined_ds
[6]:
<xarray.Dataset> Size: 240MB
Dimensions: (time: 2, yc: 3000, xc: 4999)
Coordinates:
* time (time) datetime64[ns] 16B 2024-01-01 2024-01-01T01:00:00
lat (yc, xc) float32 60MB 72.72 72.72 72.72 ... -72.74 -72.74 -72.74
lon (yc, xc) float32 60MB 180.0 -179.9 -179.9 ... 179.8 179.8 179.9
Dimensions without coordinates: yc, xc
Data variables:
data (time, yc, xc) float32 120MB 206.0 204.0 204.0 ... 188.0 182.0
Attributes:
Conventions: CF-1.4
Source: McIDAS Area File
Satellite Sensor: DERIVED DATA
time_coverage_start: 2024-01-01T00:00:00
instrument_name: GLOBCOMPLIR
history: Mon Jan 1 00:38:21 2024: ncks -d xc,0,4998 templir...
NCO: netCDF Operators version 4.7.5 (Homepage = http://n...
tobac Feature Detection
These feature detection parameters are only examples.
[7]:
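parameters_features = {}
# Feature position: mean of grid points weighted by their difference from the threshold
parameters_features["position_threshold"] = "weighted_diff"
# Standard deviation of the Gaussian smoothing applied before thresholding
parameters_features["sigma_threshold"] = 0.5
# Minimum number of grid points a feature must contain at a given threshold
parameters_features["n_min_threshold"] = 4
# Detect regions below the thresholds (e.g. cold cloud tops) rather than above
parameters_features["target"] = "minimum"
# Thresholds in kelvin, ordered from least to most extreme
parameters_features["threshold"] = [180, 170]
# Periodic boundaries along hdim_2 (x), since the grid spans all longitudes
parameters_features["PBC_flag"] = "hdim_2"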
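tobac processes multiple thresholds in order, and the usual convention, followed here (180 K, then 170 K), is to list them from least to most extreme. For `target="minimum"` that means decreasing values. The check below is a hypothetical pre-flight helper written for this notebook (`thresholds_ordered` is not part of tobac); it could catch a mis-ordered list before any work is sent to the cluster:

```python
def thresholds_ordered(thresholds, target):
    """Return True if thresholds run from least to most extreme (hypothetical check).

    For target="minimum" that means strictly decreasing values;
    for target="maximum", strictly increasing values.
    """
    if target not in ("minimum", "maximum"):
        raise ValueError(f"unknown target: {target!r}")
    pairs = list(zip(thresholds, thresholds[1:]))
    if target == "minimum":
        return all(a > b for a, b in pairs)
    return all(a < b for a, b in pairs)


# Same values as the parameters above
ok = thresholds_ordered([180, 170], "minimum")
```

With the values above, `thresholds_ordered(parameters_features["threshold"], parameters_features["target"])` returns `True`, whereas `[170, 180]` would return `False`. A single threshold always passes, since there are no pairs to compare.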
Future versions of tobac (1.6 and later) will support xarray natively in feature detection and segmentation, but current versions rely on Iris for gridded data. Because of this, we need to adjust some metadata so that the data can be converted to an Iris cube.
[8]:
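# Metadata needed for the conversion to Iris: a unit string Iris can parse
# on the data variable, and a long_name on the time coordinate
combined_ds["data"].attrs["units"] = "kelvin"
combined_ds["data"]["time"].attrs["long_name"] = "time"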
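Now we use a dask bag to parallelize feature detection over time: each element of the bag is one time step, and `db.map` applies feature detection to each element on the dask workers.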
[9]:
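# One bag element per time step: keep a length-1 time dimension and, to keep
# the example small, only the first 500 rows (yc) across all columns (xc)
time_slices = [
    combined_ds["data"][t : t + 1, 0:500]
    for t in range(len(combined_ds["time"]))
]
# One partition per time step, so each step can run as its own task
b = db.from_sequence(time_slices, npartitions=len(time_slices))
# 4000 is passed as dxy, the horizontal grid spacing in metres
out_feature_dfs = db.map(
    lambda x: tobac.feature_detection_multithreshold(
        x, 4000, **parameters_features
    ),
    b,
).compute()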
/Users/seanfreeman/mambaforge/envs/tobac_dev_3_13/lib/python3.13/site-packages/distributed/client.py:3374: UserWarning: Sending large graph of size 57.21 MiB.
This may cause some slowdown.
Consider loading the data with Dask directly
or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
warnings.warn(
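Parallelism in a dask bag comes from its partitions: `db.map` creates one task per partition, and the elements inside a partition are processed one after another. The time steps therefore only run on different workers if they sit in different partitions. The sketch below is a simplified stand-in written for this notebook (the helper `split_into_partitions` is hypothetical and only approximates how dask splits a sequence); it shows how the time steps are grouped for a given `npartitions`:

```python
import math


def split_into_partitions(items, npartitions):
    """Group items into at most `npartitions` contiguous chunks (hypothetical).

    Simplified stand-in for bag partitioning: chunks of
    ceil(len(items) / npartitions) items, kept in order.
    """
    items = list(items)
    if npartitions < 1:
        raise ValueError("npartitions must be >= 1")
    size = max(1, math.ceil(len(items) / npartitions))
    return [items[i : i + size] for i in range(0, len(items), size)]


time_steps = ["2024-01-01T00", "2024-01-01T01"]
one_partition = split_into_partitions(time_steps, 1)   # a single task, run serially
two_partitions = split_into_partitions(time_steps, 2)  # one task per time step
```

With `npartitions=1`, both time steps share a single task and one of the two workers sits idle; with one partition per time step, each step becomes its own task that the scheduler can place on a different worker.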
Combining parallel-detected features into one coherent DataFrame
[10]:
tobac.utils.general.combine_feature_dataframes(out_feature_dfs)
[10]:
|  | frame | idx | hdim_1 | hdim_2 | num | threshold_value | feature | time | timestr | lat | lon |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 1 | 0.975325 | 60.718132 | 26 | 180 | 1 | 2024-01-01 00:00:00 | 2024-01-01 00:00:00 | 72.694528 | -175.628134 |
| 1 | 0 | 2 | 0.670018 | 79.074818 | 16 | 180 | 2 | 2024-01-01 00:00:00 | 2024-01-01 00:00:00 | 72.701065 | -174.306289 |
| 2 | 0 | 5 | 0.358115 | 1492.979778 | 15 | 180 | 3 | 2024-01-01 00:00:00 | 2024-01-01 00:00:00 | 72.707742 | -72.492469 |
| 3 | 0 | 6 | 0.482579 | 1531.520215 | 26 | 180 | 4 | 2024-01-01 00:00:00 | 2024-01-01 00:00:00 | 72.705077 | -69.717214 |
| 4 | 0 | 7 | 3.409896 | 2113.185770 | 285 | 180 | 5 | 2024-01-01 00:00:00 | 2024-01-01 00:00:00 | 72.642292 | -27.832080 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 1565 | 1 | 1283 | 498.180906 | 30.005031 | 16 | 170 | 1566 | 2024-01-01 01:00:00 | 2024-01-01 01:00:00 | 58.262049 | -177.839756 |
| 1566 | 1 | 1284 | 497.817803 | 715.879216 | 7 | 170 | 1567 | 2024-01-01 01:00:00 | 2024-01-01 01:00:00 | 58.275803 | -128.450668 |
| 1567 | 1 | 1285 | 498.158232 | 3295.186171 | 63 | 170 | 1568 | 2024-01-01 01:00:00 | 2024-01-01 01:00:00 | 58.262908 | 57.282537 |
| 1568 | 1 | 1288 | 498.454404 | 3793.095530 | 5 | 170 | 1569 | 2024-01-01 01:00:00 | 2024-01-01 01:00:00 | 58.251682 | 93.136465 |
| 1569 | 1 | 1293 | 498.648911 | 3317.641704 | 6 | 170 | 1570 | 2024-01-01 01:00:00 | 2024-01-01 01:00:00 | 58.244310 | 58.899535 |
1570 rows × 11 columns
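In the combined table above, the `feature` IDs run from 1 to 1570 and `frame` takes the values 0 and 1, even though each parallel call numbered its own features from 1 and saw only a single time step. The combine step therefore evidently renumbers features and reassigns frames across calls. The idea can be sketched in plain Python; `combine_features` below is a hypothetical simplification, not tobac's implementation. It works on lists of dicts rather than pandas DataFrames, and the `old_feature` key is this sketch's own choice:

```python
def combine_features(per_call_features):
    """Merge feature lists from separate detection calls (hypothetical sketch).

    Each input list holds dicts with a "time" and a per-call "feature" ID
    that restarts at 1 in every call. The result renumbers "feature"
    sequentially from 1, keeps the per-call ID as "old_feature", and sets
    "frame" to the index of each row's time among all unique times.
    """
    # Copy every row so the inputs are not modified
    rows = [dict(feat) for call in per_call_features for feat in call]
    unique_times = sorted({row["time"] for row in rows})
    frame_of = {t: i for i, t in enumerate(unique_times)}
    for new_id, row in enumerate(rows, start=1):
        row["old_feature"] = row["feature"]
        row["feature"] = new_id
        row["frame"] = frame_of[row["time"]]
    return rows


# Two calls, each seeing one time step, so each restarts at feature 1
call_00 = [{"time": "2024-01-01T00", "feature": 1}, {"time": "2024-01-01T00", "feature": 2}]
call_01 = [{"time": "2024-01-01T01", "feature": 1}]
combined = combine_features([call_00, call_01])
```

Here `combined` ends up with features 1, 2, 3 in frames 0, 0, 1, which mirrors the structure of the real combined DataFrame.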