Данный тьюториал содержит краткий обзор библиотеки Dask и более подробное описание возможностей dask.dataframe.
Dask - библиотека Python для параллельных вычислений. Работает как на одной машине, максимально используя доступные вычислительные ресурсы, так и на кластере до 1000 ядер. Однако, как заметил разработчик Dask Matthew Rocklin: "Медианный размер кластера Dask - 1 компьютер".
В отдельные проекты выделены:
Dask.dataframe - это распределенный pandas.DataFrame. Если Dask.dataframe не помещается в память, то в RAM последовательно подгружаются соответствующие объему памяти части, а "излишки" хранятся на диске.
Проблема №1: данные должны помещаться в память
Интерфейс dask.dataframe аналогичен pandas:
#pandas #dask
import pandas as pd import dask.dataframe as dd
df = pd.read_csv('2015-01-01.csv') df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean() df.groupby(df.user_id).value.mean().compute()
import gc
import glob
import dask
import dask.dataframe as dd
import numpy as np
import pandas as pd
Имеется 2 файла:
!ls data/*.csv
Считывать будем только 4 столбца: VendorID
, tpep_pickup_datetime
, passenger_count
, total_amount
params = dict(header=0,
usecols = [0, 1, 3, 16],
dtype = {'1': 'datetime64'},
#небольшой костыль для корректного считывания данных
converters = {'Passenger_count': (lambda x: round(float(x), 0) // 1 if (x != 'NaN' or len(x) <= 5) else 0),
'Total_amount': (lambda x: float(x) if (x != 'NaN' or len(x) <= 5) else 0)}
)
pandas
%%time
pandas_df = pd.read_csv('data/yellow_tripdata_2017-12.csv', **params)
pandas_df.head()
dask
%%time
dask_df = dd.read_csv('data/yellow_tripdata_2017-12.csv', **params)
dask_df.head()
Dask справился значительно быстрее, потому что pandas сначала считывает файл и выводит первые 5, а dask считывает 5 строк и сразу их выводит.
Однако, когда файл помещается в оперативную память, pandas с уже загруженными данными серьезно превосходит dask, работающий по "ленивому" принципу - вычисления и обработка данных происходят непосредственно при вызове метода. Реализация "ленивого" подхода, в принципе, характерна для ресурсоемких операций. Особенно, когда дело касается "настоящей бигдаты".
Следим за использованием памяти, удаляем ненужные объекты, собираем мусор:
del pandas_df, dask_df
gc.collect()
dask
%%time
dask_df2 = dd.read_csv('data/*.csv', **params)
dask_df2.head()
pandas
%%time
pandas_df2 = pd.concat([pd.read_csv(fn, **params) for fn in glob.glob('data/*.csv')])
pandas_df2.head()
Учитывая, что загружаемые файлы примерно одинакового размера (~800 Mb), видим, что время обработки увеличилось линейно. Очевидно, если грузить реально большой файл(-ы), pandas рано или поздно упрётся в лимит RAM.
Для устранения этого неудобства можно просто преобразовать pandas.DataFrame в dask.datafram и считать всеми имеющимися ядрами. Автоматически, без дополнительного кода и настроек.
Используем pandas_df2 из предыдущего примера:
%%time
dask_df3 = dd.from_pandas(pandas_df2, npartitions=2, chunksize=None)
pandas'овский датафрейм просто переопределим для нумерации датафреймов:
%%time
pandas_df3 = pandas_df2
Уборка:
del pandas_df2
gc.collect()
Рассмотрим несколько примеров, наглядно демонстрирующих: с помощью dask можно значительно ускорить обработку данных.
Обратите внимание на метод compute()
при обработке dask датафрейма - это как раз команда "посчитать". Без нее "ленивый" dask лишь определит, что нужно будет сделать непосредственно при запросе пользователя.
%%time
pandas_df3['total_amount'].max()
%%time
dask_df3['total_amount'].max().compute()
%%time
pandas_df3['passenger_count'].value_counts()
%%time
pandas_df3['passenger_count'].value_counts()
%%time
pandas_df3.groupby(by='VendorID')['passenger_count'].sum()
%%time
dask_df3.groupby(by='VendorID')['passenger_count'].sum().compute()
Очевидно, dask, автоматически используя доступные ресурсы, работает быстрее pandas даже при простых операциях.
Dask.dataframe API является частью Pandas API, но не является его полной копией - следует знать о некоторых ограничениях, например:
Dask - простой и мощный инструмент для чтения больших файлов и обработки данных. Использвание dask.dataframe позволяет максимально использовать ресурсы компьютера без дополнительного кода и настроек.
dask.dataframe рекомендуется использовать, когда:
Использование dask.dataframe не рекомендуется, когда: