 # Overview¶

• Enables parallel and larger-than-memory computations

• Uses familiar APIs you're used to from projects like NumPy, Pandas, and scikit-learn

• Allows you to scale existing workflows with minimal code changes

• Dask works on your laptop, but also scales out to large clusters

• Offers great built-in diagnostic tools

In [ ]:
def inc(i):
    return i + 1

def add(a, b):
    return a + b

a, b = 1, 12
c = inc(a)
d = inc(b)
output = add(c, d)

print(f'output = {output}')


This computation can be encoded in the following task graph:

• Graph of inter-related tasks with dependencies between them

• Circular nodes in the graph are Python function calls

• Square nodes are Python objects that are created by one task as output and can be used as inputs in another task
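To make the idea of a task graph concrete, here is a minimal sketch that writes the small `inc`/`add` computation as a plain dictionary and walks it by hand. The dictionary layout is loosely modeled on dict-style task graphs, and `evaluate` is a hypothetical toy helper written for this illustration; it is not Dask's scheduler.

```python
def inc(i):
    return i + 1

def add(a, b):
    return a + b

# Keys name data or results; tuples are tasks of the form (function, *input keys).
graph = {
    "a": 1,
    "b": 12,
    "c": (inc, "a"),
    "d": (inc, "b"),
    "output": (add, "c", "d"),
}

def evaluate(graph, key, cache=None):
    """Toy evaluator (hypothetical): compute `key` after computing its dependencies."""
    cache = {} if cache is None else cache
    if key not in cache:
        node = graph[key]
        if isinstance(node, tuple) and callable(node[0]):
            func, *deps = node
            cache[key] = func(*(evaluate(graph, dep, cache) for dep in deps))
        else:
            cache[key] = node  # plain data, nothing to run
    return cache[key]

output = evaluate(graph, "output")
print(f"output = {output}")
```

Because `c` and `d` do not depend on each other, a real scheduler would be free to run them at the same time; the toy evaluator above simply runs them one after the other.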

# High Level Collections¶

• Dask arrays are chunked, n-dimensional arrays

• You can think of a Dask array as a collection of NumPy ndarrays

• Dask arrays implement a large subset of the NumPy API using blocked algorithms

• For many purposes Dask arrays can serve as drop-in replacements for NumPy arrays

In [ ]:
import numpy as np
import dask.array as da

In [ ]:
arr_np = np.arange(1, 50, 3)
arr_np


We can create a Dask array in a similar manner, but need to specify a chunks argument to tell Dask how to break up the underlying array into chunks.

In [ ]:
arr_da = da.arange(1, 50, 3, chunks=5)    # Each chunk is 5 items long
type(arr_da)

In [ ]:
arr_da    # Dask arrays have nice HTML output in Jupyter notebooks
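As a quick check of what the chunks argument does, the sketch below inspects how the 17-element array is split. It assumes only that `dask.array` is installed; the variable names are chosen for this illustration.

```python
import dask.array as da

arr = da.arange(1, 50, 3, chunks=5)  # 17 elements: 1, 4, ..., 49

chunk_sizes = arr.chunks     # sizes of the blocks along each axis
n_blocks = arr.numblocks     # number of blocks along each axis

print(chunk_sizes)
print(n_blocks)
```

The last chunk is shorter than the others because 17 is not a multiple of 5.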


Dask arrays look and feel like NumPy arrays. For example, they have dtype and shape attributes

In [ ]:
print(arr_da.dtype)
print(arr_da.shape)


Dask arrays are lazily evaluated. The result from a computation isn't computed until you ask for it. Instead, a Dask task graph for the computation is produced. You can visualize the task graph using the visualize() method.

In [ ]:
arr_da.visualize()


To compute a task graph call the compute() method

In [ ]:
arr_da.compute()    # We'll go into more detail about .compute() later on


The result of this computation is a familiar NumPy ndarray

In [ ]:
type(arr_da.compute())
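The lazy behavior described above can be seen in a small sketch: building an expression returns another Dask array, and only compute() produces concrete values. The expression and variable names here are illustrative assumptions.

```python
import numpy as np
import dask.array as da

arr = da.arange(1, 50, 3, chunks=5)

expr = (arr * 2).sum()                 # builds a task graph; nothing is computed yet
is_lazy = isinstance(expr, da.Array)   # still a (zero-dimensional) Dask array

value = expr.compute()                 # runs the graph and returns a concrete number
concrete = arr.compute()               # a regular NumPy ndarray

print(is_lazy, value, type(concrete))
```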


Dask arrays support a large portion of the NumPy interface:

• Arithmetic and scalar mathematics: +, *, exp, log, ...

• Reductions along axes: sum(), mean(), std(), sum(axis=0), ...

• Tensor contractions / dot products / matrix multiply: tensordot

• Axis reordering / transpose: transpose

• Slicing: x[:100, 500:100:-2]

• Fancy indexing along single axes with lists or numpy arrays: x[:, [10, 1, 5]]

• Array protocols like __array__ and __array_ufunc__

• Some linear algebra: svd, qr, solve, solve_triangular, lstsq, ...

• ...

See the Dask array API docs for full details about what portion of the NumPy API is implemented for Dask arrays.

### Blocked Algorithms¶

Dask arrays are implemented using blocked algorithms. These algorithms break up a computation on a large array into many computations on smaller pieces of the array. This minimizes the memory load (amount of RAM) of computations and allows for working with larger-than-memory datasets in parallel.
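The idea behind a blocked sum can be sketched by hand with plain NumPy. This is only an illustration of the principle under simplified assumptions (a tiny array, a fixed chunk size, sequential execution); it is not how Dask implements it internally.

```python
import numpy as np

x = np.arange(20)      # stand-in for an array too large to handle at once
chunk_size = 5

# Break the array into pieces, reduce each piece, then combine the partial results.
chunks = [x[i:i + chunk_size] for i in range(0, len(x), chunk_size)]
partial_sums = [int(chunk.sum()) for chunk in chunks]
total = sum(partial_sums)

print(partial_sums, total)
```

Each partial sum only needs one chunk in memory, and the partial sums are independent of each other, so they could be computed in parallel.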

In [ ]:
x = da.random.random(20, chunks=5)
x

In [ ]:
result = x.sum()

In [ ]:
result.visualize()

In [ ]:
result.compute()


We can build more complex computations using the familiar NumPy operations we're used to.

In [ ]:
x = da.random.random(size=(1_000, 1_000), chunks=(250, 500))
x

In [ ]:
result = (x + x.T).sum(axis=0).mean()

In [ ]:
result.visualize()

In [ ]:
result.compute()


We can even perform computations on larger-than-memory arrays!

In [ ]:
x = da.random.random(size=(40_000, 40_000), chunks=(2_000, 2_000))

In [ ]:
x.nbytes / 1e9    # Size of array in gigabytes
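The size reported above can be checked with a little arithmetic, assuming 8-byte float64 values (what da.random.random produces). The helper variable names are introduced only for this sketch.

```python
shape = (40_000, 40_000)
chunk_shape = (2_000, 2_000)
itemsize = 8  # bytes per float64 value

total_gb = shape[0] * shape[1] * itemsize / 1e9
chunk_mb = chunk_shape[0] * chunk_shape[1] * itemsize / 1e6
n_chunks = (shape[0] // chunk_shape[0]) * (shape[1] // chunk_shape[1])

print(f"{total_gb} GB in total, {n_chunks} chunks of {chunk_mb} MB each")
```

The full array would not fit in the RAM of a typical laptop, but each individual chunk is small enough to process comfortably.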

In [ ]:
result = (x + x.T).sum(axis=0).mean()

In [ ]:
from dask.diagnostics import ProgressBar

with ProgressBar():
    result.compute()


Note: Dask can be used to scale other array-like libraries that support the NumPy ndarray interface. For example, pydata/sparse for sparse arrays or CuPy for GPU-accelerated arrays.

Pandas is great for dealing with tabular datasets that can fit in memory on a single machine. Dask becomes useful when dealing with larger-than-memory datasets. We saw that Dask arrays are composed of many NumPy arrays, chunked along one or more dimensions. It's similar for Dask DataFrames: a Dask DataFrame is composed of many Pandas DataFrames, and the partitioning happens only along the index.

• Dask DataFrames are a collection of Pandas DataFrames

• Dask DataFrames implement a large subset of the Pandas API

• Backed by blocked algorithms that allow for parallel and out of core computation

• For many purposes Dask DataFrames can serve as drop-in replacements for Pandas DataFrames

In [ ]:
import pandas as pd
import dask.dataframe as dd


Dask DataFrames support many of the same data I/O methods as Pandas. For example,

• read_csv / to_csv
• read_hdf / to_hdf
• read_json / to_json
• read_parquet / to_parquet

You can specify a chunksize argument to set the number of rows per partition

In [ ]:
ddf = dd.read_hdf('example_data.hdf', key='dataframe', chunksize=25)
ddf


The npartitions attribute tells us how many Pandas DataFrames make up our Dask DataFrame

In [ ]:
ddf.npartitions


Like Dask arrays, Dask DataFrames are lazily evaluated. Here, the dd.read_hdf function wraps several calls to pd.read_hdf, once for each partition of the Dask DataFrame.

In [ ]:
ddf.visualize()


Dask DataFrames cover a well-used portion of the Pandas API:

• Elementwise operations: df.x + df.y, df * df

• Row-wise selections: df[df.x > 0]

• Loc: df.loc[4.0:10.5]

• Common aggregations: df.x.max(), df.max()

• Is in: df[df.x.isin([1, 2, 3])]

• Datetime/string accessors: df.timestamp.dt.month

• Groupby-aggregate (with common aggregations): df.groupby(df.x).y.max(), df.groupby('x').max()

• ...

See the Dask DataFrame API docs for full details about what portion of the Pandas API is implemented for Dask DataFrames.

In [ ]:
col_mean = ddf['col_1'].mean()
col_mean

In [ ]:
col_mean.visualize()

In [ ]:
col_mean.compute()


Sometimes problems don’t fit nicely into one of the high-level collections like Dask arrays or Dask DataFrames. In these cases, you can parallelize custom algorithms using the lower-level Dask delayed interface. This allows one to manually create task graphs with a light annotation of normal Python code.

In [ ]:
import time
import random

def inc(x):
    time.sleep(random.random())
    return x + 1

def double(x):
    time.sleep(random.random())
    return 2 * x

def add(x, y):
    time.sleep(random.random())
    return x + y

In [ ]:
%%time

data = [1, 2, 3, 4]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = sum(output)


Dask delayed wraps function calls and delays their execution. delayed functions record what we want to compute (a function and input parameters) as a task in a graph that we’ll run later on parallel hardware by calling compute.

In [ ]:
from dask import delayed

In [ ]:
@delayed
def lazy_inc(x):
    time.sleep(random.random())
    return x + 1

In [ ]:
lazy_inc

In [ ]:
inc_output = lazy_inc(3)  # lazily evaluate inc(3)
inc_output

In [ ]:
inc_output.compute()


Using delayed functions, we can build up a task graph for the particular computation we want to perform

In [ ]:
double_inc_output = lazy_inc(inc_output)
double_inc_output

In [ ]:
double_inc_output.visualize()

In [ ]:
double_inc_output.compute()


We can use delayed to make our previous example computation lazy by wrapping all the function calls with delayed

In [ ]:
import time
import random

@delayed
def inc(x):
    time.sleep(random.random())
    return x + 1

@delayed
def double(x):
    time.sleep(random.random())
    return 2 * x

@delayed
def add(x, y):
    time.sleep(random.random())
    return x + y

In [ ]:
%%time

data = [1, 2, 3, 4]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = delayed(sum)(output)
total

In [ ]:
total.visualize()

In [ ]:
%%time

total.compute()


I highly recommend checking out the Dask delayed best practices page to avoid some common pitfalls when using delayed.

# Schedulers¶

High-level collections like Dask arrays and Dask DataFrames, as well as the low-level dask.delayed interface, build up task graphs for a computation. After these graphs are generated, they need to be executed (potentially in parallel). This is the job of a task scheduler. Different task schedulers exist within Dask. Each will consume a task graph and compute the same result, but with different performance characteristics. Dask has two different classes of schedulers: single-machine schedulers and a distributed scheduler.

## Single Machine Schedulers¶

Single machine schedulers provide basic features on a local process or thread pool and require no setup (only use the Python standard library). The different single machine schedulers Dask provides are:

• 'threads': The threaded scheduler executes computations with a local multiprocessing.pool.ThreadPool. The threaded scheduler is the default choice for Dask arrays, Dask DataFrames, and Dask delayed.

• 'processes': The multiprocessing scheduler executes computations with a local multiprocessing.Pool.

• 'single-threaded': The single-threaded synchronous scheduler executes all computations in the local thread, with no parallelism at all. This is particularly valuable for debugging and profiling, which are more difficult when using threads or processes.

You can configure which scheduler is used in a few different ways. You can set the scheduler globally by using the dask.config.set(scheduler=) command

In [ ]:
import dask

dask.config.set(scheduler='threads')
total.compute();  # Will use the multi-threading scheduler


or use it as a context manager to set the scheduler for a block of code

In [ ]:
with dask.config.set(scheduler='processes'):
    total.compute()  # Will use the multi-processing scheduler


or even within a single compute call

In [ ]:
total.compute(scheduler='threads');  # Will use the multi-threading scheduler


The num_workers argument is used to specify the number of threads or processes to use

In [ ]:
total.compute(scheduler='threads', num_workers=4);
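To illustrate that different schedulers consume the same graph and return the same result, here is a small self-contained sketch. The graph (a sum of incremented integers) is an assumption made for this example; only the 'threads' and 'single-threaded' schedulers are used so that it runs anywhere.

```python
import dask
from dask import delayed

def inc(x):
    return x + 1

# Lazy graph: sum(inc(0), inc(1), ..., inc(9))
total = delayed(sum)([delayed(inc)(x) for x in range(10)])

# Scheduler chosen per compute call
results = {
    name: total.compute(scheduler=name)
    for name in ("threads", "single-threaded")
}

# Scheduler chosen for a block of code
with dask.config.set(scheduler="single-threaded"):
    block_result = total.compute()

print(results, block_result)
```

The single-threaded scheduler is a convenient choice when stepping through a failing task with a debugger.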


## Distributed Scheduler¶

Despite having "distributed" in its name, the distributed scheduler works well on both single and multiple machines. Think of it as the "advanced scheduler".

The Dask distributed cluster is composed of a single centralized scheduler and one or more worker processes. A Client object is used as the user-facing entry point to interact with the cluster. Deploying a remote Dask cluster involves some additional setup. There are several projects for easily deploying a Dask cluster on commonly used computing resources:

• Dask-Jobqueue for deploying Dask on job queuing systems (e.g. PBS, Slurm, etc.)

Setting up the distributed scheduler locally just involves creating a Client object, which lets you interact with the "cluster" (local threads or processes on your machine).

In [ ]:
from dask.distributed import Client

In [ ]:
client = Client(threads_per_worker=4, n_workers=1)
client


Note: when we create a distributed scheduler Client, by default it registers itself as the default Dask scheduler. All .compute() calls will automatically start using the distributed scheduler unless otherwise told to use a different scheduler.

The distributed scheduler has many features. See the Dask distributed documentation for full details about all the distributed scheduler features.

For this talk, I'd like to highlight two of the distributed scheduler features: real time diagnostics and the futures interface.

In [ ]:
x = da.ones((20_000, 20_000), chunks=(400, 400))
result = (x + x.T).sum(axis=0).mean()

In [ ]:
result.compute()


### Futures interface¶

The Dask distributed scheduler implements a superset of Python's concurrent.futures interface that allows for finer control and asynchronous computation.

In [ ]:
import time
import random

def inc(x):
    time.sleep(random.random())
    return x + 1

def double(x):
    time.sleep(random.random())
    return 2 * x

def add(x, y):
    time.sleep(random.random())
    return x + y


We can run these functions locally

In [ ]:
inc(1)


Or we can submit them to run remotely on a Dask worker node

In [ ]:
future = client.submit(inc, 1)
future


The submit function sends a function and arguments to the distributed scheduler for processing. It returns a Future object that refers to remote data on the cluster. The Future is returned immediately while the computation runs remotely in the background. There is no blocking of the local Python session.

If you wait a moment, and then check on the future again, you'll see that it has finished.

In [ ]:
future


Once the computation for a Future is complete, you can retrieve the result using the .result() method

In [ ]:
future.result()
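For comparison, the same submit / result pattern exists in Python's standard-library concurrent.futures module, which is the interface the Dask futures API extends. The sketch below uses a local thread pool rather than a Dask cluster, so it only illustrates the shape of the API.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def inc(x):
    time.sleep(0.1)
    return x + 1

with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(inc, 1)   # returns immediately with a Future
    value = future.result()        # blocks until the result is ready
    finished = future.done()

print(value, finished)
```

One difference worth noting: with the Dask client, a future can be passed directly as an argument to another submit call, whereas the standard-library executor expects concrete values.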


Much like the delayed interface, we can submit tasks based on other futures. This will create a dependency between the inputs and outputs. Dask will track the execution of all tasks and ensure that downstream tasks are run at the proper time and place and with the proper data.

In [ ]:
x = client.submit(inc, 1)
y = client.submit(double, 2)
z = client.submit(add, x, y)
z

In [ ]:
z.result()


As an example, we can submit many tasks that depend on each other in a for-loop

In [ ]:
%%time

zs = []

for i in range(64):
    x = client.submit(inc, i)     # x = inc(i)
    y = client.submit(double, x)  # y = double(x)
    z = client.submit(add, x, y)  # z = add(x, y)
    zs.append(z)

total = client.submit(sum, zs)

In [ ]:
total

In [ ]:
total.result()


#### Custom computation: Tree summation¶

As an example of a non-trivial algorithm, consider the classic tree reduction. We accomplish this with a nested for-loop and a bit of normal Python logic.

finish           total             single output
    ^          /        \
    |        c1          c2        neighbors merge
    |       /  \        /  \
    |     b1    b2    b3    b4     neighbors merge
    ^    / \   / \   / \   / \
start   a1 a2 a3 a4 a5 a6 a7 a8    many inputs
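The reduction pictured above can be sketched in plain Python, without a cluster, to show the nested-loop logic on its own. `tree_sum` is a hypothetical helper written for this illustration; carrying an unpaired element up to the next level is an added assumption so that input lengths that are not powers of two also work.

```python
def add(x, y):
    return x + y

def tree_sum(values):
    """Pairwise (tree) reduction of a non-empty sequence; returns the total and every level."""
    level = list(values)
    levels = [level]
    while len(level) > 1:
        merged = []
        for i in range(0, len(level) - 1, 2):
            merged.append(add(level[i], level[i + 1]))  # merge neighbors
        if len(level) % 2:
            merged.append(level[-1])  # unpaired element moves up unchanged
        level = merged
        levels.append(level)
    return level[0], levels

total, levels = tree_sum([1, 2, 3, 4, 5, 6, 7, 8])
for row in levels:
    print(row)
```

All merges within one level are independent of each other, which is what makes this pattern a good fit for parallel execution.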

We can easily scale our distributed cluster up or down depending on our needs

In [ ]:
client.cluster.scale(3)  # ask for three workers with 4 threads each

In [ ]:
L = zs
while len(L) > 1:
    new_L = []
    for i in range(0, len(L), 2):
        future = client.submit(add, L[i], L[i + 1])  # add neighbors
        new_L.append(future)
    L = new_L                                   # swap old list for new

In [ ]:
del future, L, new_L, total


#### Building a computation dynamically¶

We can even dynamically submit tasks based on the output of other tasks. This gives more flexibility in situations where the computations may evolve over time.

For this, we can use operations like as_completed(), which returns futures in the order in which they complete.

In [ ]:
from dask.distributed import as_completed

zs = client.map(inc, zs)
seq = as_completed(zs)

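while seq.count() > 1:  # at least two futures left
    a = next(seq)
    b = next(seq)
    new = client.submit(add, a, b)  # add the two finished results together
    seq.add(new)                    # put the new future back into the loop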
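The "results arrive in completion order" behavior can also be seen with the standard library's concurrent.futures.as_completed, shown here on a local thread pool. This is only an analogy: the standard-library version does not offer the count() and add() methods that the Dask as_completed object provides. The `slow_inc` helper and its delays are made up for the sketch.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def slow_inc(x, delay):
    time.sleep(delay)
    return x + 1

jobs = [(1, 0.3), (2, 0.1), (3, 0.2)]  # (value, seconds to sleep)

with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(slow_inc, x, delay) for x, delay in jobs]
    # Iterate over futures as they finish, not in submission order
    completed = [future.result() for future in as_completed(futures)]

print(completed)
```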


This was a brief demo of the distributed scheduler. It has lots of other cool features not touched on here. For more information, check out the Distributed documentation.

# Scalable Machine Learning with Dask-ML¶

Dask-ML is a Python library for scalable machine learning. It is built on top of Dask collections and supports the scikit-learn API.

Three different approaches are taken to scaling modern machine learning algorithms:

• Parallelize scikit-learn directly

• Reimplement scalable algorithms with Dask arrays

• Partner with other distributed libraries (like XGBoost and TensorFlow)

In [ ]:
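from dask_ml.datasets import make_classification
from dask_ml.model_selection import train_test_split
from dask_ml.linear_model import LogisticRegression
from dask_ml.metrics import accuracy_score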

In [ ]:
X, y = make_classification(n_samples=1000,
                           n_features=2,
                           n_classes=2,
                           random_state=2,
                           chunks=10)

In [ ]:
X

In [ ]:
X_train, X_test, y_train, y_test = train_test_split(X, y,
                                                    test_size=0.3,
                                                    random_state=2)

In [ ]:
clf = LogisticRegression(max_iter=2)
clf.fit(X_train, y_train)

In [ ]:
y_pred = clf.predict(X_test)
accuracy_score(y_test, y_pred)