This example demonstrates how Dask can scale scikit-learn to a cluster of machines for a CPU-bound problem. We'll fit a large model, a grid-search over many hyper-parameters, on a small dataset.
This video demonstrates the same example on a larger cluster.
from IPython.display import HTML
HTML("""<iframe width="560" height="315" src="https://www.youtube.com/embed/5Zf6DQaf7jk" frameborder="0" allow="autoplay; encrypted-media" allowfullscreen> </iframe>""")
from dask.distributed import Client, progress
client = Client(processes=False, threads_per_worker=4, n_workers=1, memory_limit='2GB')
client
Scikit-learn uses joblib for single-machine parallelism. This lets you train most estimators (anything that accepts an n_jobs
parameter) using all the cores of your laptop or workstation.
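For instance, a hypothetical ensemble (not part of this example's pipeline) can train on all local cores just by setting n_jobs:

from sklearn.ensemble import RandomForestClassifier

# n_jobs=-1 tells joblib to use every core available on this machine
clf = RandomForestClassifier(n_estimators=100, n_jobs=-1)
# Calling clf.fit(X, y) would then train the individual trees in parallel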
Dask registers a joblib backend. This lets you train those estimators using all the cores of your cluster, by changing one line of code.
This is most useful for training large models on medium-sized datasets. You may have a large model when searching over many hyper-parameters, or when using an ensemble method with many individual estimators. If the dataset is too small, training times will typically be short enough that cluster-wide parallelism isn't helpful. If the dataset is too large to fit in a single machine's memory, the scikit-learn estimators may not be able to cope (though Dask-ML provides other ways of working with larger-than-memory datasets).
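Assuming the backend is registered (via the import in the next cell, or automatically when distributed is installed with recent versions of joblib), a minimal sketch of that one-line change looks like this, using a small throwaway dataset rather than this example's data; the real usage appears at the end of this notebook:

import joblib
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

X, y = make_classification(n_samples=1000, random_state=0)
est = RandomForestClassifier(n_estimators=100, n_jobs=-1)

# Default: joblib parallelizes across the cores of this machine
est.fit(X, y)

# The one changed line: wrap the fit in a Dask parallel_backend context
with joblib.parallel_backend('dask'):
    est.fit(X, y)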
import dask_ml.joblib  # register the distributed backend
from pprint import pprint
from time import time
import logging
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
# Scale Up: set categories=None to use all the categories
categories = [
    'alt.atheism',
    'talk.religion.misc',
]
print("Loading 20 newsgroups dataset for categories:")
print(categories)
data = fetch_20newsgroups(subset='train', categories=categories)
print("%d documents" % len(data.filenames))
print("%d categories" % len(data.target_names))
print()
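fetch_20newsgroups returns a Bunch whose .data attribute is a list of raw text posts and whose .target holds the integer category labels. A quick peek at the first document shows what the vectorizer will consume:

print(data.data[0][:200])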
We'll define a small pipeline that combines text feature extraction with a simple classifier.
pipeline = Pipeline([
    ('vect', CountVectorizer()),
    ('tfidf', TfidfTransformer()),
    ('clf', SGDClassifier(max_iter=1000)),
])
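GridSearchCV refers to a pipeline's hyper-parameters using the step__parameter naming convention seen in the grid below ('vect__max_df' means the max_df parameter of the 'vect' step). As a quick check, you can list every available name:

# Every tunable parameter, e.g. 'vect__max_df' or 'clf__alpha'
sorted(pipeline.get_params().keys())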
Next we define the grid of parameters to search over. Uncommenting more entries would make the search larger.
parameters = {
    'vect__max_df': (0.5, 0.75, 1.0),
    # 'vect__max_features': (None, 5000, 10000, 50000),
    'vect__ngram_range': ((1, 1), (1, 2)),  # unigrams or bigrams
    # 'tfidf__use_idf': (True, False),
    # 'tfidf__norm': ('l1', 'l2'),
    # 'clf__alpha': (0.00001, 0.000001),
    # 'clf__penalty': ('l2', 'elasticnet'),
    # 'clf__max_iter': (10, 50, 80),
}
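With 3 values for vect__max_df and 2 ngram ranges, this grid has 3 × 2 = 6 candidates; at 3 folds of cross-validation each, that's 18 fits in total. Uncommenting more entries grows the search multiplicatively.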
grid_search = GridSearchCV(pipeline, parameters, n_jobs=-1, verbose=1, cv=3, refit=False)
To fit this normally, we would write
grid_search.fit(data.data, data.target)
That would use the default joblib backend (multiple processes) for parallelism.
To use the Dask distributed backend, which will use a cluster of machines to train the model, perform the fit in a parallel_backend
context.
import joblib

with joblib.parallel_backend('dask', scatter=[data.data, data.target]):
    grid_search.fit(data.data, data.target)
If you had your distributed dashboard open during that fit, you would have noticed that each worker performed a share of the fit tasks.
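Since we passed refit=False, there is no refitted best_estimator_ to predict with, but the search results themselves are still available. For example, assuming the fit above completed:

print("Best score: %0.3f" % grid_search.best_score_)
print("Best parameters:")
pprint(grid_search.best_params_)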