#!/usr/bin/env python
# coding: utf-8
# # Distributed Scikit-Learn for CPU Bound Problems
#
# This example demonstrates how Dask can scale scikit-learn to a cluster of machines for a CPU-bound problem.
# We'll fit a large model, a grid-search over many hyper-parameters, on a small dataset.
#
# This video demonstrates the same example on a larger cluster.
# In[ ]:
# Embed the demo video. The original iframe markup was stripped during the
# notebook-to-script conversion, so this renders an empty HTML fragment.
from IPython.display import HTML

HTML("")
# In[ ]:
# Start a local Dask scheduler and worker inside this process so the example
# runs without an external cluster; swap in Client("<scheduler-address>")
# to target a real distributed cluster.
from dask.distributed import Client, progress

client = Client(
    processes=False,      # run scheduler/worker in-process (threads only)
    threads_per_worker=4,
    n_workers=1,
    memory_limit='2GB',
)
client  # in a notebook, displaying the client shows the dashboard link
# ## Distributed Training
#
#
#
# Scikit-learn uses [joblib](http://joblib.readthedocs.io/) for single-machine parallelism. This lets you train most estimators (anything that accepts an `n_jobs` parameter) using all the cores of your laptop or workstation.
#
# Dask registers a joblib backend. This lets you train those estimators using all the cores of your *cluster*, by changing one line of code.
#
# This is most useful for training large models on medium-sized datasets. You may have a large model when searching over many hyper-parameters, or when using an ensemble method with many individual estimators. For too small datasets, training times will typically be small enough that cluster-wide parallelism isn't helpful. For too large datasets (larger than a single machine's memory), the scikit-learn estimators may not be able to cope (though Dask-ML provides other ways for working with larger than memory datasets).
# In[ ]:
# Importing dask_ml.joblib registers the "dask" joblib backend used below.
# NOTE(review): this registration import is deprecated in newer versions of
# dask-ml (importing dask.distributed registers the backend itself) --
# confirm against the installed dask-ml version.
import dask_ml.joblib # register the distributed backend
from pprint import pprint
from time import time
import logging
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
# In[ ]:
# Scale Up: set categories=None to use all the categories
categories = ['alt.atheism', 'talk.religion.misc']

print("Loading 20 newsgroups dataset for categories:")
print(categories)

# Download (or load from the local scikit-learn cache) the training split,
# restricted to the two selected newsgroups.
data = fetch_20newsgroups(subset='train', categories=categories)

print("%d documents" % len(data.filenames))
print("%d categories" % len(data.target_names))
print()
# We'll define a small pipeline that combines text feature extraction with a simple classifier.
# In[ ]:
# Text-classification pipeline: raw documents -> token counts -> tf-idf
# weights -> linear classifier fit with stochastic gradient descent.
pipeline = Pipeline(
    [
        ('vect', CountVectorizer()),            # bag-of-words counts
        ('tfidf', TfidfTransformer()),          # reweight counts by tf-idf
        ('clf', SGDClassifier(max_iter=1000)),  # linear model via SGD
    ]
)
# Grid search over some parameters.
# In[ ]:
# Hyper-parameter grid for GridSearchCV. Only two knobs are active (3 x 2 = 6
# candidate settings) to keep the demo fast; uncomment entries below to
# scale the search up.
parameters = {
    'vect__max_df': (0.5, 0.75, 1.0),
    'vect__ngram_range': ((1, 1), (1, 2)),  # unigrams or bigrams
    # Extra dimensions for a larger search:
    # 'vect__max_features': (None, 5000, 10000, 50000),
    # 'tfidf__use_idf': (True, False),
    # 'tfidf__norm': ('l1', 'l2'),
    # 'clf__alpha': (0.00001, 0.000001),
    # 'clf__penalty': ('l2', 'elasticnet'),
    # 'clf__n_iter': (10, 50, 80),
}
# In[ ]:
grid_search = GridSearchCV(pipeline, parameters, n_jobs=-1, verbose=1, cv=3, refit=False, iid=False)
# To fit this normally, we would write
#
#
# ```python
# grid_search.fit(data.data, data.target)
# ```
#
# That would use the default joblib backend (multiple processes) for parallelism.
# To use the Dask distributed backend, which will use a cluster of machines to train the model, perform the fit in a `parallel_backend` context.
# In[ ]:
from sklearn.externals import joblib
# In[ ]:
# Run the grid search with joblib's "dask" backend: the cross-validation fit
# tasks are dispatched to the Dask cluster's workers instead of local
# processes. scatter=[...] pre-uploads the dataset to the workers once so it
# is not re-serialized with each of the many fit tasks.
with joblib.parallel_backend('dask', scatter=[data.data, data.target]):
    grid_search.fit(data.data, data.target)
# If you had your distributed dashboard open during that fit, you'll notice that each worker performs some of the fit tasks.