Dask for Machine Learning

This is a high-level overview demonstrating some the components of Dask-ML. Visit the main Dask-ML documentation, see the dask tutorial notebook 08, or explore some of the other machine-learning examples.

In [ ]:
from dask.distributed import Client, progress
client = Client(processes=False, threads_per_worker=4,
                n_workers=1, memory_limit='2GB')
client

Distributed Training

Scikit-learn uses joblib for single-machine parallelism. This lets you train most estimators (anything that accepts an n_jobs parameter) using all the cores of your laptop or workstation.

Alternatively, Scikit-Learn can use Dask for parallelism. This lets you train those estimators using all the cores of your cluster without significantly changing your code.

This is most useful for training large models on medium-sized datasets. You may have a large model when searching over many hyper-parameters, or when using an ensemble method with many individual estimators. For too small datasets, training times will typically be small enough that cluster-wide parallelism isn't helpful. For too large datasets (larger than a single machine's memory), the scikit-learn estimators may not be able to cope (see below).

Create Scikit-Learn Estimator

In [ ]:
from sklearn.datasets import make_classification
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV
import pandas as pd

We'll use scikit-learn to create a pair of small random arrays, one for the features X, and one for the target y.

In [ ]:
X, y = make_classification(n_samples=1000, random_state=0)
X[:5]

We'll fit a Support Vector Classifier, using grid search to find the best value of the $C$ hyperparameter.

In [ ]:
param_grid = {"C": [0.001, 0.01, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0],
              "kernel": ['rbf', 'poly', 'sigmoid'],
              "shrinking": [True, False]}

grid_search = GridSearchCV(SVC(gamma='auto', random_state=0, probability=True),
                           param_grid=param_grid,
                           return_train_score=False,
                           iid=True,
                           cv=3,
                           n_jobs=-1)

To fit that normally, we would call

grid_search.fit(X, y)

To fit it using the cluster, we just need to use a context manager provided by joblib.

In [ ]:
from sklearn.externals import joblib

with joblib.parallel_backend('dask'):
    grid_search.fit(X, y)

We fit 48 different models, one for each hyper-parameter combination in param_grid, distributed across the cluster. At this point, we have a regular scikit-learn model, which can be used for prediction, scoring, etc.

In [ ]:
pd.DataFrame(grid_search.cv_results_).head()
In [ ]:
grid_search.predict(X)[:5]
In [ ]:
grid_search.score(X, y)

For more on training scikit-learn models with distributed joblib, see the dask-ml documentation.

Training on Large Datasets

Most estimators in scikit-learn are designed to work on in-memory arrays. Training with larger datasets may require different algorithms.

All of the algorithms implemented in Dask-ML work well on larger than memory datasets, which you might store in a dask array or dataframe.

In [ ]:
%matplotlib inline
In [ ]:
import dask_ml.datasets
import dask_ml.cluster
import matplotlib.pyplot as plt

In this example, we'll use dask_ml.datasets.make_blobs to generate some random dask arrays.

In [ ]:
X, y = dask_ml.datasets.make_blobs(n_samples=10000000,
                                   chunks=1000000,
                                   random_state=0,
                                   centers=3)
X = X.persist()
X

We'll use the k-means implemented in Dask-ML to cluster the points. It uses the k-means|| (read: "k-means parallel") initialization algorithm, which scales better than k-means++. All of the computation, both during and after initialization, can be done in parallel.

In [ ]:
km = dask_ml.cluster.KMeans(n_clusters=3, init_max_iter=2, oversampling_factor=10)
km.fit(X)

We'll plot a sample of points, colored by the cluster each falls into.

In [ ]:
fig, ax = plt.subplots()
ax.scatter(X[::10000, 0], X[::10000, 1], marker='.', c=km.labels_[::10000],
           cmap='viridis', alpha=0.25);

For all the estimators implemented in Dask-ML, see the API documentation.