¿Quién soy yo?

  • Ingeniero Aeron谩utico 馃帀
  • Data Scientist Formador de Python 馃悕 y Big Data en Synergic Partners
  • Presidente de la Asociaci贸n Python Espa帽a 馃嚜馃嚫
  • Contribuidor ocasional de NumPy, SciPy...
  • Conferencista en PyCon y PyData
  • Amante de la pizza y el hard rock 馃

Resumen

  1. Estado de la cuesti贸n
  2. Dask
    1. Introducci贸n
    2. Evaluaci贸n perezosa
    3. Grafos de operaciones
    4. Demo
    5. Limitaciones
  3. Proyectos relacionados
  4. Conclusiones y futuro

¿El principio del fin de PySpark?

1. Estado de la cuestión

Vuestro port谩til: ~3 GHz

Clock speed

https://en.wikipedia.org/wiki/File:Clock_CPU_Scaling.jpg

Big Data

2. Dask

dask es una biblioteca de computaci贸n paralela orientada a la anal铆tica. Est谩 formada por dos componentes:

  1. Dynamic task scheduling optimizada para la computaci贸n.
  2. Colecciones "Big Data" como arrays, DataFrames y listas paralelas, que mimetizan la forma de trabajar con NumPy, pandas o iteradores de Python para objetos m谩s grandes que la memoria disponible o en entornos distribuidos. Estas colecciones funcionan sobre los schedulers.

Es un proyecto muy joven pero tiene determinadas propiedades que lo hacen muy interesante, entre ellas:

  • Familiar: Dask replica la forma de trabajar con arrays de Numpy y DataFrames de pandas, as铆 que la transici贸n es mucho m谩s sencilla que con otros sistemas.
  • Flexible: Se integra bien con otros proyectos y provee herramientas para paralelizar nuestras propias funciones.
  • Nativo: Es Python puro, no hay antipatrones ni comunicaci贸n con otros lenguajes.
  • Escalable: Dask funciona tanto en clusters de 1000 nodos como en port谩tiles normales, optimizando el uso de memoria.
  • Amistoso: Proporciona feedback inmediato y abundantes herramientas de diagn贸stico.

dask

Evaluación perezosa

Vamos a hacer un ejemplo trivial con dask.array para comprobar c贸mo funciona la computaci贸n en dask.

In [1]:
%load_ext version_information
%version_information numpy,pandas,dask
Out[1]:
SoftwareVersion
Python3.6.1 64bit [GCC 4.8.2 20140120 (Red Hat 4.8.2-15)]
IPython6.0.0
OSLinux 4.4.0 72 generic x86_64 with debian stretch sid
numpy1.12.1
pandas0.19.2
dask0.14.1
Tue Apr 25 15:50:24 2017 CEST
In [2]:
import numpy as np
import dask.array as da
In [3]:
x = np.arange(1000)
y = da.from_array(x, chunks=100)
In [4]:
y
Out[4]:
dask.array<array, shape=(1000,), dtype=int64, chunksize=(100,)>

Si intentamos efectuar cualquier operaci贸n sobre estos arrays, no se ejecuta inmediatamente:

In [5]:
op = y.mean()
op
Out[5]:
dask.array<mean_agg-aggregate, shape=(), dtype=float64, chunksize=()>

Dask en su lugar construye un grafo con todas las operaciones necesarias y sus dependencias para que podamos visualizarlo y razonar sobre 茅l. Este grafo est谩 almacenado en estructuras de datos corrientes de Python como diccionarios, listas y tuplas:

