# ### Mission Planning & Execution Engineer @ Satellogic
# ### PyData Alicante 2019-12-16 @ CubeCut Software
# # ¿Quién soy yo?
#
#
#
# * **Ingeniero Aeronáutico** y pythonista autodidacta
# * **Mission Planning & Execution Engineer** en **Satellogic**
# * **Presidente de la Asociación Python España 🐍 🇪🇸**
# * **Colaborador** en proyectos de **Python Científico**: NumPy, SciPy, conda, astropy, memory-profiler...
# * **Profesor asociado** en el **Instituto Empresa** de Python para Big Data
# * Amante de la pizza y el hard rock 🤘
# # Resumen
#
#
# - Estado de la cuestión
# - Dask
#
# - Introducción
# - Evaluación perezosa
# - Grafos de operaciones
# - Demo
# - Limitaciones
#
# - Proyectos relacionados
# - Conclusiones y futuro
#
# ## _¿El principio del fin de PySpark?_
# # 1. Estado de la cuestión
#
# **Vuestro portátil: ~3.6 GHz**
#
# ![Clock speed](img/clock.jpg)
#
# https://en.wikipedia.org/wiki/File:Clock_CPU_Scaling.jpg
# ![Big Data](img/bigdata.png)
#
# # 2. Dask
#
# dask es una biblioteca de computación paralela orientada a la analítica. Está formada por dos componentes:
#
#
#
# 1. _Dynamic task scheduling_ optimizada para la computación.
# 2. Colecciones "Big Data" como arrays, DataFrames y listas paralelas, que mimetizan la forma de trabajar con NumPy, pandas o iteradores de Python para objetos más grandes que la memoria disponible o en entornos distribuidos. Estas colecciones funcionan sobre los _schedulers_.
# Es un proyecto joven pero tiene determinadas propiedades que lo hacen muy interesante, entre ellas:
#
# * **Familiar**: Dask replica la forma de trabajar con arrays de Numpy y DataFrames de pandas, así que la transición es mucho más sencilla que con otros sistemas.
# * **Flexible**: Se integra bien con otros proyectos y provee herramientas para paralelizar nuestras propias funciones.
# * **Nativo**: Es Python puro, no hay antipatrones ni comunicación con otros lenguajes.
# * **Escalable**: Dask funciona tanto en clusters de 1000 nodos como en portátiles normales, optimizando el uso de memoria.
# * **Amistoso**: Proporciona feedback inmediato y abundantes herramientas de diagnóstico.
#
# ![dask](img/collections-schedulers.png)
# ## Instalación
#
# La versión más reciente es la 2.9.0 (2019-12-06, ¡hace unos días!) y se puede instalar con pip:
#
# ```
# $ pip install dask[complete]
# ```
#
# o con conda:
#
# ```
# $ conda install dask
# ```
# ## Evaluación perezosa
#
# Vamos a hacer un ejemplo trivial con `dask.array` para comprobar cómo funciona la computación en dask.
# In[1]:
import numpy as np
import dask.array as da
# In[2]:
x = np.arange(1000)
y = da.from_array(x, chunks=100)
# In[3]:
y
# Si intentamos efectuar cualquier operación sobre estos arrays, no se ejecuta inmediatamente:
# In[4]:
op = y.mean()
op
# Dask en su lugar construye un grafo con todas las operaciones necesarias y sus dependencias para que podamos visualizarlo y razonar sobre él. Este grafo está almacenado en estructuras de datos corrientes de Python como diccionarios, listas y tuplas:
# In[5]:
y.dask.dicts
# In[6]:
op.dask.dicts
# Y podemos visualizarlo si tenemos instalada la biblioteca graphviz:
# In[7]:
op.visualize()
# Si queremos efectuar la operación, tendremos que llamar al método `.compute()`.
# In[8]:
op.compute()
# Si queremos convertir nuestro array original a array de NumPy, también se hace llamando a `compute()`:
# In[9]:
y.compute()
# ## Demo: Taxis de NYC
#
# Otra de las estructuras de datos que provee pandas son los DataFrames, que se comportan de la misma manera que los DataFrames de pandas.
#
#
#
# Para estudiar cómo funciona, vamos a descargar datos de trayectos de los taxis de New York:
# In[10]:
get_ipython().system('cat data/raw_data_urls.txt')
# In[11]:
get_ipython().system('du data/yellow*.csv -h -s')
# In[12]:
get_ipython().system('du data/ -h -s')
# In[13]:
get_ipython().system('cat data/download_raw_data.sh')
# Tanto `dask.dataframe` como `dask.array` usan un _scheduler_ por defecto basado en hilos. En su lugar, vamos a utilizar una clase `Client`, la que emplearíamos si estuviéramos en un cluster.
# In[14]:
import dask.dataframe as dd
# Esta clase `Client`, cuando se utiliza en local, lanza un scheduler que minimiza el uso de memoria y aprovecha todos los núcleos de la CPU.
#
# > "The dask single-machine schedulers have logic to execute the graph in a way that minimizes memory footprint." http://dask.pydata.org/en/latest/custom-graphs.html?highlight=minimizes%20memory#related-projects
#
# El servidor de diagnóstico está disponible en http://127.0.0.1:8787/.
# In[15]:
from dask.distributed import Client
client = Client()
client
# Y ahora leemos los `.csv` con un filtro todos a la vez en el mismo DataFrame de Dask:
# In[16]:
df = dd.read_csv("data/yellow*.csv",
parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])
# Que mimetiza la API de pandas:
# In[17]:
df.head()
# In[18]:
df.dtypes
# Vamos a calcular la longitud del DataFrame:
# In[19]:
# Esta operación bloquea el intérprete durante unos minutos
len(df)
# Como se puede observar, el uso de memoria está contenido y todas las CPUs están trabajando.
#
# ![len](img/len_df.png)
# También lo podemos hacer de manera asíncrona:
# In[20]:
futures = client.submit(len, df)
futures
# In[21]:
from distributed import progress
# In[22]:
progress(futures)
# Vamos ahora a calcular la distancia media recorrida en función del número de ocupantes. Igual que cuando usábamos `dask.array`, la operación no se efectúa automáticamente.
# In[23]:
op = df.groupby(df.passenger_count).trip_distance.mean()
op
# In[24]:
f2 = client.compute(op)
f2
# El método client.compute
almacena el resultado en un solo nodo, y por tanto debe usarse con cuidado. Para objetos grandes, es mejor usar client.persist
.
# In[25]:
progress(f2)
# In[26]:
f2.result()
# En este caso la visualización de la operación ya tiene una magnitud considerable:
# In[27]:
op.visualize()
# Más operaciones que se pueden hacer:
# In[28]:
df2 = df[(df.tip_amount > 0) & (df.fare_amount > 0)] # filter out bad rows
df2['tip_fraction'] = df2.tip_amount / df2.fare_amount # make new column
# In[29]:
hour = df2.groupby(df2.tpep_pickup_datetime.dt.hour).tip_fraction.mean()
hour
# In[30]:
f_hour = client.compute(hour)
# In[31]:
f_hour.result().plot()
# In[32]:
import pandas as pd
# In[33]:
payments = pd.DataFrame(
{
1: 'Credit Card',
2: 'Cash',
3: 'No Charge',
4: 'Dispute',
5: 'Unknown',
6: 'Voided trip'
}, index=["payment_name"]
).T
# In[34]:
df2 = df.merge(payments, left_on='payment_type', right_index=True)
# In[35]:
client.compute(df2.groupby(df2.payment_name).tip_amount.mean()).result()
# ## Limitaciones
#
# No todas las operaciones pueden ejecutarse leyendo parcialmente los datos en memoria.
# In[36]:
zero_tip = df2.tip_amount == 0
cash = df2.payment_name == 'Cash'
client.compute(dd.concat([zero_tip, cash], axis=1).corr()).result()
# ¿Qué significa este warning? En dask, algunas operaciones son sensibles al particionado http://dask.pydata.org/en/latest/dataframe-design.html#partitions así que tendremos que reindexar el DataFrame para que se alineen:
# In[37]:
df2.passenger_count.resample('1d').compute()
# In[38]:
df2.npartitions
# In[39]:
df2.divisions
# In[40]:
df3 = df2.set_index('tpep_pickup_datetime')
# In[41]:
df3.npartitions
# In[42]:
df3.divisions
# Las operaciones que requieren reordenar el dataset tienen consideraciones de rendimiento especiales y pueden no ser aptas para realizarse en local.
# In[43]:
daily_mean = df3.passenger_count.resample('1d').mean()
daily_mean
# In[44]:
# No ejecutar en local!
# daily_mean.compute().plot()
# # 3. Proyectos relacionados
#
# El ecosistema alrededor de Dask está creciendo a gran velocidad.
#
# * Modelos Lineales Generalizados https://github.com/dask/dask-glm
# * Algoritmos de Machine Learning implementados en Dask https://github.com/dask/dask-ml
# * Entrenamiento distribuido de XGBoost https://github.com/dask/dask-xgboost
# * Computación en tiempo real _à la Flink_ https://github.com/python-streamz/streamz
# # 4. Conclusiones
#
# * Dask tiene varias **ventajas**:
# - Trivial de instalar en local
# - Familiar para usuarios de NumPy y pandas
# - Mismo funcionamiento en un cluster
# * Y también algunas limitaciones:
# - Proyecto más joven
# - En local hay que tener cuidado (como era obvio)
# ## Algunos enlaces
#
# - Documentación https://docs.dask.org/
# - Dask.distributed https://distributed.dask.org/
# - Blog de desarrollo https://blog.dask.org/
#
#
#
# # `>>> print(¡Muchas gracias!)`
#
# ### hello@juanlu.space