#!/usr/bin/env python
# coding: utf-8

# # Distributed Scikit-Learn for CPU Bound Problems
#
# This example demonstrates how Dask can scale scikit-learn to a cluster of machines for a CPU-bound problem.
# We'll fit a large model, a grid-search over many hyper-parameters, on a small dataset.
#
# This video demonstrates the same example on a larger cluster.

# In[ ]:


from IPython.display import HTML
HTML("""""")


# In[ ]:


from dask.distributed import Client, progress
client = Client(processes=False, threads_per_worker=4,
                n_workers=1, memory_limit='2GB')
client


# ## Distributed Training
#
# Scikit-learn uses [joblib](http://joblib.readthedocs.io/) for single-machine parallelism. This lets you train most estimators (anything that accepts an `n_jobs` parameter) using all the cores of your laptop or workstation.
#
# Dask registers a joblib backend. This lets you train those estimators using all the cores of your *cluster*, by changing one line of code.
#
# This is most useful for training large models on medium-sized datasets. You may have a large model when searching over many hyper-parameters, or when using an ensemble method with many individual estimators. For small datasets, training times are typically short enough that cluster-wide parallelism isn't helpful. For datasets larger than a single machine's memory, the scikit-learn estimators may not be able to cope (though Dask-ML provides other ways of working with larger-than-memory datasets).

# In[ ]:


import dask_ml.joblib  # register the distributed joblib backend

from pprint import pprint
from time import time
import logging

from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline


# In[ ]:


# Scale Up: set categories=None to use all the categories
categories = [
    'alt.atheism',
    'talk.religion.misc',
]

print("Loading 20 newsgroups dataset for categories:")
print(categories)

data = fetch_20newsgroups(subset='train', categories=categories)
print("%d documents" % len(data.filenames))
print("%d categories" % len(data.target_names))
print()


# We'll define a small pipeline that combines text feature extraction with a simple classifier.

# In[ ]:


pipeline = Pipeline([
    ('vect', CountVectorizer()),
    ('tfidf', TfidfTransformer()),
    ('clf', SGDClassifier(max_iter=1000)),
])


# Grid search over some parameters.

# In[ ]:


parameters = {
    'vect__max_df': (0.5, 0.75, 1.0),
    # 'vect__max_features': (None, 5000, 10000, 50000),
    'vect__ngram_range': ((1, 1), (1, 2)),  # unigrams or bigrams
    # 'tfidf__use_idf': (True, False),
    # 'tfidf__norm': ('l1', 'l2'),
    # 'clf__alpha': (0.00001, 0.000001),
    # 'clf__penalty': ('l2', 'elasticnet'),
    # 'clf__n_iter': (10, 50, 80),
}


# In[ ]:


grid_search = GridSearchCV(pipeline, parameters, n_jobs=-1, verbose=1,
                           cv=3, refit=False, iid=False)


# To fit this normally, we would write
#
# ```python
# grid_search.fit(data.data, data.target)
# ```
#
# That would use the default joblib backend (multiple processes) for parallelism.
# To use the Dask distributed backend, which will use a cluster of machines to train the model, perform the fit in a `parallel_backend` context.
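# The next two cells do exactly that. They import joblib via `sklearn.externals`, which
# matched the scikit-learn releases this example was written against; newer scikit-learn
# versions have removed that vendored copy, and recent joblib releases register the
# 'dask' backend on demand (making the `dask_ml.joblib` import above unnecessary there).
# As a rough sketch only, the equivalent on a current setup with standalone `joblib` and
# `distributed` installed would be:
#
# ```python
# import joblib
#
# with joblib.parallel_backend('dask', scatter=[data.data, data.target]):
#     grid_search.fit(data.data, data.target)
# ```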
# In[ ]:


from sklearn.externals import joblib


# In[ ]:


with joblib.parallel_backend('dask', scatter=[data.data, data.target]):
    grid_search.fit(data.data, data.target)


# If you watch the distributed dashboard during the fit, you'll notice that each worker performs some of the fit tasks.
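# The search results are plain scikit-learn objects, so inspecting them involves nothing
# Dask-specific. A minimal sketch: `cv_results_` is always populated after the fit, and
# because we passed `refit=False` there is no refitted `best_estimator_`, so we pull the
# best-scoring entry out of `cv_results_` directly.

# In[ ]:


import numpy as np

# Find the parameter combination with the highest mean cross-validated test score.
mean_scores = grid_search.cv_results_['mean_test_score']
best = int(np.argmax(mean_scores))
print("Best cross-validated score: %0.3f" % mean_scores[best])
print("Best parameters:")
pprint(grid_search.cv_results_['params'][best])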