In [6]:
y.dask.dicts
Out[6]:
{'array-72f4a9d7e59d592525f97b4055ecfbd1': {('array-72f4a9d7e59d592525f97b4055ecfbd1',
   0): (<function dask.array.core.getarray>,
   'array-original-72f4a9d7e59d592525f97b4055ecfbd1',
   (slice(0, 100, None),)),
  ('array-72f4a9d7e59d592525f97b4055ecfbd1',
   1): (<function dask.array.core.getarray>, 'array-original-72f4a9d7e59d592525f97b4055ecfbd1', (slice(100, 200, None),)),
  ('array-72f4a9d7e59d592525f97b4055ecfbd1',
   2): (<function dask.array.core.getarray>, 'array-original-72f4a9d7e59d592525f97b4055ecfbd1', (slice(200, 300, None),)),
  ('array-72f4a9d7e59d592525f97b4055ecfbd1',
   3): (<function dask.array.core.getarray>, 'array-original-72f4a9d7e59d592525f97b4055ecfbd1', (slice(300, 400, None),)),
  ('array-72f4a9d7e59d592525f97b4055ecfbd1',
   4): (<function dask.array.core.getarray>, 'array-original-72f4a9d7e59d592525f97b4055ecfbd1', (slice(400, 500, None),)),
  ('array-72f4a9d7e59d592525f97b4055ecfbd1',
   5): (<function dask.array.core.getarray>, 'array-original-72f4a9d7e59d592525f97b4055ecfbd1', (slice(500, 600, None),)),
  ('array-72f4a9d7e59d592525f97b4055ecfbd1',
   6): (<function dask.array.core.getarray>, 'array-original-72f4a9d7e59d592525f97b4055ecfbd1', (slice(600, 700, None),)),
  ('array-72f4a9d7e59d592525f97b4055ecfbd1',
   7): (<function dask.array.core.getarray>, 'array-original-72f4a9d7e59d592525f97b4055ecfbd1', (slice(700, 800, None),)),
  ('array-72f4a9d7e59d592525f97b4055ecfbd1',
   8): (<function dask.array.core.getarray>, 'array-original-72f4a9d7e59d592525f97b4055ecfbd1', (slice(800, 900, None),)),
  ('array-72f4a9d7e59d592525f97b4055ecfbd1',
   9): (<function dask.array.core.getarray>, 'array-original-72f4a9d7e59d592525f97b4055ecfbd1', (slice(900, 1000, None),)),
  'array-original-72f4a9d7e59d592525f97b4055ecfbd1': array([  0,   1,   2,   3,   4,   5,   6,   7,   8,   9,  10,  11,  12,
          13,  14,  15,  16,  17,  18,  19,  20,  21,  22,  23,  24,  25,
          26,  27,  28,  29,  30,  31,  32,  33,  34,  35,  36,  37,  38,
          39,  40,  41,  42,  43,  44,  45,  46,  47,  48,  49,  50,  51,
          52,  53,  54,  55,  56,  57,  58,  59,  60,  61,  62,  63,  64,
          65,  66,  67,  68,  69,  70,  71,  72,  73,  74,  75,  76,  77,
          78,  79,  80,  81,  82,  83,  84,  85,  86,  87,  88,  89,  90,
          91,  92,  93,  94,  95,  96,  97,  98,  99, 100, 101, 102, 103,
         104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116,
         117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129,
         130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142,
         143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155,
         156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168,
         169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181,
         182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194,
         195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207,
         208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220,
         221, 222, 223, 224, 225, 226, 227, 228, 229, 230, 231, 232, 233,
         234, 235, 236, 237, 238, 239, 240, 241, 242, 243, 244, 245, 246,
         247, 248, 249, 250, 251, 252, 253, 254, 255, 256, 257, 258, 259,
         260, 261, 262, 263, 264, 265, 266, 267, 268, 269, 270, 271, 272,
         273, 274, 275, 276, 277, 278, 279, 280, 281, 282, 283, 284, 285,
         286, 287, 288, 289, 290, 291, 292, 293, 294, 295, 296, 297, 298,
         299, 300, 301, 302, 303, 304, 305, 306, 307, 308, 309, 310, 311,
         312, 313, 314, 315, 316, 317, 318, 319, 320, 321, 322, 323, 324,
         325, 326, 327, 328, 329, 330, 331, 332, 333, 334, 335, 336, 337,
         338, 339, 340, 341, 342, 343, 344, 345, 346, 347, 348, 349, 350,
         351, 352, 353, 354, 355, 356, 357, 358, 359, 360, 361, 362, 363,
         364, 365, 366, 367, 368, 369, 370, 371, 372, 373, 374, 375, 376,
         377, 378, 379, 380, 381, 382, 383, 384, 385, 386, 387, 388, 389,
         390, 391, 392, 393, 394, 395, 396, 397, 398, 399, 400, 401, 402,
         403, 404, 405, 406, 407, 408, 409, 410, 411, 412, 413, 414, 415,
         416, 417, 418, 419, 420, 421, 422, 423, 424, 425, 426, 427, 428,
         429, 430, 431, 432, 433, 434, 435, 436, 437, 438, 439, 440, 441,
         442, 443, 444, 445, 446, 447, 448, 449, 450, 451, 452, 453, 454,
         455, 456, 457, 458, 459, 460, 461, 462, 463, 464, 465, 466, 467,
         468, 469, 470, 471, 472, 473, 474, 475, 476, 477, 478, 479, 480,
         481, 482, 483, 484, 485, 486, 487, 488, 489, 490, 491, 492, 493,
         494, 495, 496, 497, 498, 499, 500, 501, 502, 503, 504, 505, 506,
         507, 508, 509, 510, 511, 512, 513, 514, 515, 516, 517, 518, 519,
         520, 521, 522, 523, 524, 525, 526, 527, 528, 529, 530, 531, 532,
         533, 534, 535, 536, 537, 538, 539, 540, 541, 542, 543, 544, 545,
         546, 547, 548, 549, 550, 551, 552, 553, 554, 555, 556, 557, 558,
         559, 560, 561, 562, 563, 564, 565, 566, 567, 568, 569, 570, 571,
         572, 573, 574, 575, 576, 577, 578, 579, 580, 581, 582, 583, 584,
         585, 586, 587, 588, 589, 590, 591, 592, 593, 594, 595, 596, 597,
         598, 599, 600, 601, 602, 603, 604, 605, 606, 607, 608, 609, 610,
         611, 612, 613, 614, 615, 616, 617, 618, 619, 620, 621, 622, 623,
         624, 625, 626, 627, 628, 629, 630, 631, 632, 633, 634, 635, 636,
         637, 638, 639, 640, 641, 642, 643, 644, 645, 646, 647, 648, 649,
         650, 651, 652, 653, 654, 655, 656, 657, 658, 659, 660, 661, 662,
         663, 664, 665, 666, 667, 668, 669, 670, 671, 672, 673, 674, 675,
         676, 677, 678, 679, 680, 681, 682, 683, 684, 685, 686, 687, 688,
         689, 690, 691, 692, 693, 694, 695, 696, 697, 698, 699, 700, 701,
         702, 703, 704, 705, 706, 707, 708, 709, 710, 711, 712, 713, 714,
         715, 716, 717, 718, 719, 720, 721, 722, 723, 724, 725, 726, 727,
         728, 729, 730, 731, 732, 733, 734, 735, 736, 737, 738, 739, 740,
         741, 742, 743, 744, 745, 746, 747, 748, 749, 750, 751, 752, 753,
         754, 755, 756, 757, 758, 759, 760, 761, 762, 763, 764, 765, 766,
         767, 768, 769, 770, 771, 772, 773, 774, 775, 776, 777, 778, 779,
         780, 781, 782, 783, 784, 785, 786, 787, 788, 789, 790, 791, 792,
         793, 794, 795, 796, 797, 798, 799, 800, 801, 802, 803, 804, 805,
         806, 807, 808, 809, 810, 811, 812, 813, 814, 815, 816, 817, 818,
         819, 820, 821, 822, 823, 824, 825, 826, 827, 828, 829, 830, 831,
         832, 833, 834, 835, 836, 837, 838, 839, 840, 841, 842, 843, 844,
         845, 846, 847, 848, 849, 850, 851, 852, 853, 854, 855, 856, 857,
         858, 859, 860, 861, 862, 863, 864, 865, 866, 867, 868, 869, 870,
         871, 872, 873, 874, 875, 876, 877, 878, 879, 880, 881, 882, 883,
         884, 885, 886, 887, 888, 889, 890, 891, 892, 893, 894, 895, 896,
         897, 898, 899, 900, 901, 902, 903, 904, 905, 906, 907, 908, 909,
         910, 911, 912, 913, 914, 915, 916, 917, 918, 919, 920, 921, 922,
         923, 924, 925, 926, 927, 928, 929, 930, 931, 932, 933, 934, 935,
         936, 937, 938, 939, 940, 941, 942, 943, 944, 945, 946, 947, 948,
         949, 950, 951, 952, 953, 954, 955, 956, 957, 958, 959, 960, 961,
         962, 963, 964, 965, 966, 967, 968, 969, 970, 971, 972, 973, 974,
         975, 976, 977, 978, 979, 980, 981, 982, 983, 984, 985, 986, 987,
         988, 989, 990, 991, 992, 993, 994, 995, 996, 997, 998, 999])}}

