Scikit-learn supports out-of-core learning (fitting a model on a dataset that doesn't fit in RAM) through its partial_fit API. See the scikit-learn documentation on strategies for scaling computationally.

The basic idea is that, for certain estimators, learning can be done in batches. The estimator will see a batch, and then incrementally update whatever it's learning (the coefficients, for example).
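As a quick sketch of that idea, here's what an out-of-core training loop with partial_fit looks like in plain scikit-learn. The batches here are generated in memory for illustration; in a real setting each iteration would pull one batch from disk.

```python
import numpy as np
from sklearn.linear_model import SGDClassifier

rng = np.random.RandomState(0)
clf = SGDClassifier(random_state=0)

for _ in range(10):
    # Pretend each batch is read from disk; here we just generate it.
    X_batch = rng.randn(100, 5)
    y_batch = (X_batch[:, 0] > 0).astype(int)
    # `classes` must be provided (at least on the first call), since no
    # single batch is guaranteed to contain every class.
    clf.partial_fit(X_batch, y_batch, classes=[0, 1])

print(clf.coef_.shape)  # one coefficient per feature
```

After each call, the estimator's coefficients reflect everything it has seen so far, even though only one batch was ever in memory at a time.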

Unfortunately, the partial_fit API doesn't play that nicely with my favorite part of scikit-learn: pipelines. You would essentially need every step in the pipeline to have an out-of-core partial_fit version, which isn't really feasible. Setting that aside, it wouldn't be great for a user, since working with generators of datasets is awkward.

Fortunately, we have great data containers for larger-than-memory arrays and dataframes: dask.array and dask.dataframe. We can

  1. Use dask for pre-processing data in an out-of-core manner
  2. Use scikit-learn to fit the actual model, out-of-core, using the partial_fit API

The final piece of the puzzle is a nice little wrapper around these scikit-learn models that can be used in pipelines. I've started on this in dask-ml. I'm eager to have additional contributions.
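To see why dask is a good fit for step 1, here's a small sketch of the out-of-core style of computation dask.array enables. The array below is small so the example runs quickly, but the same code works unchanged when the chunks collectively exceed RAM, since only one chunk needs to be in memory at a time.

```python
import dask.array as da

# Declare a chunked array; a much larger one is declared the same way.
x = da.random.random((1_000_000, 20), chunks=(100_000, 20))

# Nothing is computed here -- this only builds a task graph.
col_means = x.mean(axis=0)

# .compute() streams through the chunks, one block at a time,
# never materializing the full array.
result = col_means.compute()
print(result.shape)
```

This lazy, chunked execution model is what lets the pre-processing steps run out-of-core and in parallel.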

In [1]:
import dask.array as da
import dask.dataframe as dd

from daskml.datasets import make_classification
from daskml.linear_model import BigSGDClassifier
from daskml.preprocessing import StandardScaler

from sklearn.pipeline import make_pipeline

Let's make an X and y for classification. We'll make a bunch of arrays and store them on disk in Parquet files.

Generate data

In [2]:
import string
In [3]:
n_blocks = 100

Let's generate a fake dataset, replicate it 100 times, and store each block in a parquet file. This simulates a database or central store of a large dataset.

In [4]:
X, y = make_classification(n_samples=1_000_000, chunks=500_000)

for i in range(n_blocks):
    X.to_dask_dataframe(columns=list(string.ascii_letters[:20])).to_parquet(f"X-{i}.parq")
    y.to_dask_dataframe(columns='y').to_parquet(f"y-{i}.parq")

And a utility function to read it in.

In [5]:
def read():
    Xs = []
    ys = []
    for i in range(n_blocks):
        xx = dd.read_parquet(f"X-{i}.parq/")
        yy = dd.read_parquet(f"y-{i}.parq/")
        # divisions are index labels; the final division is inclusive,
        # so the last chunk has one extra row
        shapes = [stop - start for start, stop in zip(xx.divisions, xx.divisions[1:])]
        shapes[-1] += 1

        x = [da.from_delayed(chunk.values, shape=(shapes[k], 20), dtype='f8')
             for k, chunk in enumerate(xx.to_delayed())]
        y = [da.from_delayed(chunk.values, shape=(shapes[k], 1), dtype='f8')
             for k, chunk in enumerate(yy.to_delayed())]
        Xs.append(da.concatenate(x, axis=0).rechunk((500_000, 20)))
        ys.append(da.concatenate(y, axis=0).rechunk((500_000, 1)))
    return da.concatenate(Xs, axis=0), da.concatenate(ys, axis=0).squeeze()

Now we'll read them into a pair of dask arrays.

In [9]:
X, y = read()
In [10]:
X
Out[10]:
dask.array<concatenate, shape=(100000000, 20), dtype=float64, chunksize=(500000, 20)>
In [11]:
y
Out[11]:
dask.array<squeeze, shape=(100000000,), dtype=float64, chunksize=(500000,)>
In [12]:
(X.nbytes + y.nbytes) / 10**9
Out[12]:
16.8

In total, we'll be fitting the model on about 17 GB of data (100,000,000 rows by 20 columns), all floats. My laptop has 16 GB of RAM, so it'd be impossible to do this in main memory alone.

To demonstrate the idea, we'll have a small pipeline

  1. Scale the features by mean and variance
  2. Fit an SGDClassifier

I've implemented a daskml.preprocessing.StandardScaler, using dask, in about 40 lines of code. This will operate completely in parallel.
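To give a flavor of how short that implementation can be (this is a hypothetical sketch, not daskml's actual code), the key observation is that dask arrays support the same `.mean()` and `.std()` reductions as NumPy arrays, so a scaler written against that interface works on either:

```python
import numpy as np

class SimpleStandardScaler:
    """Minimal sketch of a scaler that works on NumPy and dask arrays
    alike, since both support .mean() / .std() with an axis argument."""

    def fit(self, X, y=None):
        # On a dask array these reductions build a parallel task graph.
        self.mean_ = X.mean(axis=0)
        self.scale_ = X.std(axis=0)
        return self

    def transform(self, X):
        return (X - self.mean_) / self.scale_

X_small = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
Xt = SimpleStandardScaler().fit(X_small).transform(X_small)
print(Xt.mean(axis=0), Xt.std(axis=0))  # standardized: mean 0, std 1
```

Because the reductions are expressed through the array interface, dask can compute the means and variances of each chunk in parallel and combine them.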

I haven't implemented a custom SGDClassifier, because that'd be much more than 40 lines of code. Instead, I have a small wrapper that uses scikit-learn's implementation to provide a fit method that operates out-of-core, but not in parallel.
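The wrapper idea can be sketched in a few lines (a hypothetical illustration of the approach, not BigSGDClassifier's actual code): walk over the blocks of the array one at a time, handing each to scikit-learn's partial_fit. Each call is inherently sequential, which is why this stage doesn't parallelize.

```python
import numpy as np
from sklearn.linear_model import SGDClassifier

class BlockwiseSGDClassifier:
    """Sketch: feed data to scikit-learn's SGDClassifier one block
    at a time via partial_fit. With dask arrays, each block would be
    pulled into memory only when needed."""

    def __init__(self, classes, **kwargs):
        self.classes = classes
        self.est_ = SGDClassifier(**kwargs)

    def fit(self, X_blocks, y_blocks):
        # Sequential: each partial_fit call must see the previous state.
        for Xb, yb in zip(X_blocks, y_blocks):
            self.est_.partial_fit(Xb, yb, classes=self.classes)
        return self

    def predict(self, X):
        # Prediction has no such dependency, so with dask it can be
        # mapped over the blocks in parallel.
        return self.est_.predict(X)

rng = np.random.RandomState(0)
blocks_X = [rng.randn(200, 4) for _ in range(5)]
blocks_y = [(Xb[:, 0] > 0).astype(int) for Xb in blocks_X]
model = BlockwiseSGDClassifier(classes=[0, 1], random_state=0).fit(blocks_X, blocks_y)
```

Note the asymmetry: fitting is a serial chain of partial_fit calls, while prediction is embarrassingly parallel. That asymmetry shows up clearly in the profiles below.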

In [15]:
from daskml.preprocessing import StandardScaler
from daskml.linear_model import BigSGDClassifier

from dask.diagnostics import ResourceProfiler, Profiler, ProgressBar
In [16]:
%%time
rp = ResourceProfiler()
p = Profiler()

pipe = make_pipeline(
    StandardScaler(),
    BigSGDClassifier(classes=[0, 1], max_iter=1000, tol=1e-3, random_state=2),
)

with p, rp:
    pipe.fit(X, y)
CPU times: user 2min 38s, sys: 1min 44s, total: 4min 22s
Wall time: 1min 47s
In [17]:
p.visualize()
Out[17]:
Figure(
id = 'cb1de8ac-f0b1-4690-b58f-45765e3f473f', …)

That graph shows the issue pretty well. We get good parallelism for reading from disk and computing the StandardScaler. But once we hit the final stage in the pipeline, which is calling SGDClassifier.partial_fit a bunch of times, everything is serial.

Prediction is completely parallel.

In [18]:
%time predictions = pipe.predict(X)
CPU times: user 8.33 ms, sys: 1.05 ms, total: 9.38 ms
Wall time: 9.26 ms

Well, dask is lazy, so that 9 ms only built the task graph; no predictions have actually been computed yet :)
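A small sketch of what that laziness means in practice: operations on a dask array return instantly because they only record work to be done, and nothing runs until you ask for a concrete result.

```python
import dask.array as da

x = da.random.random((1_000, 10), chunks=(100, 10))
s = x.sum()           # instant: just builds the task graph
print(type(s))        # still a dask array -- no numbers yet
total = s.compute()   # this is where the chunks are actually processed
```

Writing the predictions to disk plays the same role as `.compute()` here: it's the step that finally triggers execution.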

Let's write it to disk.

In [19]:
%%time

with rp, p:
    predictions.to_dask_dataframe(columns='a').to_parquet('predictions.parq')
CPU times: user 51.2 s, sys: 1min, total: 1min 51s
Wall time: 39.1 s

That's from disk, to prediction, and back to disk for about 17 GB of data in roughly 40 seconds, while using all 8 cores on my laptop.

In [20]:
p.visualize()
Out[20]:
Figure(
id = '9c8e31b6-9830-4e5a-b9a3-a4af96cdecfa', …)