Dask for Machine Learning

Dask integrates well with machine learning libraries like scikit-learn.

Dask-ML implements scalable machine learning algorithms that are compatible with scikit-learn.

In [ ]:
from dask_gateway import Gateway

gateway = Gateway()
cluster = gateway.new_cluster()
cluster.scale(10)
cluster
In [ ]:
from dask.distributed import Client, progress
c = Client(cluster)
c

Distributed Training

Scikit-learn uses joblib for single-machine parallelism. This lets you train most estimators (anything that accepts an n_jobs parameter) using all the cores of your laptop or workstation.

Dask registers a joblib backend. This lets you train those estimators using all the cores of your cluster, by changing one line of code.

This is most useful for training large models on medium-sized datasets. You may have a large model when searching over many hyper-parameters, or when using an ensemble method with many individual estimators. For too small datasets, training times will typically be small enough that cluster-wide parallelism isn't helpful. For too large datasets (larger than a single machine's memory), the scikit-learn estimators may not be able to cope (see below).

In [ ]:
from sklearn.datasets import make_classification
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV
import pandas as pd

We'll use scikit-learn to create a pair of small random arrays, one for the features X, and one for the target y.

In [ ]:
X, y = make_classification(n_samples=1000, random_state=0)
X[:5]

We'll fit a Support Vector Classifier, using grid search to find the best combination of hyperparameters.

In [ ]:
param_grid = {"C": [0.001, 0.01, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0],
              "kernel": ['rbf', 'poly', 'sigmoid'],
              "shrinking": [True, False]}

grid_search = GridSearchCV(SVC(gamma='auto', random_state=0, probability=True),
                           param_grid=param_grid,
                           return_train_score=False,
                           n_jobs=-1)

To fit that normally, we'd call

grid_search.fit(X, y)

To fit it using the cluster, we just need to use a context manager provided by joblib. We'll pre-scatter the data to each worker, which can help with performance.

In [ ]:
import joblib

with joblib.parallel_backend('dask', scatter=[X, y]):
    grid_search.fit(X, y)

We fit 48 different models, one for each hyper-parameter combination in param_grid, distributed across the cluster. At this point, we have a regular scikit-learn model, which can be used for prediction, scoring, etc.

In [ ]:
pd.DataFrame(grid_search.cv_results_).head()
In [ ]:
grid_search.predict(X)[:5]
In [ ]:
grid_search.score(X, y)

For more on training scikit-learn models with distributed joblib, see the dask-ml documentation.

Training on Large Datasets

Most estimators in scikit-learn are designed to work on in-memory arrays. Training with larger datasets may require different algorithms.

All of the algorithms implemented in Dask-ML work well on larger than memory datasets, which you might store in a dask array or dataframe.

In [ ]:
%matplotlib inline
In [ ]:
import dask_ml.datasets
import dask_ml.cluster
import matplotlib.pyplot as plt

In this example, we'll use dask_ml.datasets.make_blobs to generate some random dask arrays.

In [ ]:
X, y = dask_ml.datasets.make_blobs(n_samples=10000000,
                                   chunks=1000000,
                                   random_state=0,
                                   centers=3)
X = X.persist()
X

We'll use the k-means implemented in Dask-ML to cluster the points. It uses the k-means|| (read: "k-means parallel") initialization algorithm, which scales better than k-means++. All of the computation, both during and after initialization, can be done in parallel.

In [ ]:
km = dask_ml.cluster.KMeans(n_clusters=3, init_max_iter=2, oversampling_factor=10)
km.fit(X)

We'll plot a sample of points, colored by the cluster each falls into.

In [ ]:
fig, ax = plt.subplots()
ax.scatter(X[::10000, 0], X[::10000, 1], marker='.', c=km.labels_[::10000],
           cmap='viridis', alpha=0.25);

For all the estimators implemented in Dask-ML, see the API documentation.