Y podemos visualizarlo si tenemos instalada la biblioteca graphviz:

In [8]:
op.visualize()
Out[8]:

Si queremos efectuar la operaci贸n, tendremos que llamar al m茅todo .compute().

In [9]:
op.compute()
Out[9]:
499.5

Si queremos convertir nuestro array original a array de NumPy, tambi茅n se hace llamando a compute():

In [10]:
y.compute()
Out[10]:
array([  0,   1,   2,   3,   4,   5,   6,   7,   8,   9,  10,  11,  12,
        13,  14,  15,  16,  17,  18,  19,  20,  21,  22,  23,  24,  25,
        26,  27,  28,  29,  30,  31,  32,  33,  34,  35,  36,  37,  38,
        39,  40,  41,  42,  43,  44,  45,  46,  47,  48,  49,  50,  51,
        52,  53,  54,  55,  56,  57,  58,  59,  60,  61,  62,  63,  64,
        65,  66,  67,  68,  69,  70,  71,  72,  73,  74,  75,  76,  77,
        78,  79,  80,  81,  82,  83,  84,  85,  86,  87,  88,  89,  90,
        91,  92,  93,  94,  95,  96,  97,  98,  99, 100, 101, 102, 103,
       104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116,
       117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129,
       130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142,
       143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155,
       156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168,
       169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181,
       182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194,
       195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207,
       208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220,
       221, 222, 223, 224, 225, 226, 227, 228, 229, 230, 231, 232, 233,
       234, 235, 236, 237, 238, 239, 240, 241, 242, 243, 244, 245, 246,
       247, 248, 249, 250, 251, 252, 253, 254, 255, 256, 257, 258, 259,
       260, 261, 262, 263, 264, 265, 266, 267, 268, 269, 270, 271, 272,
       273, 274, 275, 276, 277, 278, 279, 280, 281, 282, 283, 284, 285,
       286, 287, 288, 289, 290, 291, 292, 293, 294, 295, 296, 297, 298,
       299, 300, 301, 302, 303, 304, 305, 306, 307, 308, 309, 310, 311,
       312, 313, 314, 315, 316, 317, 318, 319, 320, 321, 322, 323, 324,
       325, 326, 327, 328, 329, 330, 331, 332, 333, 334, 335, 336, 337,
       338, 339, 340, 341, 342, 343, 344, 345, 346, 347, 348, 349, 350,
       351, 352, 353, 354, 355, 356, 357, 358, 359, 360, 361, 362, 363,
       364, 365, 366, 367, 368, 369, 370, 371, 372, 373, 374, 375, 376,
       377, 378, 379, 380, 381, 382, 383, 384, 385, 386, 387, 388, 389,
       390, 391, 392, 393, 394, 395, 396, 397, 398, 399, 400, 401, 402,
       403, 404, 405, 406, 407, 408, 409, 410, 411, 412, 413, 414, 415,
       416, 417, 418, 419, 420, 421, 422, 423, 424, 425, 426, 427, 428,
       429, 430, 431, 432, 433, 434, 435, 436, 437, 438, 439, 440, 441,
       442, 443, 444, 445, 446, 447, 448, 449, 450, 451, 452, 453, 454,
       455, 456, 457, 458, 459, 460, 461, 462, 463, 464, 465, 466, 467,
       468, 469, 470, 471, 472, 473, 474, 475, 476, 477, 478, 479, 480,
       481, 482, 483, 484, 485, 486, 487, 488, 489, 490, 491, 492, 493,
       494, 495, 496, 497, 498, 499, 500, 501, 502, 503, 504, 505, 506,
       507, 508, 509, 510, 511, 512, 513, 514, 515, 516, 517, 518, 519,
       520, 521, 522, 523, 524, 525, 526, 527, 528, 529, 530, 531, 532,
       533, 534, 535, 536, 537, 538, 539, 540, 541, 542, 543, 544, 545,
       546, 547, 548, 549, 550, 551, 552, 553, 554, 555, 556, 557, 558,
       559, 560, 561, 562, 563, 564, 565, 566, 567, 568, 569, 570, 571,
       572, 573, 574, 575, 576, 577, 578, 579, 580, 581, 582, 583, 584,
       585, 586, 587, 588, 589, 590, 591, 592, 593, 594, 595, 596, 597,
       598, 599, 600, 601, 602, 603, 604, 605, 606, 607, 608, 609, 610,
       611, 612, 613, 614, 615, 616, 617, 618, 619, 620, 621, 622, 623,
       624, 625, 626, 627, 628, 629, 630, 631, 632, 633, 634, 635, 636,
       637, 638, 639, 640, 641, 642, 643, 644, 645, 646, 647, 648, 649,
       650, 651, 652, 653, 654, 655, 656, 657, 658, 659, 660, 661, 662,
       663, 664, 665, 666, 667, 668, 669, 670, 671, 672, 673, 674, 675,
       676, 677, 678, 679, 680, 681, 682, 683, 684, 685, 686, 687, 688,
       689, 690, 691, 692, 693, 694, 695, 696, 697, 698, 699, 700, 701,
       702, 703, 704, 705, 706, 707, 708, 709, 710, 711, 712, 713, 714,
       715, 716, 717, 718, 719, 720, 721, 722, 723, 724, 725, 726, 727,
       728, 729, 730, 731, 732, 733, 734, 735, 736, 737, 738, 739, 740,
       741, 742, 743, 744, 745, 746, 747, 748, 749, 750, 751, 752, 753,
       754, 755, 756, 757, 758, 759, 760, 761, 762, 763, 764, 765, 766,
       767, 768, 769, 770, 771, 772, 773, 774, 775, 776, 777, 778, 779,
       780, 781, 782, 783, 784, 785, 786, 787, 788, 789, 790, 791, 792,
       793, 794, 795, 796, 797, 798, 799, 800, 801, 802, 803, 804, 805,
       806, 807, 808, 809, 810, 811, 812, 813, 814, 815, 816, 817, 818,
       819, 820, 821, 822, 823, 824, 825, 826, 827, 828, 829, 830, 831,
       832, 833, 834, 835, 836, 837, 838, 839, 840, 841, 842, 843, 844,
       845, 846, 847, 848, 849, 850, 851, 852, 853, 854, 855, 856, 857,
       858, 859, 860, 861, 862, 863, 864, 865, 866, 867, 868, 869, 870,
       871, 872, 873, 874, 875, 876, 877, 878, 879, 880, 881, 882, 883,
       884, 885, 886, 887, 888, 889, 890, 891, 892, 893, 894, 895, 896,
       897, 898, 899, 900, 901, 902, 903, 904, 905, 906, 907, 908, 909,
       910, 911, 912, 913, 914, 915, 916, 917, 918, 919, 920, 921, 922,
       923, 924, 925, 926, 927, 928, 929, 930, 931, 932, 933, 934, 935,
       936, 937, 938, 939, 940, 941, 942, 943, 944, 945, 946, 947, 948,
       949, 950, 951, 952, 953, 954, 955, 956, 957, 958, 959, 960, 961,
       962, 963, 964, 965, 966, 967, 968, 969, 970, 971, 972, 973, 974,
       975, 976, 977, 978, 979, 980, 981, 982, 983, 984, 985, 986, 987,
       988, 989, 990, 991, 992, 993, 994, 995, 996, 997, 998, 999])

