Dask provides high-level Array, Bag, and DataFrame collections that mimic NumPy, lists, and Pandas but can operate in parallel on datasets that don’t fit into main memory. Dask’s high-level collections are alternatives to NumPy and Pandas for large datasets.
Dask is useful when the problem size is close to the limits of RAM but still fits on disk.
This notebook is mainly based on these three sources.
import gc
import os
import time
import warnings
import numpy as np
import pandas as pd
import psutil
from dask import delayed
warnings.filterwarnings("ignore")
Let's write a small function that tracks the memory used by the current Python process.
def memory_footprint():
    """Return the resident memory (RSS) of the current Python process in MB."""
    mem = psutil.Process(os.getpid()).memory_info().rss
    return mem / 1024 ** 2
before = memory_footprint()
print(f"Memory used before is {round(before,2)} MB")
N = (1024 ** 2) // 8  # number of float64 values (8 bytes each) in 1 MB
x = np.random.randn(50 * N)
after = memory_footprint()
print(f"Memory used after is {round(after,2)} MB")
Computing x ** 2 allocates extra memory for the result, even though we don't bind it to a variable (in Jupyter, the result is also kept in the output history).
x ** 2
after1 = memory_footprint()
print(f"Memory used after computation is {round(after1, 2)} MB")
Dask Array implements a subset of the NumPy ndarray interface using blocked algorithms, cutting up the large array into many small arrays. This lets us compute on arrays larger than memory using all of our cores. We coordinate these blocked algorithms using Dask graphs (see the Dask Array documentation).
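To make the idea of blocked algorithms concrete, here is a small sketch (not part of the original notebook) on a tiny array so the blocks are easy to see: da.arange splits ten numbers into blocks of four, map_blocks reduces each block on its own, and the partial sums are then combined. The helper block_sum is hypothetical, written only for this illustration.

```python
import dask.array as da
import numpy as np


def block_sum(block):
    # Hypothetical helper: reduce one block to a 1-element array
    return np.array([block.sum()])


# Ten numbers split into blocks of at most 4 elements
arr = da.arange(10, chunks=4)
print(arr.chunks)     # ((4, 4, 2),)
print(arr.numblocks)  # (3,)

# Step 1: each block is reduced independently (these tasks could run in parallel)
partial_sums = arr.map_blocks(block_sum, chunks=(1,), dtype=arr.dtype)
print(partial_sums.compute().tolist())  # [6, 22, 17]

# Step 2: combine the partial results into the final answer
total = partial_sums.sum().compute()
print(total)  # 45

# Dask's built-in reduction builds a similar blocked graph on its own
assert arr.sum().compute() == total
```

The same two-step pattern (reduce each block, then combine) is what lets Dask process arrays whose blocks are never all in memory at once.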
import dask.array as da
y = da.from_array(x, chunks=len(x) // 4)
print("Dask arrays require little memory:", memory_footprint() - after1)
t_start = time.time()
x.mean()
t_end = time.time()
print("Compute the mean value of this NumPy array \n")
print(
    "Elapsed time to compute the mean of the NumPy array (ms):",
    round((t_end - t_start) * 1000),
)
t_start = time.time()
y.mean().compute()
t_end = time.time()
print("Compute the same with dask \n")
print(
    "Elapsed time to compute the mean of the dask array (ms):",
    round((t_end - t_start) * 1000),
)
In practice you would not do this: if your NumPy array is already in memory, partitioning it usually only adds overhead. But if you need to process data stored on disk, such as HDF5, NetCDF, or a large number of NumPy files, Dask can be extremely useful.
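A minimal sketch of the on-disk case, assuming the data lives in a single .npy file (the file name and size here are made up for illustration): np.load with mmap_mode="r" memory-maps the file so nothing is read up front, and da.from_array wraps it in chunks that Dask reads and reduces one block at a time.

```python
import os
import tempfile

import dask.array as da
import numpy as np

# Write an example array to disk; it stands in for a large on-disk dataset
path = os.path.join(tempfile.mkdtemp(), "big_array.npy")
np.save(path, np.arange(1_000_000, dtype="float64"))

# Memory-map the file: values are read from disk only when a chunk needs them
on_disk = np.load(path, mmap_mode="r")

# Wrap it in a Dask array of 10 chunks with 100_000 elements each
arr = da.from_array(on_disk, chunks=100_000)
print(arr.numblocks)  # (10,)

# Each chunk is read and reduced separately, then the partial results are combined
mean_value = arr.mean().compute()
print(mean_value)  # 499999.5
```

For HDF5 or NetCDF the pattern is the same: any object that supports NumPy-style slicing (for example an h5py dataset) can be passed to da.from_array.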
But Dask can also be useful for small data through delayed computation, which makes it easy to parallelize a computation. Let's look at an example with our previous NumPy array.
def f(z):
    return np.sqrt(z + 4)


def g(y):
    return y - 3


def h(x):
    return x ** 2
x = np.random.randn(50 * N)
# the data is generated outside the timed region, so both timings measure the same work
time_start = time.time()
y = h(x)
z = g(x)
w = f(z + y)
time_end = time.time()
print(
    "Elapsed time to compute the functions on the NumPy array (ms):",
    round((time_end - time_start) * 1000),
)
y = delayed(h)(x)
z = delayed(g)(x)
w = delayed(f)(z + y)
print("Now we have a dask Delayed object:", w)
time_start = time.time()
w.compute()
time_end = time.time()
print(
    "Elapsed time to compute the same functions with dask delayed (ms):",
    round((time_end - time_start) * 1000),
)
The computation time can decrease because independent tasks in the graph, such as h(x) and g(x), can run in parallel. Now let's do the same with the second way of creating delayed functions, the @delayed decorator:
@delayed
def f(z):
    return np.sqrt(z + 4)


@delayed
def g(y):
    return y - 3


@delayed
def h(x):
    return x ** 2
y = h(x)
z = g(x)
w = f(z + y)
w.visualize()
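The speedup from delayed comes from tasks that don't depend on each other. Here is a small sketch with hypothetical functions slow_square and add_all, where time.sleep stands in for real work: the four squares are independent, so the threaded scheduler can run them concurrently, and only add_all has to wait for all of them.

```python
import time

import dask
from dask import delayed


@delayed
def slow_square(v):
    # Hypothetical expensive task; sleep releases the GIL, so threads can overlap
    time.sleep(0.5)
    return v * v


@delayed
def add_all(values):
    # Aggregation task that depends on all the squares
    return sum(values)


# Four independent tasks feeding one aggregation task
squares = [slow_square(v) for v in range(4)]
result = add_all(squares)

t0 = time.time()
# Threaded scheduler with 4 workers: the four sleeps can run at the same time
(value,) = dask.compute(result, scheduler="threads", num_workers=4)
elapsed = time.time() - t0

# value is 14; elapsed should be roughly 0.5 s rather than ~2 s for a sequential run
print(value, round(elapsed, 1))
```

Running the same graph with scheduler="synchronous" executes the tasks one after another, which is handy for debugging.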
Dask DataFrames coordinate many Pandas DataFrames/Series arranged along the index. A Dask DataFrame is partitioned row-wise, grouping rows by index value for efficiency. These Pandas objects may live on disk or on other machines. See the documentation: http://docs.dask.org/en/latest/dataframe.html
Here we use the file athlete_events.csv from this Kaggle Dataset.
import dask.dataframe as dd
print("Let's return to the start of our ML journey\n")
print("Load the Olympic dataset \n")
PATH = "../../data/athlete_events.csv"
df = pd.read_csv(PATH)
df.head()
m1 = memory_footprint()
dask_df = dd.read_csv(PATH)
m2 = memory_footprint()
print("Dask allocates almost no memory on creation (MB):", m2 - m1)
print("But we can look at the data as in a pandas DataFrame:")
dask_df.head()
# building delayed computation
print(
    "We can do many operations in the same way as in pandas, but without loading all the data into memory \n"
)
sex_distr = (
    dask_df.loc[dask_df["Games"].str.contains("1996")].groupby("Sex")["Age"].min()
)
print(
    "Here we did the selection and aggregation exactly the same way as in pandas \n"
)
print("But nothing has been computed yet; we have only built a dask structure \n")
sex_distr
%%time
print(
    "The computation takes time, but remember that we don't need to load all the data into memory for it \n"
)
print(sex_distr.compute())
%%time
print("Pandas is, of course, faster here \n")
print(df.loc[df["Games"].str.contains("1996")].groupby("Sex")["Age"].min())
PATH_TO_DATA = "../../data/capstone_user_identification"
print("We can load all files into a single dataframe \n")
print("You don't need this in the Alice project, it's just an example \n ")
user10dask = dd.read_csv(os.path.join(PATH_TO_DATA, "10users/*.csv"))
print("We can look at the data")
print(user10dask)
user10dask.tail()
print(
    "Let's see what happens if we want to count all sites (this can be seen as one more way to build a site dictionary) \n"
)
count_sites = user10dask.groupby("site")["site"].count()
print("If we visualize this structure, we'll see the computation graph \n")
count_sites.visualize()
%%time
count_sites.compute().sort_values(ascending=False)[:20]
Dask Bag implements operations like map, filter, fold, and groupby on collections of Python objects. It does this in parallel with a small memory footprint using Python iterators. It is similar to a parallel version of PyToolz or a Pythonic version of the PySpark RDD (see the Dask Bag documentation).
Dask bags are often used to parallelize simple computations on unstructured or semi-structured data like text data, log files, JSON records, or user defined Python objects.
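A self-contained sketch of a typical bag pipeline on semi-structured data. The three JSON records below are invented stand-ins for lines of a file like train.json; in the notebook they come from db.read_text rather than db.from_sequence. The threaded scheduler is passed explicitly because bags default to a process-based scheduler, which needs extra care when run as a plain script.

```python
import json

import dask.bag as db

# Invented JSON lines standing in for records of a file like train.json
lines = [
    json.dumps({"title": "Intro to Dask", "domain": "medium.com"}),
    json.dumps({"title": "Pandas tips", "domain": "towardsdatascience.com"}),
    json.dumps({"title": "Dask bags", "domain": "medium.com"}),
]

# Two partitions, each parsed lazily with json.loads
records = db.from_sequence(lines, npartitions=2).map(json.loads)

# filter + pluck: keep records whose title mentions Dask, then take the title field
dask_titles = (
    records.filter(lambda r: "Dask" in r["title"])
    .pluck("title")
    .compute(scheduler="threads")
)

# frequencies counts each distinct value across all partitions
domain_counts = dict(
    records.pluck("domain").frequencies().compute(scheduler="threads")
)

print(dask_titles)    # ['Intro to Dask', 'Dask bags']
print(domain_counts)  # medium.com: 2, towardsdatascience.com: 1
```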
Let's look at an example with our Medium data from this competition.
import json
import dask.bag as db
print("Path to our medium data \n")
PATH = "../../data/kaggle_medium"
print(PATH)
print("Wrap the train JSON file in a dask bag \n")
items = db.read_text(os.path.join(PATH, "train.json"))
items
%%time
print("Let's look at one example \n")
print(items.take(1))
print("We can parse the data with the json library and get dict-like objects \n")
dict_items = items.map(json.loads)
print(type(dict_items))
dict_items.take(1)
print("We can take any key from all records \n")
title_bag = dict_items.pluck("title")
print("The take method returns a tuple of objects \n")
print(title_bag.take(3))
We can write any function for processing the data and apply it with the map method.
def clean_title(text):
    import string

    cut_set = set(string.punctuation)
    cut_set.update(["”", "—", "…", "“", "⌘", "❤", "+", "®", "➜", "¬", "–"])
    # replace every character from cut_set with a space
    text = text.translate(str.maketrans("".join(cut_set), " " * len(cut_set)))
    text = text.lower()
    return text
title_bag = dict_items.pluck("title").map(clean_title)
title_bag.take(3)
Process meta_tags
meta_tags_bag = dict_items.pluck("meta_tags")
test_meta = meta_tags_bag.take(3)
test_meta[1]
def clean_meta_tags(meta):
    author = meta["author"].strip()
    min_reads = int(meta["twitter:data1"].split()[0])
    return {"author": author, "min_reads": min_reads}
meta_tags_bag = meta_tags_bag.map(clean_meta_tags)
meta_tags_bag.take(1)
%%time
# content_bag = dict_items.pluck('content').map(clean_content)
title_bag = dict_items.pluck("title").map(clean_title)
published_bag = dict_items.pluck("published").map(lambda x: x["$date"])
meta_bag = dict_items.pluck("meta_tags").map(clean_meta_tags)
domain_bag = dict_items.pluck("domain")
@delayed
def combine_to_df(list_dict):
    list_df = [pd.DataFrame(dict_) for dict_ in list_dict]
    return pd.concat(list_df, axis=1)
combined = combine_to_df([published_bag, meta_bag, domain_bag])
combined.visualize()
# It takes time, around a minute
from dask.diagnostics import ProgressBar

with ProgressBar():
    df = combined.compute()
df.columns = ["published", "Author", "min_reads", "domain"]
df.head()
print("We can create dask dataframe from pandas \n")
dd_no_content = dd.from_pandas(df, npartitions=4)
dd_no_content
%%time
print("Transform the published column to datetime with pandas \n")
df["published"] = pd.to_datetime(df.published, format="%Y-%m-%dT%H:%M:%S.%fZ")
%%time
print("Do the same with dask; it will be slightly slower than in pandas \n")
dd_no_content["published"] = dd.to_datetime(
    dd_no_content.published, format="%Y-%m-%dT%H:%M:%S.%fZ"
).compute()
dd_no_content.head()
print(
    "We can apply a function written for a pandas DataFrame, with mixed transformations, to a dask dataframe without changes \n"
)


def additional_time_features_df(
    df, to_cat_cols=["Author", "domain", "month", "year", "day_of_week"]
):
    df["month"] = df["published"].apply(lambda ts: ts.month)
    df["year"] = df["published"].apply(lambda ts: ts.year)
    hour = df["published"].apply(lambda ts: ts.hour)
    df["hour"] = hour
    df["morning"] = ((hour >= 7) & (hour <= 11)).astype("float64")
    df["day"] = ((hour >= 12) & (hour <= 18)).astype("int")
    df["evening"] = ((hour >= 19) & (hour <= 23)).astype("int")
    df["night"] = ((hour >= 0) & (hour <= 6)).astype("int")
    df["sin_hour"] = np.sin(2 * np.pi * df["hour"] / 24)
    df["cos_hour"] = np.cos(2 * np.pi * df["hour"] / 24)
    df = df.drop(["hour"], axis=1)
    day_of_week = df["published"].dt.dayofweek.astype("int")
    df["day_of_week"] = day_of_week
    df["weekend"] = (day_of_week >= 5).astype("int")
    # turn to categorical
    df[to_cat_cols] = df[to_cat_cols].astype("category")
    return df
%%time
df_medium_train = additional_time_features_df(df.copy())
dd_medium_train = additional_time_features_df(dd_no_content)
%%time
dd_medium_train.compute()
Dask-ML provides scalable machine learning algorithms in Python that are compatible with scikit-learn. Let us first understand how scikit-learn handles the computations, and then we will look at how Dask performs these operations differently. See the dask-ml tutorials (Examples from dask ml).
You need to install dask-ml first: pip install dask-ml
There are two main parts in Dask-ML:
- approaches to handle big datasets
- approaches to handle big models
The biggest model in our course was a random forest on text data (the Random Forest assignment). Below I reproduce part of that assignment, but with fewer rows (nrows) and a smaller max_features in CountVectorizer; you can try the original parameters yourself.
Here we use the movie_reviews_train.csv file.
# Load data
df = pd.read_csv("../../data/movie_reviews_train.csv", nrows=5000)
# Separate texts and labels
X_text = df["text"]
y_text = df["label"]
# Class counts
df.label.value_counts()
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV, StratifiedKFold
from sklearn.pipeline import Pipeline
# 3-fold stratified cross-validation
skf = StratifiedKFold(n_splits=3, shuffle=True, random_state=17)
# In the Pipeline we vectorize the text and train logistic regression
classifier = Pipeline(
    [
        ("vectorizer", CountVectorizer(max_features=500, ngram_range=(1, 3))),
        ("clf", LogisticRegression(random_state=17)),
    ]
)
%%time
parameters = {"clf__C": (0.1, 1, 10, 100)}
grid_search = GridSearchCV(classifier, parameters, scoring="roc_auc", cv=skf)
grid_search = grid_search.fit(X_text, y_text)
grid_search.best_score_
In this approach, all we need to do is replace the joblib backend with Dask distributed: we initialize a distributed client and change the backend.
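The backend swap can be seen in isolation with a toy function (square is hypothetical, just for illustration). Client(processes=False) starts a local in-process cluster, and inside the parallel_backend("dask") block joblib.Parallel calls should be sent to that cluster, which is how the GridSearchCV fit below gets distributed.

```python
import joblib
from dask.distributed import Client

# Local in-process cluster (threads only), enough for a quick demo
client = Client(processes=False)


def square(v):
    # Hypothetical toy task
    return v * v


# Inside this block, joblib.Parallel sends its tasks to the Dask cluster
with joblib.parallel_backend("dask"):
    results = joblib.Parallel()(joblib.delayed(square)(i) for i in range(5))

client.close()
print(results)  # [0, 1, 4, 9, 16]
```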
from dask.distributed import Client
%%time
import joblib  # sklearn.externals.joblib was removed in scikit-learn 0.23

client = Client()
parameters = {"clf__C": (0.1, 1, 10, 100)}
grid_search = GridSearchCV(classifier, parameters, scoring="roc_auc", cv=skf)
t_start = time.time()
with joblib.parallel_backend("dask"):
    grid_search.fit(X_text, y_text)
t_end = time.time()
print(
    "Elapsed time for grid_search with the Dask joblib backend (s):",
    round(t_end - t_start),
)
grid_search.best_score_
As a counterpart to GridSearchCV in scikit-learn, Dask provides a library called Dask-SearchCV (now included in Dask-ML as dask_ml.model_selection). It merges shared steps so that they are not repeated for every parameter combination. The standalone package needs to be installed separately:
# pip3 install dask-searchcv
import dask_searchcv as dcv
We can use pipelines in Dask grid search, and according to the documentation Dask is most helpful with pipelines that contain many operations that can be parallelized, especially those including FeatureUnion. However, when I tried it, I got an error. In any case, a time-consuming operation such as CountVectorizer can't be parallelized here, so below the text is vectorized once and the Dask grid search is used only for the classifier (see the documentation).
%%time
vect = CountVectorizer(max_features=500, ngram_range=(1, 3))
Xvect = vect.fit_transform(X_text)
lr = LogisticRegression()
parameters = {"C": (0.1, 1, 10, 100)}
t_start = time.time()
grid_search = dcv.GridSearchCV(lr, parameters, scoring="roc_auc", cv=skf)
grid_search.fit(Xvect, y_text)
t_end = time.time()
print(
    f"Elapsed time for grid_search, excluding vectorization (s): {round(t_end - t_start)}"
)
grid_search.best_score_
I tried to see how well Dask performs with a random forest using the original parameters, but sometimes this raises the error "OSError: [Errno 24] Too many open files" after execution, and I couldn't fix it. Sometimes it works fine, and for small data it works in most cases, but if you re-run this notebook several times, there is a good chance of getting this error. So I believe dask-ml is very useful, but for now I definitely don't know how to use it properly.
from sklearn.ensemble import RandomForestClassifier
rf = RandomForestClassifier(random_state=17)
min_samples_leaf = [1, 2, 3]
max_features = [0.3, 0.5, 0.7]
max_depth = [None]
parameters = {
    "max_features": max_features,
    "min_samples_leaf": min_samples_leaf,
    "max_depth": max_depth,
}
grid_search = dcv.GridSearchCV(rf, parameters, scoring="roc_auc", cv=skf)
t_start = time.time()
grid_search.fit(Xvect, y_text)
t_end = time.time()
print(
    f"Elapsed time for dask grid_search for Random Forest (s): {round(t_end - t_start)}"
)
A number of models have been rewritten in Dask; they accept Dask objects (huge arrays) and fit models on them. You can read more in the Dask-ML documentation. Below is an example with KMeans, but there are also Dask versions of linear models and preprocessing functions. The API is very similar to scikit-learn's, so it should be easy to use.
from dask_ml import datasets
from dask_ml.cluster import KMeans
X, y = datasets.make_blobs(
    n_samples=10000000, chunks=1000000, random_state=0, centers=3
)
# persist() computes the chunks and keeps them in (cluster) memory; X stays a dask array
X = X.persist()
X
km = KMeans(n_clusters=3, init_max_iter=2, oversampling_factor=10)
km.fit(X)
I read an article about Dask a couple of days ago and decided that writing this tutorial would be a good way to get acquainted with the library. So please don't be too strict if I misunderstood something :))