#!/usr/bin/env python # coding: utf-8 # # Dask # # ## Computación distribuida en Python _à la Spark_ # # # # ### Juan Luis Cano Rodríguez # ### Mission Planning & Execution Engineer @ Satellogic # ### PyData Alicante 2019-12-16 @ CubeCut Software # # ¿Quién soy yo? # # # # * **Ingeniero Aeronáutico** y pythonista autodidacta # * **Mission Planning & Execution Engineer** en **Satellogic** # * **Presidente de la Asociación Python España 🐍 🇪🇸** # * **Colaborador** en proyectos de **Python Científico**: NumPy, SciPy, conda, astropy, memory-profiler... # * **Profesor asociado** en el **Instituto Empresa** de Python para Big Data # * Amante de la pizza y el hard rock 🤘 # # Resumen # #
    #
  1. Estado de la cuestión
  2. #
  3. Dask #
      #
    1. Introducción
    2. #
    3. Evaluación perezosa
    4. #
    5. Grafos de operaciones
    6. #
    7. Demo
    8. #
    9. Limitaciones
    10. #
  4. #
  5. Proyectos relacionados
  6. #
  7. Conclusiones y futuro
  8. #
# ## _¿El principio del fin de PySpark?_ # # 1. Estado de la cuestión # # **Vuestro portátil: ~3.6 GHz** # # ![Clock speed](img/clock.jpg) # # https://en.wikipedia.org/wiki/File:Clock_CPU_Scaling.jpg # ![Big Data](img/bigdata.png) # # # 2. Dask # # dask es una biblioteca de computación paralela orientada a la analítica. Está formada por dos componentes: # # # # 1. _Dynamic task scheduling_ optimizada para la computación. # 2. Colecciones "Big Data" como arrays, DataFrames y listas paralelas, que mimetizan la forma de trabajar con NumPy, pandas o iteradores de Python para objetos más grandes que la memoria disponible o en entornos distribuidos. Estas colecciones funcionan sobre los _schedulers_. # Es un proyecto joven pero tiene determinadas propiedades que lo hacen muy interesante, entre ellas: # # * **Familiar**: Dask replica la forma de trabajar con arrays de Numpy y DataFrames de pandas, así que la transición es mucho más sencilla que con otros sistemas. # * **Flexible**: Se integra bien con otros proyectos y provee herramientas para paralelizar nuestras propias funciones. # * **Nativo**: Es Python puro, no hay antipatrones ni comunicación con otros lenguajes. # * **Escalable**: Dask funciona tanto en clusters de 1000 nodos como en portátiles normales, optimizando el uso de memoria. # * **Amistoso**: Proporciona feedback inmediato y abundantes herramientas de diagnóstico. # # ![dask](img/collections-schedulers.png) # ## Instalación # # La versión más reciente es la 2.9.0 (2019-12-06, ¡hace unos días!) y se puede instalar con pip: # # ``` # $ pip install dask[complete] # ``` # # o con conda: # # ``` # $ conda install dask # ``` # ## Evaluación perezosa # # Vamos a hacer un ejemplo trivial con `dask.array` para comprobar cómo funciona la computación en dask. # In[1]: import numpy as np import dask.array as da # In[2]: x = np.arange(1000) y = da.from_array(x, chunks=100) # In[3]: y # Si intentamos efectuar cualquier operación sobre estos arrays, no se ejecuta inmediatamente: # In[4]: op = y.mean() op # Dask en su lugar construye un grafo con todas las operaciones necesarias y sus dependencias para que podamos visualizarlo y razonar sobre él. Este grafo está almacenado en estructuras de datos corrientes de Python como diccionarios, listas y tuplas: # In[5]: y.dask.dicts # In[6]: op.dask.dicts # Y podemos visualizarlo si tenemos instalada la biblioteca graphviz: # In[7]: op.visualize() # Si queremos efectuar la operación, tendremos que llamar al método `.compute()`. # In[8]: op.compute() # Si queremos convertir nuestro array original a array de NumPy, también se hace llamando a `compute()`: # In[9]: y.compute() # ## Demo: Taxis de NYC # # Otra de las estructuras de datos que provee pandas son los DataFrames, que se comportan de la misma manera que los DataFrames de pandas. # # # # Para estudiar cómo funciona, vamos a descargar datos de trayectos de los taxis de New York: # In[10]: get_ipython().system('cat data/raw_data_urls.txt') # In[11]: get_ipython().system('du data/yellow*.csv -h -s') # In[12]: get_ipython().system('du data/ -h -s') # In[13]: get_ipython().system('cat data/download_raw_data.sh') # Tanto `dask.dataframe` como `dask.array` usan un _scheduler_ por defecto basado en hilos. En su lugar, vamos a utilizar una clase `Client`, la que emplearíamos si estuviéramos en un cluster. # In[14]: import dask.dataframe as dd # Esta clase `Client`, cuando se utiliza en local, lanza un scheduler que minimiza el uso de memoria y aprovecha todos los núcleos de la CPU. # # > "The dask single-machine schedulers have logic to execute the graph in a way that minimizes memory footprint." http://dask.pydata.org/en/latest/custom-graphs.html?highlight=minimizes%20memory#related-projects # # El servidor de diagnóstico está disponible en http://127.0.0.1:8787/. # In[15]: from dask.distributed import Client client = Client() client # Y ahora leemos los `.csv` con un filtro todos a la vez en el mismo DataFrame de Dask: # In[16]: df = dd.read_csv("data/yellow*.csv", parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime']) # Que mimetiza la API de pandas: # In[17]: df.head() # In[18]: df.dtypes # Vamos a calcular la longitud del DataFrame: # In[19]: # Esta operación bloquea el intérprete durante unos minutos len(df) # Como se puede observar, el uso de memoria está contenido y todas las CPUs están trabajando. # # ![len](img/len_df.png) # También lo podemos hacer de manera asíncrona: # In[20]: futures = client.submit(len, df) futures # In[21]: from distributed import progress # In[22]: progress(futures) # Vamos ahora a calcular la distancia media recorrida en función del número de ocupantes. Igual que cuando usábamos `dask.array`, la operación no se efectúa automáticamente. # In[23]: op = df.groupby(df.passenger_count).trip_distance.mean() op # In[24]: f2 = client.compute(op) f2 #
El método client.compute almacena el resultado en un solo nodo, y por tanto debe usarse con cuidado. Para objetos grandes, es mejor usar client.persist.
# In[25]: progress(f2) # In[26]: f2.result() # En este caso la visualización de la operación ya tiene una magnitud considerable: # In[27]: op.visualize() # Más operaciones que se pueden hacer: # In[28]: df2 = df[(df.tip_amount > 0) & (df.fare_amount > 0)] # filter out bad rows df2['tip_fraction'] = df2.tip_amount / df2.fare_amount # make new column # In[29]: hour = df2.groupby(df2.tpep_pickup_datetime.dt.hour).tip_fraction.mean() hour # In[30]: f_hour = client.compute(hour) # In[31]: f_hour.result().plot() # In[32]: import pandas as pd # In[33]: payments = pd.DataFrame( { 1: 'Credit Card', 2: 'Cash', 3: 'No Charge', 4: 'Dispute', 5: 'Unknown', 6: 'Voided trip' }, index=["payment_name"] ).T # In[34]: df2 = df.merge(payments, left_on='payment_type', right_index=True) # In[35]: client.compute(df2.groupby(df2.payment_name).tip_amount.mean()).result() # ## Limitaciones # # No todas las operaciones pueden ejecutarse leyendo parcialmente los datos en memoria. # In[36]: zero_tip = df2.tip_amount == 0 cash = df2.payment_name == 'Cash' client.compute(dd.concat([zero_tip, cash], axis=1).corr()).result() # ¿Qué significa este warning? En dask, algunas operaciones son sensibles al particionado http://dask.pydata.org/en/latest/dataframe-design.html#partitions así que tendremos que reindexar el DataFrame para que se alineen: # In[37]: df2.passenger_count.resample('1d').compute() # In[38]: df2.npartitions # In[39]: df2.divisions # In[40]: df3 = df2.set_index('tpep_pickup_datetime') # In[41]: df3.npartitions # In[42]: df3.divisions #
Las operaciones que requieren reordenar el dataset tienen consideraciones de rendimiento especiales y pueden no ser aptas para realizarse en local.
# In[43]: daily_mean = df3.passenger_count.resample('1d').mean() daily_mean # In[44]: # No ejecutar en local! # daily_mean.compute().plot() # # 3. Proyectos relacionados # # El ecosistema alrededor de Dask está creciendo a gran velocidad. # # * Modelos Lineales Generalizados https://github.com/dask/dask-glm # * Algoritmos de Machine Learning implementados en Dask https://github.com/dask/dask-ml # * Entrenamiento distribuido de XGBoost https://github.com/dask/dask-xgboost # * Computación en tiempo real _à la Flink_ https://github.com/python-streamz/streamz # # 4. Conclusiones # # * Dask tiene varias **ventajas**: # - Trivial de instalar en local # - Familiar para usuarios de NumPy y pandas # - Mismo funcionamiento en un cluster # * Y también algunas limitaciones: # - Proyecto más joven # - En local hay que tener cuidado (como era obvio) # ## Algunos enlaces # # - Documentación https://docs.dask.org/ # - Dask.distributed https://distributed.dask.org/ # - Blog de desarrollo https://blog.dask.org/ # # # # # `>>> print(¡Muchas gracias!)` # # ### hello@juanlu.space