Grafos de operaciones

Vamos a ver un ejemplo trivial de c贸mo maneja dask los grafos de tareas, c贸mo podemos definir nuestro propio flujo de trabajo y c贸mo optimizarlo para aprovechar mejor las capacidades del ordenador.

In [11]:
words = 'apple orange apple pear orange pear pear'

def print_and_return(string):
    print(string)
    return string

def format_str(count, val, nwords):
    return ('word list has {0} occurrences of {1}, '
            'out of {2} words').format(count, val, nwords)

Ejercicio: contar el n煤mero de ocurrencias de pear, apple y orange.

In [12]:
total = len(words.split())
for word in "pear", "apple", "orange":
    count = words.count(word)
    print_and_return(format_str(count, word, total))
word list has 3 occurrences of pear, out of 7 words
word list has 2 occurrences of apple, out of 7 words
word list has 2 occurrences of orange, out of 7 words

Misma operaci贸n definiendo un grafo en Dask:

In [13]:
dsk = {
    'words': 'apple orange apple pear orange pear pear',
    'nwords': (len, (str.split, 'words')),
    'val1': 'orange',
    'val2': 'apple',
    'val3': 'pear',
    'count1': (str.count, 'words', 'val1'),
    'count2': (str.count, 'words', 'val2'),
    'count3': (str.count, 'words', 'val3'),
    'out1': (format_str, 'count1', 'val1', 'nwords'),
    'out2': (format_str, 'count2', 'val2', 'nwords'),
    'out3': (format_str, 'count3', 'val3', 'nwords'),
    'print1': (print_and_return, 'out1'),
    'print2': (print_and_return, 'out2'),
    'print3': (print_and_return, 'out3')
}
In [14]:
from dask.dot import dot_graph
dot_graph(dsk)
Out[14]:

