This notebook highlights some key differences when transferring code from Pandas to a Dask environment.
Most issues have a link to the Dask documentation for additional information.
# Since Dask is actively being developed, this example runs with the versions below
import dask
import dask.dataframe as dd
import pandas as pd
print(f'Dask version: {dask.__version__}')
print(f'Pandas version: {pd.__version__}')
Starting the Dask Client is optional. In this example we are running on a LocalCluster, which also provides a dashboard that is useful for gaining insight into the computation.
For additional information on the Dask Client see documentation
The link to the dashboard will become visible when you create a client (as shown below).
When running within Jupyter Lab
an extension can be installed to view the various dashboard widgets.
from dask.distributed import Client
# client = Client(n_workers=1, threads_per_worker=4, processes=False, memory_limit='2GB')
client = Client()
client
Dask comes with built-in sample datasets; we will use one of them for our example.
ddf = dask.datasets.timeseries()
ddf
The Dask framework
is lazy, so in order to see the result we need to run compute() (or head(),
which runs compute() under the hood).
ddf.head(2)
In order to create a Pandas
dataframe we can use the compute()
method of a Dask dataframe
pdf = ddf.compute()
print(type(pdf))
pdf.head(2)
We can also see Dask's laziness when using the shape attribute
print(f'Pandas shape: {pdf.shape}')
print('---------------------------')
print(f'Dask lazy shape: {ddf.shape}')
We cannot get the full shape before accessing all the partitions; running len
will do so
print(f'Dask computed shape: {len(ddf.index):,}') # expensive
Dask dataframe
from Pandas
In order to utilize Dask
capabilities on an existing Pandas dataframe
(pdf) we need to convert the Pandas dataframe
into a Dask dataframe
(ddf) with the from_pandas method.
You must supply the number of partitions or the chunksize that will be used to generate the Dask dataframe
ddf2 = dask.dataframe.from_pandas(pdf, npartitions=10)
ddf2
Notice that when we created a Dask dataframe
we needed to supply an argument of npartitions
.
The number of partitions will assist Dask
in how to break up the Pandas dataframe
and parallelize the computation.
Each partition is a separate dataframe. For additional information see partition documentation
An example of this can be seen when examining the reset_index()
method:
pdf2 = pdf.reset_index()
# Only 1 row
pdf2.loc[0]
ddf2 = ddf2.reset_index()
# each partition has an index=0
ddf2.loc[0].compute()
Now that we have a Dask
(ddf) and a Pandas
(pdf) dataframe we can start to compare the interactions with them.
Dask does not update in place - thus there are no arguments such as inplace=True
which exist in Pandas.
For more details see issue#653 on github;
inplace=True
is not considered best practice anyway.
# Pandas
print(pdf.columns)
# pdf.rename(columns={'id':'ID'}, inplace=True)
pdf = pdf.rename(columns={'id':'ID'})
pdf.columns
# Dask
print(ddf.columns)
ddf = ddf.rename(columns={'id':'ID'})
ddf.columns
There are several differences when manipulating data.
cond_pdf = (pdf['x']>0.5) & (pdf['x']<0.8)
pdf.loc[cond_pdf, ['y']] = pdf['y']* 100
pdf[cond_pdf].head(2)
cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
ddf.loc[cond_ddf, ['y']] = ddf['y']* 100
ddf[cond_ddf].head(2)
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Input In [14], in <cell line: 2>()
1 cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
----> 2 ddf.loc[cond_ddf, ['y']] = ddf['y']* 100
3 ddf[cond_ddf].head(2)
TypeError: '_LocIndexer' object does not support item assignment
# Pandas
pdf['y'] = pdf['y'].mask(cond=cond_pdf, other=pdf['y']* 100)
pdf.head(2)
#Dask
cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
ddf['y'] = ddf['y'].mask(cond=cond_ddf, other=ddf['y']* 100)
ddf.head(2)
For more information see the dask mask documentation
One key feature in Dask
is the introduction of the meta
argument.
meta
is the prescription of the names/types of the output from the computation.
Since Dask
creates a DAG for the computation, it needs to know the outputs of each calculation stage.
For additional information see meta documentation
pdf['initials'] = pdf['name'].apply(lambda x: x[0]+x[1])
pdf.head(2)
# Dask - Warning
ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1])
ddf.head(2)
# Describe the outcome type of the calculation
meta_arg = pd.Series(object, name='initials')
ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1], meta=meta_arg)
ddf.head(2)
# similar when using a function
def func(row):
if (row['x']> 0):
return row['x'] * 1000
else:
return row['y'] * -1
ddf['z'] = ddf.apply(func, axis=1, meta=('z', 'float'))
ddf.head(2)
map_partitions is mainly useful for functions that are not implemented in Dask
or Pandas
.
It operates on each partition as a Pandas dataframe
, whose output needs to be described in the meta
argument. The function can also accept additional arguments.
import numpy as np
def func2(df, coor_x, coor_y, drop_cols):
df['dist'] = np.sqrt ( (df[coor_x] - df[coor_x].shift())**2
+ (df[coor_y] - df[coor_y].shift())**2 )
return df.drop(drop_cols, axis=1)
ddf2 = ddf.map_partitions(func2
, coor_x='x'
, coor_y='y'
, drop_cols=['initials', 'z']
, meta=pd.DataFrame({'ID':'i8'
, 'name':str
, 'x':'f8'
, 'y':'f8'
, 'dist':'f8'}, index=[0]))
ddf2.head()
# Only Pandas
pdf = pdf.assign(times=pd.to_datetime(pdf.index).time)
pdf.head(2)
# Dask or Pandas
ddf = ddf.assign(times=ddf.index.astype('M8[ns]'))
# or ddf = ddf.assign(Time= dask.dataframe.to_datetime(ddf.index, format='%Y-%m-%d'). )
ddf['times'] = ddf['times'].dt.time
ddf = client.persist(ddf)
ddf.head(2)
# no issue with regular drop columns
pdf = pdf.drop(labels=['initials'],axis=1)
ddf = ddf.drop(labels=['initials','z'],axis=1)
# Pandas
pdf = pdf.assign(colna = None)
# Dask
ddf = ddf.assign(colna = None)
pdf = pdf.dropna(axis=1, how='all')
pdf.head(2)
In order for Dask
to drop a column where all values are na
it must check all the partitions with compute()
if ddf.colna.isnull().all().compute() == True: # check if all values in column are Null - expensive
ddf = ddf.drop(labels=['colna'],axis=1)
ddf.head(2)
# Pandas
pdf = pdf.reset_index(drop=True)
pdf.head(2)
# Dask
ddf = ddf.reset_index()
ddf = ddf.drop(labels=['timestamp'], axis=1 )
ddf.head(2)
Both pandas
and dask
preferably use the parquet format. With Dask,
files can be read with multiple workers. Most kwargs
are applicable for reading and writing files, e.g. ddf = dd.read_csv('data/pd2dd/ddf*.csv', compression='gzip', header=False).
However, some kwargs are not supported, e.g. nrows
. See documentation (including the option for output file naming).
from pathlib import Path
output_dir_file = Path('data/pdf_single_file.csv')
output_dir_file.parent.mkdir(parents=True, exist_ok=True)
%%time
# Pandas
pdf.to_csv(output_dir_file)
list(output_dir_file.parent.glob('*.csv'))
Notice the '*'
in the file path, which allows naming multiple output files.
output_dask_dir = Path('data/dask_multi_files/')
output_dask_dir.mkdir(parents=True, exist_ok=True)
%%time
# Dask
ddf.to_csv(f'{output_dask_dir}/ddf*.csv', index = False)
To find the number of partitions, which determines the number of output files, use dask.dataframe.npartitions
To change the number of output files use repartition, which is an expensive operation.
ddf.npartitions
For pandas
it is possible to iterate over the files and concat them; see this answer from stack overflow.
%%time
# Pandas
concat_df = pd.concat([pd.read_csv(f)
for f in list(output_dask_dir.iterdir())])
len(concat_df)
%%time
# Dask
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
_ddf
Remember that Dask
is lazy - thus it does not really read the file until it needs to...
%%time
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
len(_ddf)
Since Dask is lazy, it may run the entire graph/DAG (again) even if it has already run part of the calculation in a previous cell. Thus use persist to keep the results in memory.
Additional information can be read in this stackoverflow issue or see an example in this post
This concept should also be used when running code within a script (rather than a jupyter notebook) which incorporates loops within the code.
# e.g.
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
# do some filter
_ddf = client.persist(_ddf)
# do some computations
_ddf.head(2)
In addition to the groupby notebook example that is in the repository,
this is another example of how to avoid using groupby.apply
.
In this example we are grouping columns into unique lists.
# prepare pandas dataframe
pdf = pdf.assign(time=pd.to_datetime(pdf.index).time)
pdf['seconds'] = pdf.time.astype(str).str[-2:]
cols_for_demo =['name', 'ID','seconds']
pdf[cols_for_demo].head()
pdf_gb = pdf.groupby(pdf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [pdf_gb[att_col_gr].apply(lambda x: list(set(x.to_list())))
               for att_col_gr in gp_col]
%%time
df_edge_att = pdf_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
print(df_edge_att.head(2))
Pandas
is more efficient (assuming that you can load all the data into RAM).
def set_list_att(x: dd.Series):
    return list(set([item for item in x.values]))
ddf['seconds'] = ddf.times.astype(str).str[-2:]
ddf = client.persist(ddf)
ddf[cols_for_demo].head(2)
ddf.columns
df_gb = ddf.groupby(ddf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [df_gb[att_col_gr].apply(set_list_att
,meta=pd.Series(dtype='object', name=f'{att_col_gr}_att'))
for att_col_gr in gp_col]
%%time
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head(2)
We can do better...
Using Dask's custom aggregation is considerably better
import itertools
custom_agg = dd.Aggregation(
'custom_agg',
lambda s: s.apply(set),
lambda s: s.apply(lambda chunks: list(set(itertools.chain.from_iterable(chunks)))),)
%%time
df_gb = ddf.groupby(ddf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [df_gb[att_col_gr].agg(custom_agg) for att_col_gr in gp_col]
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head(2)
In this example we show that once the DAG is corrupted you may need to reset the calculation
# reset dataframe
ddf = dask.datasets.timeseries()
ddf.head(1)
def func_dist2(df, coor_x, coor_y):
dist = np.sqrt ( (df[coor_x] - df[coor_x].shift())^2 # `^` <-- wrong syntax
+ (df[coor_y] - df[coor_y].shift())^2 ) # `^` <-- wrong syntax
return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
, meta=('float'))
Is everything OK?
# Still results with an error
def func_dist2(df, coor_x, coor_y):
dist = np.sqrt ( (df[coor_x] - df[coor_x].shift())**2 # `**` <-- correct syntax
+ (df[coor_y] - df[coor_y].shift())**2 ) # `**` <-- correct syntax
return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
, meta=('float'))
We need to reset the dataframe
ddf = dask.datasets.timeseries()
def func_dist2(df, coor_x, coor_y):
dist = np.sqrt ( (df[coor_x] - df[coor_x].shift())**2 #corrected math function
+ (df[coor_y] - df[coor_y].shift())**2 )
return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
, meta=('float'))
ddf.head(2)