The Planetary Computer Hub is a JupyterHub paired with Dask Gateway, which makes it easy to create Dask clusters that distribute your computation across many machines.
Use dask_gateway.GatewayCluster to quickly create a Dask cluster.
import dask_gateway
# Create a cluster with the default settings and connect a client to it.
cluster = dask_gateway.GatewayCluster()
client = cluster.get_client()
# Request 4 workers.
cluster.scale(4)
print(cluster)
GatewayCluster<prod.a5fb51af6db1479aafd4b5e665a9febf, status=running>
Now that we've connected a client to the cluster, all computations using Dask will happen on our 4-worker cluster.
The Dask dashboard shows detailed, live information about your cluster's activity. Clicking the "Dashboard" link at the bottom of the cluster repr opens it in a new browser tab.
We also include the dask-labextension, which lays out the Dask dashboard's panels as tabs in the JupyterLab workspace.
To use the dask-labextension, copy the "Dashboard" address from the cluster repr, click the orange Dask logo on the left-hand navigation bar, and paste the dashboard address.
You can close your cluster, freeing up its resources, by calling cluster.close().
cluster.close()
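If a cell raises an error before cluster.close() runs, the cluster may keep holding resources until it is shut down some other way. One way to guarantee cleanup is the standard library's contextlib.closing, which calls close() when the with block exits, even on an exception. The sketch below uses a hypothetical StandInCluster class so it runs without a Dask Gateway server; on the Hub you would wrap dask_gateway.GatewayCluster() instead.

```python
import contextlib


class StandInCluster:
    """Hypothetical stand-in for dask_gateway.GatewayCluster.

    It only records whether close() was called, so this sketch runs
    without a Dask Gateway server.
    """

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


# contextlib.closing calls cluster.close() when the block exits,
# even if the code inside raises an exception.
# On the Hub, replace StandInCluster() with dask_gateway.GatewayCluster().
with contextlib.closing(StandInCluster()) as cluster:
    assert not cluster.closed  # still open while work runs here

print(cluster.closed)  # True
```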
Dask clusters can automatically adapt their size to the workload. Use cluster.adapt(minimum, maximum) to enable adaptive mode.
import dask_gateway
cluster = dask_gateway.GatewayCluster()
client = cluster.get_client()
cluster.adapt(minimum=2, maximum=50)
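Conceptually, adaptive mode repeatedly picks a target number of workers from the current workload and clamps it to the range passed to cluster.adapt. The function below is a deliberately simplified, hypothetical policy (one worker per tasks_per_worker pending tasks). Dask's real adaptive logic is more involved and considers factors such as expected task durations and memory use, so treat this only as an illustration of the minimum/maximum clamping.

```python
import math


def adaptive_target(pending_tasks: int, minimum: int, maximum: int,
                    tasks_per_worker: int = 10) -> int:
    """Hypothetical scaling rule: one worker per `tasks_per_worker` pending
    tasks, clamped to [minimum, maximum] like cluster.adapt(minimum, maximum)."""
    wanted = math.ceil(pending_tasks / tasks_per_worker)
    return max(minimum, min(maximum, wanted))


# Idle cluster: stays at the minimum.
print(adaptive_target(0, minimum=2, maximum=50))       # 2
# Moderate workload: scales to match demand.
print(adaptive_target(120, minimum=2, maximum=50))     # 12
# Huge workload: capped at the maximum.
print(adaptive_target(10_000, minimum=2, maximum=50))  # 50
```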
Dask will add workers as necessary when a computation is submitted. As an example, we'll compute the daily minimum temperature (tmin) averaged over all of Hawaii, using the Daymet dataset.
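import xarray as xr
import fsspec
# The az:// protocol used below is provided by the adlfs package.
account_name = 'daymeteuwest'
container_name = 'daymet-zarr'
# Open the Hawaii ("hi") subset of the daily Daymet Zarr store; data is loaded lazily.
store = fsspec.get_mapper(f'az://{container_name}/daily/hi.zarr', account_name=account_name)
ds = xr.open_zarr(store, consolidated=True)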
The .compute() call in the next cell is what triggers computation and causes Dask to scale the cluster up to a dozen or so workers.
timeseries = ds['tmin'].mean(dim=["x", "y"]).compute()
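Before .compute() is called, ds['tmin'].mean(dim=["x", "y"]) only records the work to be done; no data is read and no extra workers are needed. The toy Lazy class below is a hypothetical, heavily simplified illustration of that deferred-evaluation idea, not how Dask actually builds or schedules its task graphs.

```python
class Lazy:
    """Hypothetical toy of deferred evaluation: operations are recorded,
    and nothing runs until compute() is called."""

    def __init__(self, func, *deps):
        self.func = func
        self.deps = deps

    def map(self, func):
        # Record a new step that depends on this one; nothing runs yet.
        return Lazy(func, self)

    def compute(self):
        # Evaluate dependencies first, then this step.
        return self.func(*(dep.compute() for dep in self.deps))


calls = []


def load():
    calls.append("load")  # track when the "expensive" step really runs
    return [10.0, 12.0, 14.0]


def mean(values):
    return sum(values) / len(values)


pipeline = Lazy(load).map(mean)
print(calls)               # [] -> nothing has executed yet
print(pipeline.compute())  # 12.0
print(calls)               # ['load']
```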
import matplotlib.pyplot as plt
fig, ax = plt.subplots(figsize=(12, 6))
timeseries.plot(ax=ax);
cluster.close()
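Dask can spread an average like ds['tmin'].mean(dim=["x", "y"]) across workers because a mean decomposes into per-chunk partial results: each chunk contributes a (sum, count) pair, and the pairs are combined at the end. The sketch below shows that decomposition with the standard library's concurrent.futures and made-up numbers. It is an analogy for the idea, not Dask's implementation; the NaN skipping mirrors xarray's default behavior for floating-point data.

```python
import math
from concurrent.futures import ThreadPoolExecutor


def partial_sum_count(chunk):
    """Per-chunk step: return (sum, count), skipping NaN values."""
    valid = [v for v in chunk if not math.isnan(v)]
    return sum(valid), len(valid)


def chunked_mean(chunks, max_workers=4):
    # Map: each chunk is reduced independently, so chunks can run in parallel.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        partials = list(pool.map(partial_sum_count, chunks))
    # Reduce: combine the partial sums and counts into one mean.
    total = sum(s for s, _ in partials)
    count = sum(n for _, n in partials)
    return total / count if count else float("nan")


# Made-up values standing in for grid cells of one day, split into chunks;
# NaN marks cells with no data (for example, ocean).
chunks = [[10.0, 12.0], [float("nan"), 14.0], [16.0]]
print(chunked_mean(chunks))  # 13.0
```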
dask_gateway.GatewayCluster creates a cluster with default settings, which might not be appropriate for your workload. For example, we might have a memory-intensive workload that requires more memory per CPU core. Or we might need to set environment variables on the workers.
To customize your cluster, create a Gateway object and then adjust its cluster options.
import dask_gateway
gateway = dask_gateway.Gateway()
cluster_options = gateway.cluster_options()
In a Jupyter Notebook, you can use the HTML widget to customize the options. Alternatively, you can adjust the values programmatically in Python. Here we'll ask for 16 GiB of memory per worker.
cluster_options["worker_memory"] = 16
Now create your cluster. Make sure to pass the cluster_options object to gateway.new_cluster.
cluster = gateway.new_cluster(cluster_options)
client = cluster.get_client()
cluster.scale(2)
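Each worker in this cluster requests 16 GiB, so scaling to 2 workers asks for 32 GiB in total. When choosing a worker count, a back-of-the-envelope estimate can help: the hypothetical helper below computes how many workers of a given size are needed to reach a target amount of memory. It ignores overheads such as the scheduler and per-worker memory headroom, so treat its result as a starting point only.

```python
import math


def workers_needed(target_gib: float, worker_memory_gib: float) -> int:
    """Hypothetical sizing helper: smallest worker count whose combined
    memory is at least `target_gib`."""
    if worker_memory_gib <= 0:
        raise ValueError("worker_memory_gib must be positive")
    return math.ceil(target_gib / worker_memory_gib)


worker_memory = 16  # GiB per worker, matching cluster_options["worker_memory"]
print(2 * worker_memory)                   # 32 -> GiB requested by cluster.scale(2)
print(workers_needed(100, worker_memory))  # 7
```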
The Dask documentation has much more information on using Dask for scalable computing. This JupyterHub deployment uses Dask Gateway to create and manage Dask clusters.