Si queremos obtener resultados parciales, Dask solo recorre la parte del grafo que nos interesa:

In [15]:
from dask.threaded import get

outputs = ['print1', 'print2']
results = get(dsk, outputs)
results
word list has 2 occurrences of orange, out of 7 words
word list has 2 occurrences of apple, out of 7 words
Out[15]:
('word list has 2 occurrences of orange, out of 7 words',
 'word list has 2 occurrences of apple, out of 7 words')

Observamos que no se est谩 recorriendo el nodo print3. Esto es as铆 porque Dask est谩 eliminando operaciones innecesarias. Nosotros podemos reproducir ese proceso utilizando la funci贸n cull:

In [16]:
from dask.optimize import cull
dsk1, dependencies = cull(dsk, outputs)

dot_graph(dsk1)
Out[16]:

Visto que varios nodos del grafo est谩n accediendo a val1 y val2, podemos fusionar esos accesos para simplificarlo.

In [17]:
from dask.optimize import inline
dsk2 = inline(dsk1, dependencies=dependencies)

dot_graph(dsk2)
Out[17]:

Nuestro grafo de tareas ya es casi lineal, pero a煤n queda el conteo total de palabras como enlace. Supongamos que esta tarea es muy barata y que serializar el resultado para comunicarlo entre nodos es m谩s costoso: en ese caso podemos duplicarla para conseguir separar las dos ramas.

