# install necessary libraries
#!pip install dask_cudf
#!pip install dask_ml
#!pip install cuml --upgrade
import cuml
# read data as Dask df
from dask.distributed import Client, progress, wait
import dask.dataframe as dd
# Start (or connect to) a Dask distributed scheduler; with no arguments this
# spawns a local cluster in-process.
client = Client()
# Bare expression: in a notebook this renders the client/cluster summary widget.
client
# has_what() maps each worker address to the keys it holds; we only need the
# worker count here.
workers = client.has_what().keys()
n_workers = len(workers)
# NOTE(review): this value is shadowed by the n_streams assignment made again
# just before model construction, so this assignment is effectively unused.
n_streams = 8 # Performance optimization
Refer to the Dask Dataframe API documentation for various data processing operations: https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe Note that we are using Dask cuDF dataframes here, which follow the same API.
import dask_cudf
import numpy as np
import cudf
# Path to one day of the Criteo click-through dataset; tab-separated, no
# header row in the file itself.
file = '/data/day_15'
header = ['col'+str(i) for i in range (1,41)] #note that according to criteo, the first column in the dataset is Click Through (CT). Consist of 40 columns
# Lazily read the TSV into a distributed GPU dataframe (one cuDF frame per
# partition); nothing is computed until an action like head() is called.
gdf_original = dask_cudf.read_csv(file, delimiter='\t', names=header)
# Display the client widget again (notebook convenience).
client
Client
|
Cluster
|
# Run cudf.set_allocator("managed") on every worker so cuDF allocates CUDA
# managed (unified) memory, letting datasets larger than one GPU's memory
# oversubscribe and page instead of failing outright.
client.run(cudf.set_allocator, "managed") # Uses managed memory instead of "default"
{'tcp://172.17.1.221:35641': None, 'tcp://172.17.1.232:46093': None, 'tcp://172.17.2.23:35103': None}
# Preview the first rows (only the first partition is computed).
gdf_original.head()
col1 | col2 | col3 | col4 | col5 | col6 | col7 | col8 | col9 | col10 | ... | col31 | col32 | col33 | col34 | col35 | col36 | col37 | col38 | col39 | col40 | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 0 | 2.0 | 9.0 | <NA> | 1.0 | <NA> | 0.0 | 0.0 | 3 | 1 | ... | 1f7fc70b | b8170bba | 9512c20b | 31a9f3b3 | 228aee9b | b74c6548 | 59f9dd38 | 165fbf32 | 0b3c06d0 | 2ccea557 |
1 | 0 | 12.0 | 166.0 | 3.0 | 3.0 | <NA> | 1.0 | 0.0 | 1 | 3 | ... | d20856aa | b6bc86c5 | 108a0699 | e7ef7c20 | 113b1789 | 670bb82a | 0c427c16 | fc6fc912 | 991321ea | 2997ef88 |
2 | 0 | 1.0 | 66.0 | <NA> | <NA> | <NA> | <NA> | <NA> | 2 | 0 | ... | 753da5f3 | b8170bba | 9512c20b | 1a0af648 | 13b96cbc | 3f2bae22 | 209c86ee | 165fbf32 | ff654802 | 2ccea557 |
3 | 0 | 1.0 | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | 2 | 1 | ... | 1f7fc70b | b8170bba | 7a7178b2 | 0da1444b | cf12754e | af22e988 | c483d0dd | 75350c8a | 57e36578 | ed10571d |
4 | 0 | 2.0 | <NA> | 4.0 | 4.0 | <NA> | 7.0 | 0.0 | 59 | 4 | ... | d20856aa | a1eb1511 | 9512c20b | 44fa1260 | c59d0ef0 | c41079d6 | 38d2af52 | 37dcf7a2 | ff654802 | b757e957 |
5 rows × 40 columns
# Keep only the first 14 columns: col1 (the binary click label) plus the 13
# numeric features col2..col14; the remaining hashed categorical columns
# (visible in the head() output above as hex strings) are dropped.
gdf_sliced = gdf_original.iloc[:, 0:14]
# gdf_sliced_small = gdf_sliced.sample(frac=0.1)
# Subsampling is disabled; train on the full sliced frame.
gdf_sliced_small = gdf_sliced
gdf_sliced.dtypes
col1 int64 col2 float64 col3 float64 col4 float64 col5 float64 col6 float64 col7 float64 col8 float64 col9 int64 col10 int64 col11 float64 col12 float64 col13 float64 col14 float64 dtype: object
# Preview the numeric slice; <NA> entries are filled with 0 further below.
gdf_sliced_small.head()
col1 | col2 | col3 | col4 | col5 | col6 | col7 | col8 | col9 | col10 | col11 | col12 | col13 | col14 | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 0 | 2.0 | 9.0 | <NA> | 1.0 | <NA> | 0.0 | 0.0 | 3 | 1 | 0.0 | <NA> | 1036.0 | <NA> |
1 | 0 | 12.0 | 166.0 | 3.0 | 3.0 | <NA> | 1.0 | 0.0 | 1 | 3 | 1.0 | <NA> | 28.0 | 3.0 |
2 | 0 | 1.0 | 66.0 | <NA> | <NA> | <NA> | <NA> | <NA> | 2 | 0 | <NA> | <NA> | 1211.0 | <NA> |
3 | 0 | 1.0 | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | 2 | 1 | <NA> | <NA> | 8.0 | <NA> |
4 | 0 | 2.0 | <NA> | 4.0 | 4.0 | <NA> | 7.0 | 0.0 | 59 | 4 | 1.0 | <NA> | 378.0 | 4.0 |
from cuml.dask.ensemble import RandomForestClassifier as cumlDaskRF
from cuml.dask.common import utils as dask_utils
Refer to Official Dask Documentation for Best Practices on repartitioning your Dask Dataframe: https://docs.dask.org/en/latest/dataframe-best-practices.html#repartition-to-reduce-overhead
# You should aim for partitions that have around 100MB of data each.
# Cast every column to float32 (NOTE(review): cuML estimators expect
# single-precision input — confirm against the installed cuML version) and
# repartition to spread the work across workers.
gdf_sliced_small = gdf_sliced_small.astype(np.float32).repartition(npartitions=450)
# gdf = gdf.persist() # if on a distributed system
# Random forest cannot handle missing values, so replace <NA> with 0.
gdf_sliced_small = gdf_sliced_small.fillna(0)
gdf_sliced_small.dtypes
col1 float32 col2 float32 col3 float32 col4 float32 col5 float32 col6 float32 col7 float32 col8 float32 col9 float32 col10 float32 col11 float32 col12 float32 col13 float32 col14 float32 dtype: object
# split data into training and Y
# pop() removes col1 from gdf_sliced_small IN PLACE and returns it, so after
# this line gdf_sliced_small holds only the 13 feature columns.
Y = gdf_sliced_small.pop('col1') # first column is binary (click or not)
# Labels must be integers for the classifier.
Y = Y.astype(np.int32)
Y
<dask_cudf.Series | 2508 tasks | 450 npartitions>
%%time
# Random Forest building parameters
n_streams = 8 # optimization
max_depth = 10
n_bins = 16
n_trees = 10
# Distributed GPU random forest: each worker trains a sub-forest on its local
# partitions; passing `client` ties the estimator to the running cluster.
cuml_model = cumlDaskRF(max_depth=max_depth, n_estimators=n_trees, n_bins=n_bins, n_streams=n_streams, verbose=True, client=client)
# This triggers the whole lazy graph (read + cast + fillna) plus training —
# the expensive step (~50 min wall time in the captured output below).
cuml_model.fit(gdf_sliced_small, Y)
CPU times: user 4.1 s, sys: 825 ms, total: 4.92 s Wall time: 50min 46s
<cuml.dask.ensemble.randomforestclassifier.RandomForestClassifier at 0x7fbb7a88a250>
# split data into gdf_test and test_y for testing set
# BUG FIX: the original sliced with string labels (.loc['0':'6000']), but
# read_csv produces a default integer index, so label-based slicing must use
# integers. (.loc slicing is inclusive of both endpoints.)
gdf_test = gdf_original.loc[0:6000]
gdf_test = gdf_test.iloc[:, 0:14]
# Same preprocessing as the training frame: float32 cast, repartition, fill NA.
# NOTE(review): 450 partitions for ~6001 rows is far more than needed — a
# handful of partitions would suffice, but it is harmless.
gdf_test = gdf_test.astype(np.float32).repartition(npartitions=450)
gdf_test = gdf_test.fillna(0)
# NOTE(review): these rows come from the same file used for training, so this
# "test" split overlaps the training data — use a disjoint hold-out split for
# an honest generalization estimate.
test_y = gdf_test.pop('col1') # first column is binary (click or not)
test_y = test_y.astype(np.int32)
gdf_test
col2 | col3 | col4 | col5 | col6 | col7 | col8 | col9 | col10 | col11 | col12 | col13 | col14 | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
npartitions=450 | |||||||||||||
float32 | float32 | float32 | float32 | float32 | float32 | float32 | float32 | float32 | float32 | float32 | float32 | float32 | |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
%%time
# Model prediction
pred_df = cuml_model.predict(gdf_sliced_small)
# converting from Dask cuDF Series to NumPy array
pred_df = pred_df.compute().to_array()
pred_df
# converting from Dask cuDF Series to NumPy array
Y = Y.compute().to_array()
Y
from sklearn import metrics
# Model Accuracy
print("Accuracy:",metrics.accuracy_score(Y, pred_df))