This notebook gives a short introduction to using Dask to parallelize model training, particularly when you have multiple learning tasks for which you want to train individual models.
For brevity, I will not elaborate on the exact machine learning task here, but will instead focus on the idioms needed to use Dask for this kind of task.
%load_ext autoreload
%autoreload 2
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
from dask.distributed import LocalCluster, Client
import numpy as np
import pandas as pd
import janitor
Here, we instantiate a Dask cluster. (This is only a `LocalCluster`, but other cluster types can be created too, such as an `SGECluster` or a `KubeCluster`.) We then connect a `Client` to the cluster.
client = Client()
/home/ericmjl/anaconda/envs/minimal-panel/lib/python3.7/site-packages/distributed/dashboard/core.py:72: UserWarning: Port 8787 is already in use. Perhaps you already have a cluster running? Hosting the diagnostics dashboard on a random port instead. warnings.warn("\n" + msg)
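As a sketch of what an explicit setup looks like, here is a `LocalCluster` constructed by hand and handed to a `Client` (the `n_workers` and `processes` settings are illustrative choices, not from this notebook):

```python
from dask.distributed import LocalCluster, Client

# Build a local cluster explicitly; processes=False keeps everything
# in-process (threads only), which is convenient for quick experiments.
cluster = LocalCluster(n_workers=2, processes=False)
client = Client(cluster)

# Any function can be submitted to the cluster; submit() returns a
# future immediately, and .result() blocks until computation finishes.
future = client.submit(lambda x: x + 1, 41)
print(future.result())  # 42

client.close()
cluster.close()
```

Calling `Client()` with no arguments, as below, creates a `LocalCluster` with sensible defaults behind the scenes.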
We will now preprocess our data and get it into a shape for machine learning.
from utils import molecular_weights, featurize_sequence_
drugs = ['ATV', 'DRV', 'FPV', 'IDV', 'LPV', 'NFV', 'SQV', 'TPV']
data = (
pd.read_csv("data/hiv-protease-data-expanded.csv", index_col=0)
.query("weight == 1.0")
.transform_column("sequence", lambda x: len(x), "seq_length")
.query("seq_length == 99")
.transform_column("sequence", featurize_sequence_, "features")
.transform_columns(drugs, np.log10)
)
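The `transform_column` and `transform_columns` methods come from `pyjanitor`; in plain pandas, the calls above correspond roughly to the following (the toy column values here are illustrative):

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"sequence": ["PQIT", "PQITL"], "ATV": [32.0, 100.0]})

# transform_column("sequence", len, "seq_length") is roughly: assign a
# new column computed by applying a function to an existing column.
df = df.assign(seq_length=df["sequence"].apply(len))

# transform_columns(drugs, np.log10) is roughly: apply the function
# in place to each listed column.
df[["ATV"]] = df[["ATV"]].apply(np.log10)

print(df)
```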
features = pd.DataFrame(np.vstack(data['features'])).set_index(data.index)
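The `features` column holds one array per row; `np.vstack` stacks them into a single 2-D matrix whose rows stay aligned with the original index. A small illustration with toy numbers (not the real molecular-weight features):

```python
import numpy as np
import pandas as pd

data = pd.DataFrame(
    {"features": [np.array([1.0, 2.0]), np.array([3.0, 4.0])]},
    index=[6, 7],
)

# Stack the per-row arrays into an (n_rows, n_features) matrix,
# then restore the original index so rows stay aligned with `data`.
features = pd.DataFrame(np.vstack(data["features"])).set_index(data.index)
print(features.shape)  # (2, 2)
```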
data.head(3)
| | ATV | DRV | FPV | IDV | LPV | NFV | SQV | SeqID | TPV | seqid | sequence | sequence_object | weight | seq_length | features |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 6 | 1.50515 | NaN | 0.477121 | 1.544068 | 1.50515 | 1.462398 | 2.214844 | 4426 | NaN | 4426-0 | PQITLWQRPIVTIKIGGQLKEALLDTGADDTVLEEMNLPGKWKPKM... | ID: 4426-0\nName: <unknown name>\nDescription:... | 1.0 | 99 | [[115.131, 146.1451, 131.1736, 119.1197, 131.1... |
| 7 | NaN | NaN | 0.176091 | 0.000000 | NaN | 0.342423 | 0.041393 | 4432 | NaN | 4432-0 | PQITLWQRPLVTVKIGGQLKEALLDTGADDTVLEEMNLPGRWKPKM... | ID: 4432-0\nName: <unknown name>\nDescription:... | 1.0 | 99 | [[115.131, 146.1451, 131.1736, 119.1197, 131.1... |
| 14 | NaN | NaN | 0.491362 | 0.939519 | NaN | 1.505150 | 1.227887 | 4664 | NaN | 4664-0 | PQITLWQRPIVTIKVGGQLIEALLDTGADDTVLEEINLPGRWKPKM... | ID: 4664-0\nName: <unknown name>\nDescription:... | 1.0 | 99 | [[115.131, 146.1451, 131.1736, 119.1197, 131.1... |
features.head(3)
| | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | ... | 89 | 90 | 91 | 92 | 93 | 94 | 95 | 96 | 97 | 98 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 6 | 115.131 | 146.1451 | 131.1736 | 119.1197 | 131.1736 | 204.2262 | 146.1451 | 174.2017 | 115.131 | 131.1736 | ... | 131.1736 | 119.1197 | 146.1451 | 131.1736 | 75.0669 | 121.159 | 119.1197 | 131.1736 | 132.1184 | 165.19 |
| 7 | 115.131 | 146.1451 | 131.1736 | 119.1197 | 131.1736 | 204.2262 | 146.1451 | 174.2017 | 115.131 | 131.1736 | ... | 131.1736 | 119.1197 | 146.1451 | 131.1736 | 75.0669 | 121.159 | 119.1197 | 131.1736 | 132.1184 | 165.19 |
| 14 | 115.131 | 146.1451 | 131.1736 | 119.1197 | 131.1736 | 204.2262 | 146.1451 | 174.2017 | 115.131 | 131.1736 | ... | 149.2124 | 119.1197 | 146.1451 | 131.1736 | 75.0669 | 121.159 | 119.1197 | 131.1736 | 132.1184 | 165.19 |
3 rows × 99 columns
When writing code to interface with Dask, a functional paradigm is often preferred. Hence, we will write the procedures that we need as functions that can be submitted by the `Client` to the cluster.
from utils import featurize_sequence_, fit_model, cross_validate, predict
Now, we'll scatter the data to the workers. `dataf` is named as such because it is the "data future": a lightweight reference to `data`, which now lives in the workers' memory, so that submitted tasks can access it without the data being re-sent for every task. Likewise for `featuresf`.
dataf = client.scatter(data)
featuresf = client.scatter(features)
Now, we fit the models and collect their cross-validated scores.
models = dict()
scores = dict()
for drug in drugs:
    models[drug] = client.submit(fit_model, dataf, featuresf, drug)
    scores[drug] = client.submit(cross_validate, dataf, featuresf, drug)
models = client.gather(models)
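The submit/gather pattern above mirrors Python's standard `concurrent.futures` interface, which can help build intuition for what the Dask client is doing. A stdlib-only analogy (`fit_model` here is a stand-in, not the real function from `utils`):

```python
from concurrent.futures import ThreadPoolExecutor

def fit_model(data, features, drug):
    # Stand-in for the real training routine.
    return f"model for {drug}"

drugs = ["ATV", "DRV"]
data, features = object(), object()

with ThreadPoolExecutor() as executor:
    # submit() returns futures immediately; the loop does not block.
    futures = {drug: executor.submit(fit_model, data, features, drug)
               for drug in drugs}
    # "Gathering" means resolving every future to its concrete result.
    models = {drug: f.result() for drug, f in futures.items()}

print(models)
```

Dask's `client.gather` does the same resolution step, but also handles nested containers of futures (such as the `models` dict above) in one call.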
Finally, let's save the models. To save space on disk, we will pickle and gzip them.
import pickle as pkl
import gzip
for name, model in models.items():
    with gzip.open(f"data/models/{name}.pkl.gz", 'wb') as f:
        pkl.dump(model, f)
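To load a saved model later, reverse the steps: open the file with `gzip` in binary read mode and unpickle. A round-trip sketch, using a dict as a stand-in for a fitted model and a temporary path rather than the notebook's `data/models/` directory:

```python
import gzip
import os
import pickle as pkl
import tempfile

model = {"drug": "ATV", "coef": [0.1, 0.2]}  # stand-in for a fitted model
path = os.path.join(tempfile.mkdtemp(), "ATV.pkl.gz")

# Save: pickle the object into a gzip-compressed file.
with gzip.open(path, "wb") as f:
    pkl.dump(model, f)

# Load: decompress and unpickle in one step.
with gzip.open(path, "rb") as f:
    restored = pkl.load(f)

print(restored == model)  # True
```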
scores = client.gather(scores)
with gzip.open("data/scores.pkl.gz", "wb") as f:
    pkl.dump(scores, f)