In [18]:
from dask.optimize import inline_functions
dsk3 = inline_functions(dsk2, outputs, [len, str.split], dependencies=dependencies)

dot_graph(dsk3)
Out[18]:

Por 煤ltimo, podemos consolidar tareas para simplificar la apariencia del grafo:

In [19]:
from dask.optimize import fuse
dsk4, dependencies = fuse(dsk3)

dot_graph(dsk4)
Out[19]:

Demo: Taxis de NYC

Otra de las estructuras de datos que provee pandas son los DataFrames, que se comportan de la misma manera que los DataFrames de pandas.

Para estudiar c贸mo funciona, vamos a descargar datos de trayectos de los taxis de New York:

In [2]:
!du data/yellow*.csv -h -s
1,9G	data/yellow_tripdata_2015-01.csv
1,9G	data/yellow_tripdata_2015-02.csv
2,0G	data/yellow_tripdata_2015-03.csv

Tanto dask.dataframe como dask.array usan un scheduler por defecto basado en hilos. En su lugar, vamos a utilizar una clase Client, la que emplear铆amos si estuvi茅ramos en un cluster.

In [4]:
import dask.dataframe as dd
In [5]:
from distributed import Client, progress

Esta clase Client, cuando se utiliza en local, lanza un scheduler que minimiza el uso de memoria y aprovecha todos los n煤cleos de la CPU.

