import os
import numpy as np
import scipy.sparse as sp
import pandas as pd
from glob import glob
import dask
import dask.bag as db
import joblib
from distributed import Client
client = Client()
client
Client
|
Cluster
|
rm -rf sparse_chunks/
folder = 'sparse_chunks'
n_features = int(1e5)
n_informative = int(1e4)
n_chunks = int(1e1)
chunk_size = int(1e2)
rng = np.random.RandomState(42)
true_coef = rng.randn(n_features)
true_coef[n_informative:] = 0
def make_chunk(n_samples, true_coef, chunk_idx, format='csr',
density=1e-3, noise=1e-1):
rng = np.random.RandomState(chunk_idx)
n_features = true_coef.shape[0]
input_data = sp.rand(n_samples, n_features, format=format,
density=density, random_state=rng)
noise = rng.normal(loc=0, scale=noise, size=n_samples)
target = input_data.dot(true_coef).ravel() + noise
return chunk_idx, input_data, (target > 0).astype(np.int32)
def save_to_disk(chunk_idx, X, y, folder='sparse_chunks'):
os.makedirs(folder, exist_ok=True)
filename = "sparse_chunk_{:04d}.pkl".format(chunk_idx)
joblib.dump((X, y), os.path.join(folder, filename))
return filename
def load_from_disk(chunk_idx, filename):
X, y = joblib.load(filename)
return chunk_idx, X, y
if not os.path.exists(folder):
print("Generating chunks of sparse data into", folder)
b = db.from_sequence([(chunk_size, true_coef, i)
for i in range(n_chunks)])
b = b.starmap(make_chunk).starmap(save_to_disk).compute()
print("Lazy loading chunks from", folder)
b = db.from_sequence(enumerate(sorted(glob('sparse_chunks/*.pkl'))))
b = b.starmap(load_from_disk)
Lazy loading chunks from sparse_chunks
%time b = b.persist()
CPU times: user 8 ms, sys: 0 ns, total: 8 ms Wall time: 9.53 ms
len(b.compute())
10
%%time
chunk_idx, X_0, y_0 = b.take(1)[0]
CPU times: user 16 ms, sys: 4 ms, total: 20 ms Wall time: 25.8 ms
chunk_idx
0
X_0
<100x100000 sparse matrix of type '<class 'numpy.float64'>' with 10000 stored elements in Compressed Sparse Row format>
np.mean((X_0.dot(true_coef).ravel() > 0) == y_0)
0.97999999999999998
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import train_test_split
from dask import delayed
CLASSES = np.array([0, 1])
def scan_fit(model, chunk):
return model.partial_fit(*chunk, classes=CLASSES)
def score(model, chunk):
return model.score(*chunk)
all_filenames = sorted(glob('sparse_chunks/*.pkl'))
train_filenames, test_filenames = train_test_split(
all_filenames, random_state=0)
model = SGDClassifier(loss='log', alpha=1e-3, penalty='elasticnet', tol=0)
for i in range(20):
for filename in train_filenames:
chunk = delayed(joblib.load)(filename)
model = delayed(scan_fit)(model, chunk)
scores = [delayed(score)(model, delayed(joblib.load)(filename))
for filename in test_filenames]
%time scores, model = dask.compute(scores, model)
np.mean(scores), np.std(scores), np.mean(model.coef_ != 0)
CPU times: user 760 ms, sys: 64 ms, total: 824 ms Wall time: 2.75 s
(0.53333333333333333, 0.032998316455372205, 0.46405999999999997)
model = SGDClassifier(loss='log', penalty='l1', max_iter=1)
def scan_fit(model, next_chunk):
chunk_idx, X, y = next_chunk
return model.partial_fit(X, y, classes=[0, 1])
b.accumulate(scan_fit, initial=model).to_delayed()[-1].compute(get=dask.get)[0]
--------------------------------------------------------------------------- TypeError Traceback (most recent call last) <ipython-input-57-4e7666067cab> in <module>() 5 return model.partial_fit(X, y, classes=[0, 1]) 6 ----> 7 b.accumulate(scan_fit, initial=model).to_delayed()[-1].compute(get=dask.get)[0] ~/code/dask/dask/base.py in compute(self, **kwargs) 96 Extra keywords to forward to the scheduler ``get`` function. 97 """ ---> 98 (result,) = compute(self, traverse=False, **kwargs) 99 return result 100 ~/code/dask/dask/base.py in compute(*args, **kwargs) 203 dsk = collections_to_dsk(variables, optimize_graph, **kwargs) 204 keys = [var._keys() for var in variables] --> 205 results = get(dsk, keys, **kwargs) 206 207 results_iter = iter(results) ~/code/dask/dask/local.py in get_sync(dsk, keys, **kwargs) 560 """ 561 kwargs.pop('num_workers', None) # if num_workers present, remove it --> 562 return get_async(apply_sync, 1, dsk, keys, **kwargs) 563 564 ~/code/dask/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs) 506 # Seed initial tasks into the thread pool 507 while state['ready'] and len(state['running']) < num_workers: --> 508 fire_task() 509 510 # Main loop, wait on tasks to finish, insert new ones ~/code/dask/dask/local.py in fire_task() 502 args=(key, dumps((dsk[key], data)), 503 dumps, loads, get_id, pack_exception), --> 504 callback=queue.put) 505 506 # Seed initial tasks into the thread pool ~/code/dask/dask/local.py in apply_sync(func, args, kwds, callback) 549 def apply_sync(func, args=(), kwds={}, callback=None): 550 """ A naive synchronous version of apply_async """ --> 551 res = func(*args, **kwds) 552 if callback is not None: 553 callback(res) ~/code/dask/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception) 293 failed = False 294 except BaseException as e: --> 295 result = pack_exception(e, dumps) 296 failed = True 297 return key, result, failed ~/code/dask/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception) 288 try: 289 task, data = loads(task_info) --> 290 result = _execute_task(task, data) 291 id = get_id() 292 result = dumps((result, id)) ~/code/dask/dask/local.py in _execute_task(arg, cache, dsk) 269 func, args = arg[0], arg[1:] 270 args2 = [_execute_task(a, cache) for a in args] --> 271 return func(*args2) 272 elif not ishashable(arg): 273 return arg ~/code/dask/dask/bag/core.py in accumulate_part(binop, seq, initial, is_first) 1273 res = list(accumulate(binop, seq)) 1274 else: -> 1275 res = list(accumulate(binop, seq, initial=initial)) 1276 if is_first: 1277 return res, res[-1] if res else [], initial ~/.virtualenvs/py36/lib/python3.6/site-packages/toolz/itertoolz.py in accumulate(binop, seq, initial) 56 itertools.accumulate : In standard itertools for Python 3.2+ 57 """ ---> 58 seq = iter(seq) 59 result = next(seq) if initial == no_default else initial 60 yield result TypeError: 'Future' object is not iterable