#!/usr/bin/env python
# coding: utf-8

# Scikit-learn supports out-of-core learning (fitting a model on a dataset that doesn't fit in RAM) through its `partial_fit` API. See [here](http://scikit-learn.org/stable/modules/scaling_strategies.html#scaling-with-instances-using-out-of-core-learning).
#
# The basic idea is that, *for certain estimators*, learning can be done in batches. The estimator sees a batch, and then incrementally updates whatever it's learning (the coefficients, for example).
#
# Unfortunately, the `partial_fit` API doesn't play that nicely with my favorite part of scikit-learn: [pipelines](http://scikit-learn.org/stable/modules/pipeline.html#pipeline). You would essentially need every step in the pipeline to have an out-of-core `partial_fit` version, which isn't really feasible. Even setting that aside, it wouldn't be great for the user, since working with generators of datasets is awkward.
#
# Fortunately, we *have* great data containers for larger-than-memory arrays and dataframes: `dask.array` and `dask.dataframe`. We can
#
# 1. Use dask to pre-process the data in an out-of-core manner
# 2. Use scikit-learn to fit the actual model, out-of-core, using the `partial_fit` API
#
# The final piece of the puzzle is a nice little wrapper for these scikit-learn models that can be used in pipelines. I've started this in [dask-ml](https://github.com/dask/dask-ml). I'm eager for additional contributions.

# In[1]:

import dask.array as da
import dask.dataframe as dd
from daskml.datasets import make_classification
from daskml.linear_model import BigSGDClassifier
from daskml.preprocessing import StandardScaler

from sklearn.pipeline import make_pipeline

# Let's make an `X` and `y` for classification. We'll make a bunch of arrays and store them on disk as parquet files.

# ## Generate data

# In[2]:

import string

# In[3]:

n_blocks = 100

# Let's generate a fake dataset, replicate it 100 times, and store each block in a parquet file.
# This simulates a database or central store of a large dataset.

# In[4]:

X, y = make_classification(n_samples=1_000_000, chunks=500_000)
for i in range(n_blocks):
    X.to_dask_dataframe(columns=list(string.ascii_letters[:20])).to_parquet(f"X-{i}.parq")
    y.to_dask_dataframe(columns='y').to_parquet(f"y-{i}.parq")

# And a utility function to read it back in.

# In[5]:

def read():
    Xs = []
    ys = []
    for i in range(n_blocks):
        xx = dd.read_parquet(f"X-{i}.parq/")
        yy = dd.read_parquet(f"y-{i}.parq/")
        shapes = [j - i for i, j in zip(xx.divisions, xx.divisions[1:])]
        shapes[-1] += 1
        x = [da.from_delayed(chunk.values, shape=(shapes[i], 20), dtype='f8')
             for i, chunk in enumerate(xx.to_delayed())]
        y = [da.from_delayed(chunk.values, shape=(shapes[i], 1), dtype='f8')
             for i, chunk in enumerate(yy.to_delayed())]
        Xs.append(da.concatenate(x, axis=0).rechunk((500_000, 20)))
        ys.append(da.concatenate(y, axis=0).rechunk((500_000, 1)))
    return da.concatenate(Xs, axis=0), da.concatenate(ys, axis=0).squeeze()

# Now we'll read them into a pair of dask arrays.

# In[9]:

X, y = read()

# In[10]:

X

# In[11]:

y

# In[12]:

(X.nbytes + y.nbytes) / 10**9

# In total, we'll be fitting the model on about 17 GB of data (100,000,000 rows by 20 columns), all floats. My laptop has 16 GB of RAM, so it'd be impossible to do this in main memory alone.
#
# To demonstrate the idea, we'll use a small pipeline:
#
# 1. Scale the features by mean and variance
# 2. Fit an `SGDClassifier`
#
# I've implemented a `daskml.preprocessing.StandardScaler`, using `dask`, in about 40 lines of code. This will operate completely in parallel.
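
# To give a flavor of how the scaling can be fully parallel, here's a minimal
# sketch of a dask-backed scaler. This is an illustrative assumption of the
# approach, not the actual daskml code: the column means and standard deviations
# are dask reductions, and the transform is element-wise, so every step
# parallelizes across blocks.

import dask


class SketchStandardScaler:
    """Scale columns to zero mean and unit variance using dask reductions."""

    def fit(self, X, y=None):
        # Both reductions are lazy; compute them together in one pass over the blocks
        self.mean_, self.std_ = dask.compute(X.mean(axis=0), X.std(axis=0))
        return self

    def transform(self, X, y=None):
        # Element-wise, so the result is still a lazy, block-parallel dask array
        return (X - self.mean_) / self.std_

    def fit_transform(self, X, y=None):
        return self.fit(X).transform(X)
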
#
# I haven't implemented a custom `SGDClassifier`, because that'd be *much* more than 40 lines of code. Instead, I have a small wrapper that uses scikit-learn's implementation to provide a `fit` method that operates out-of-core, but not in parallel.

# In[15]:

from daskml.preprocessing import StandardScaler
from daskml.linear_model import BigSGDClassifier

from dask.diagnostics import ResourceProfiler, Profiler, ProgressBar

# In[16]:

get_ipython().run_cell_magic('time', '', 'rp = ResourceProfiler()\np = Profiler()\n\npipe = make_pipeline(\n    StandardScaler(),\n    BigSGDClassifier(classes=[0, 1], max_iter=1000, tol=1e-3, random_state=2),\n)\n\nwith p, rp:\n    pipe.fit(X, y)\n')

# In[17]:

p.visualize()

# That graph shows the issue pretty well. We get good parallelism for reading from disk and computing the `StandardScaler`. But once we hit the final stage in the pipeline, which calls `SGDClassifier.partial_fit` a bunch of times, everything is serial.

# Prediction is completely parallel.

# In[18]:

get_ipython().run_line_magic('time', 'predictions = pipe.predict(X)')

# Well, dask is lazy, so those 9 ms only built up the task graph; nothing has actually been computed yet :)
#
# Let's write it to disk.

# In[19]:

get_ipython().run_cell_magic('time', '', "\nwith rp, p:\n    predictions.to_dask_dataframe(columns='a').to_parquet('predictions.parq')\n")

# That's from disk, to prediction, and back to disk, for 16 GB of data in about 40 seconds, while using all 8 cores on my laptop.

# In[20]:

p.visualize()
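
# For the curious, here's roughly how an out-of-core `fit` like the one in the
# wrapper can work: walk the blocks of the dask arrays and hand each one to
# scikit-learn's `partial_fit`. This is an illustrative sketch (the function name
# and details are my own, not daskml's), and it shows why this stage runs
# serially: only one block is materialized and fit at a time.

from sklearn.linear_model import SGDClassifier


def fit_out_of_core(X, y, classes, **sgd_kwargs):
    est = SGDClassifier(**sgd_kwargs)
    x_blocks = X.to_delayed().flatten().tolist()
    y_blocks = y.to_delayed().flatten().tolist()
    for xb, yb in zip(x_blocks, y_blocks):
        # Materialize one block at a time, so memory use is bounded by the block size
        est.partial_fit(xb.compute(), yb.compute(), classes=classes)
    return est

# Usage would look something like:
# est = fit_out_of_core(X, y, classes=[0, 1], random_state=2)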