"The dask single-machine schedulers have logic to execute the graph in a way that minimizes memory footprint." http://dask.pydata.org/en/latest/custom-graphs.html?highlight=minimizes%20memory#related-projects

El servidor de diagn贸stico est谩 disponible en http://127.0.0.1:8787/.

In [6]:
client = Client()
client
Out[6]:
<Client: scheduler='tcp://127.0.0.1:41728' processes=4 cores=4>

Y ahora leemos los .csv con un filtro todos a la vez en el mismo DataFrame de Dask:

In [7]:
df = dd.read_csv("data/yellow*.csv", parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])

Que mimetiza la API de pandas:

In [8]:
df.head()
Out[8]:
VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count trip_distance pickup_longitude pickup_latitude RateCodeID store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge total_amount
0 2 2015-01-15 19:05:39 2015-01-15 19:23:42 1 1.59 -73.993896 40.750111 1 N -73.974785 40.750618 1 12.0 1.0 0.5 3.25 0.0 0.3 17.05
1 1 2015-01-10 20:33:38 2015-01-10 20:53:28 1 3.30 -74.001648 40.724243 1 N -73.994415 40.759109 1 14.5 0.5 0.5 2.00 0.0 0.3 17.80
2 1 2015-01-10 20:33:38 2015-01-10 20:43:41 1 1.80 -73.963341 40.802788 1 N -73.951820 40.824413 2 9.5 0.5 0.5 0.00 0.0 0.3 10.80
3 1 2015-01-10 20:33:39 2015-01-10 20:35:31 1 0.50 -74.009087 40.713818 1 N -74.004326 40.719986 2 3.5 0.5 0.5 0.00 0.0 0.3 4.80
4 1 2015-01-10 20:33:39 2015-01-10 20:52:58 1 3.00 -73.971176 40.762428 1 N -74.004181 40.742653 2 15.0 0.5 0.5 0.00 0.0 0.3 16.30
In [9]:
df.dtypes
Out[9]:
VendorID                          int64
tpep_pickup_datetime     datetime64[ns]
tpep_dropoff_datetime    datetime64[ns]
passenger_count                   int64
trip_distance                   float64
pickup_longitude                float64
pickup_latitude                 float64
RateCodeID                        int64
store_and_fwd_flag               object
dropoff_longitude               float64
dropoff_latitude                float64
payment_type                      int64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
dtype: object

Vamos a calcular la longitud del DataFrame:

In [28]:
# Esta operaci贸n bloquea el int茅rprete durante unos minutos
len(df)
Out[28]:
51622905

Como se puede observar, el uso de memoria est谩 contenido y todas las CPUs est谩n trabajando.

len

Tambi茅n lo podemos hacer de manera as铆ncrona:

In [29]:
futures = client.submit(len, df)
futures
Out[29]:
<Future: status: pending, key: len-0e3f941b29b7395f1785e5e7566c738f>
In [30]:
progress(futures)

Vamos ahora a calcular la distancia media recorrida en funci贸n del n煤mero de ocupantes. Igual que cuando us谩bamos dask.array, la operaci贸n no se efect煤a autom谩ticamente.

In [31]:
op = df.groupby(df.passenger_count).trip_distance.mean()
op
Out[31]:
dd.Series<truediv..., npartitions=1>
In [32]:
f2 = client.compute(op)
f2
Out[32]:
<Future: status: pending, key: finalize-14642408fb730ca74c9edc7856bf5eec>
El m茅todo `client.compute` almacena el resultado en un solo nodo, y por tanto debe usarse con cuidado. Para objetos grandes, es mejor usar `client.persist`.

En este caso la visualizaci贸n de la operaci贸n ya tiene una magnitud considerable:

In [33]:
op.visualize()
Out[33]: