Scikit-learn supports out-of-core learning (fitting a model on a dataset that doesn't fit in RAM) through its partial_fit
API. See the scikit-learn documentation on strategies for scaling computationally.
The basic idea is that, for certain estimators, learning can be done in batches. The estimator will see a batch, and then incrementally update whatever it's learning (the coefficients, for example).
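As a minimal sketch with toy data (the batch sizes and random labels here are just for illustration), this is what incremental training looks like with scikit-learn's SGDClassifier:

```python
import numpy as np
from sklearn.linear_model import SGDClassifier

rng = np.random.RandomState(0)
clf = SGDClassifier(max_iter=1000, tol=1e-3, random_state=0)

# Feed the estimator one batch at a time; each call incrementally
# updates the coefficients rather than refitting from scratch.
for _ in range(10):
    X_batch = rng.normal(size=(1_000, 20))
    y_batch = (X_batch[:, 0] > 0).astype(int)
    # `classes` tells the estimator the full label set up front,
    # since no single batch is guaranteed to contain every class.
    clf.partial_fit(X_batch, y_batch, classes=[0, 1])

print(clf.coef_.shape)  # one coefficient per feature: (1, 20)
```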
Unfortunately, the partial_fit
API doesn't play that nicely with my favorite part of scikit-learn: pipelines. You would essentially need every step in the pipeline to have an out-of-core partial_fit
version, which isn't really feasible. Setting that aside, it wouldn't be great for a user, since working with generators of datasets is awkward.
Fortunately, we have great data containers for larger-than-memory arrays and dataframes: dask.array
and dask.dataframe
. We can iterate over the blocks of a dask collection and feed them, one at a time, to an estimator's
partial_fit
API. The final piece of the puzzle is a nice little wrapper for these scikit-learn models that can be used in pipelines. I've started this in dask-ml. I'm eager to have additional contributions.
import dask.array as da
import dask.dataframe as dd
from daskml.datasets import make_classification
from daskml.linear_model import BigSGDClassifier
from daskml.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
Let's make an X
and y
for classification. We'll make a bunch of arrays and store them on disk in Parquet files.
import string
n_blocks = 100
Let's generate a fake dataset, replicate it 100 times, and store each block in a parquet file. This simulates a database or central store of a large dataset.
X, y = make_classification(n_samples=1_000_000, chunks=500_000)
for i in range(n_blocks):
    X.to_dask_dataframe(columns=list(string.ascii_letters[:20])).to_parquet(f"X-{i}.parq")
    y.to_dask_dataframe(columns='y').to_parquet(f"y-{i}.parq")
And a utility function to read it in.
def read():
    Xs = []
    ys = []
    for i in range(n_blocks):
        xx = dd.read_parquet(f"X-{i}.parq/")
        yy = dd.read_parquet(f"y-{i}.parq/")
        # recover each partition's length from the dataframe's divisions
        shapes = [j - i for i, j in zip(xx.divisions, xx.divisions[1:])]
        shapes[-1] += 1  # the final division endpoint is inclusive
        x = [da.from_delayed(chunk.values, shape=(shapes[i], 20), dtype='f8')
             for i, chunk in enumerate(xx.to_delayed())]
        y = [da.from_delayed(chunk.values, shape=(shapes[i], 1), dtype='f8')
             for i, chunk in enumerate(yy.to_delayed())]
        Xs.append(da.concatenate(x, axis=0).rechunk((500_000, 20)))
        ys.append(da.concatenate(y, axis=0).rechunk((500_000, 1)))
    return da.concatenate(Xs, axis=0), da.concatenate(ys, axis=0).squeeze()
Now we'll read them into a pair of dask arrays.
X, y = read()
X
dask.array<concatenate, shape=(100000000, 20), dtype=float64, chunksize=(500000, 20)>
y
dask.array<squeeze, shape=(100000000,), dtype=float64, chunksize=(500000,)>
(X.nbytes + y.nbytes) / 10**9
16.8
In total, we'll be fitting the model on about 17 GB of data (100,000,000 rows by 20 columns), all floats. My laptop has 16 GB of RAM, so it'd be impossible to do this in main memory alone.
To demonstrate the idea, we'll fit a small pipeline: a StandardScaler
followed by an SGDClassifier
.
I've implemented a daskml.preprocessing.StandardScaler
using dask
, in about 40 lines of code. This will operate completely in parallel.
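To show why a dask-backed scaler parallelizes so well, here's a hypothetical minimal version (a sketch of the idea, not the actual daskml code): fit computes the per-column statistics with dask, so the reductions run in parallel across chunks, and transform stays lazy.

```python
import dask.array as da

class DaskStandardScaler:
    """Illustrative minimal scaler. `fit` triggers a parallel reduction
    over the chunks; `transform` just builds a lazy blockwise graph."""

    def fit(self, X, y=None):
        # da.compute evaluates both reductions in one pass over the data
        self.mean_, self.std_ = da.compute(X.mean(axis=0), X.std(axis=0))
        return self

    def transform(self, X):
        # elementwise ops on a dask array are lazy and chunk-parallel
        return (X - self.mean_) / self.std_

X = da.random.random((10_000, 20), chunks=(1_000, 20))
scaled = DaskStandardScaler().fit(X).transform(X)
```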
I haven't implemented a custom SGDClassifier
, because that'd be much more than 40 lines of code. Instead, I have a small wrapper that uses scikit-learn's implementation to provide a fit method that operates out-of-core, but not in parallel.
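The wrapper idea boils down to something like this hypothetical sketch (BigSGDClassifier's actual implementation may differ): walk the chunks of the dask arrays in order and hand each materialized block to the underlying estimator's partial_fit.

```python
import dask.array as da
from sklearn.linear_model import SGDClassifier

def fit_out_of_core(clf, X, y, classes):
    # Walk the blocks sequentially; .compute() materializes one chunk
    # at a time, so peak memory is roughly one chunk, not the dataset.
    # This is inherently serial: each partial_fit depends on the last.
    for xb, yb in zip(X.to_delayed().ravel(), y.to_delayed().ravel()):
        clf.partial_fit(xb.compute(), yb.compute(), classes=classes)
    return clf

X = da.random.random((10_000, 20), chunks=(1_000, 20))
y = (X[:, 0] > 0.5).astype(int)
clf = fit_out_of_core(SGDClassifier(max_iter=1000, tol=1e-3, random_state=0),
                      X, y, classes=[0, 1])
```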
from dask.diagnostics import ResourceProfiler, Profiler, ProgressBar
%%time
rp = ResourceProfiler()
p = Profiler()
pipe = make_pipeline(
StandardScaler(),
BigSGDClassifier(classes=[0, 1], max_iter=1000, tol=1e-3, random_state=2),
)
with p, rp:
pipe.fit(X, y)
CPU times: user 2min 38s, sys: 1min 44s, total: 4min 22s Wall time: 1min 47s
p.visualize()
That graph shows the issue pretty well. We get good parallelism for reading from disk and computing the StandardScaler
. But once we hit the final stage in the pipeline, which is calling SGDClassifier.partial_fit
a bunch of times, everything is serial.
Prediction is completely parallel.
%time predictions = pipe.predict(X)
CPU times: user 8.33 ms, sys: 1.05 ms, total: 9.38 ms Wall time: 9.26 ms
Well, dask is lazy, so those 9 ms only built the task graph; no actual prediction has happened yet :)
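To see why prediction parallelizes so well, here's a hedged sketch (not dask-ml's actual code): once the model is fit, scoring is embarrassingly parallel, and something like da.map_blocks can apply clf.predict to each chunk independently.

```python
import dask.array as da
import numpy as np
from sklearn.linear_model import SGDClassifier

# Fit a small model in memory on toy data.
rng = np.random.RandomState(0)
X_small = rng.normal(size=(1_000, 20))
clf = SGDClassifier(max_iter=1000, tol=1e-3, random_state=0)
clf.fit(X_small, (X_small[:, 0] > 0).astype(int))

# Apply it blockwise: each chunk is scored independently, so every
# core can work at once. clf.predict maps an (n, 20) chunk to an
# (n,) chunk, hence drop_axis=1.
X_big = da.random.normal(size=(100_000, 20), chunks=(10_000, 20))
predictions = X_big.map_blocks(clf.predict, dtype='int64', drop_axis=1)
predictions.compute()  # runs one clf.predict per chunk, in parallel
```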
Let's write it to disk.
%%time
with rp, p:
predictions.to_dask_dataframe(columns='a').to_parquet('predictions.parq')
CPU times: user 51.2 s, sys: 1min, total: 1min 51s Wall time: 39.1 s
That's from disk, to prediction, and back to disk: 16 GB of data in about 40 seconds, using all 8 cores on my laptop.
p.visualize()