
Custom Workloads

Because not all problems are dataframes

In [ ]:
from dask_gateway import Gateway

gateway = Gateway()
cluster = gateway.new_cluster()
cluster.scale(10)
cluster
In [ ]:
from dask.distributed import Client, progress
c = Client(cluster)
c
In [ ]:
from time import sleep

def inc(x):
    from random import random
    sleep(random())
    return x + 1

def double(x):
    from random import random
    sleep(random())
    return 2 * x
    
def add(x, y):
    from random import random
    sleep(random())
    return x + y 
In [ ]:
inc(1)
In [ ]:
future = c.submit(inc, 1)  # returns immediately with pending future
future
In [ ]:
future  # re-display: the status changes from pending to finished as the scheduler reports back
In [ ]:
future.result()
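Dask's Client.submit and Future.result follow the same pattern as Python's standard concurrent.futures module. As a local analogy only (a thread pool on one machine, not a Dask cluster), the sketch below shows submit returning a pending future immediately while result() blocks until the work is done. The slow_inc helper is hypothetical and simply mirrors inc above.

```python
from concurrent.futures import ThreadPoolExecutor
from random import random
from time import sleep

def slow_inc(x):
    # Hypothetical stand-in for inc(): sleep up to one second, then add one
    sleep(random())
    return x + 1

with ThreadPoolExecutor(max_workers=4) as pool:
    fut = pool.submit(slow_inc, 1)  # returns immediately with a pending future
    print(fut.done())               # usually False: the task is still sleeping
    value = fut.result()            # blocks until the task finishes
    print(fut.done(), value)        # True 2
```

The difference with Dask is where the work runs: the Dask future lives on a remote worker, and only result() moves the value back to the client.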

Submit many tasks

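We submit many tasks that depend on each other from a normal Python for loop. Each submit call returns a future right away, and passing those futures as arguments to later submit calls tells the scheduler which tasks must finish first.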

In [ ]:
%%time
zs = []
for i in range(256):
    x = c.submit(inc, i)     # x = inc(i)
    y = c.submit(double, x)  # y = double(x)
    z = c.submit(add, x, y)  # z = add(x, y)
    zs.append(z)
    
total = c.submit(sum, zs)
In [ ]:
total.result()
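Apart from the random sleeps, every task here is deterministic, so the distributed result can be checked against a plain serial loop. For each i, x = i + 1, y = 2x, and z = x + y = 3(i + 1), so the total is 3 × (1 + 2 + … + 256) = 3 × 32896 = 98688. A minimal check, with a hypothetical helper expected_total that skips the sleeps:

```python
def expected_total(n=256):
    # Same arithmetic as inc/double/add above, minus the random sleeps
    total = 0
    for i in range(n):
        x = i + 1       # inc(i)
        y = 2 * x       # double(x)
        total += x + y  # add(x, y)
    return total

print(expected_total())  # 98688
```

In the notebook, total.result() == expected_total() should evaluate to True.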

Custom computation: Tree summation

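As an example of a non-trivial algorithm, consider the classic tree reduction: instead of summing all inputs in a single task, we repeatedly add neighboring pairs until one value remains. We accomplish this with a for loop nested inside a while loop and a bit of normal Python logic.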

finish           total             single output
    ^          /        \
    |        c1          c2        neighbors merge
    |       /  \        /  \
    |     b1    b2    b3    b4     neighbors merge
    ^    / \   / \   / \   / \
start   a1 a2 a3 a4 a5 a6 a7 a8    many inputs
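The pairing logic in the diagram does not depend on Dask. As a local sketch, the hypothetical helper tree_reduce below applies a binary combine function to neighbors level by level and counts the levels and combine calls. When a level has an odd number of items, it assumes the last one is carried up unchanged; the 256-element example never hits that case, because 256 is a power of two.

```python
from operator import add

def tree_reduce(values, combine):
    """Pairwise (tree) reduction; returns (result, levels, combine_calls)."""
    level = list(values)
    if not level:
        raise ValueError("tree_reduce() needs at least one value")
    levels = calls = 0
    while len(level) > 1:
        nxt = []
        for i in range(0, len(level), 2):
            if i + 1 < len(level):
                nxt.append(combine(level[i], level[i + 1]))  # neighbors merge
                calls += 1
            else:
                nxt.append(level[i])  # odd item carried up unchanged
        level = nxt
        levels += 1
    return level[0], levels, calls

print(tree_reduce(range(1, 9), add))  # (36, 3, 7)
```

For the 256 futures in zs, this shape gives 8 levels and 255 additions, each of which only waits on its two neighbors, whereas the single sum task above has to wait for all 256 inputs at once.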
In [ ]:
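L = zs
while len(L) > 1:
    new_L = []
    for i in range(0, len(L), 2):
        if i + 1 < len(L):
            future = c.submit(add, L[i], L[i + 1])  # add neighbors
        else:
            future = L[i]                           # odd one out moves up a level
        new_L.append(future)
    L = new_L                                   # swap old list for new

L[0].result()  # final sum; should match total.result()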
In [ ]:
del L

Visualize Computation

In [ ]:
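from dask import delayed, visualize

L = zs
while len(L) > 1:
    new_L = []
    for i in range(0, len(L), 2):
        if i + 1 < len(L):
            node = delayed(add)(L[i], L[i + 1])  # lazy: records the call, runs nothing yet
        else:
            node = L[i]                          # odd one out moves up a level
        new_L.append(node)
    L = new_L                                    # swap old list for new

visualize(*L)  # requires the optional graphviz dependency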
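delayed records each call as a node in a task graph instead of running it, and visualize draws that graph. Dask's documentation describes its classic graph format as a plain dict that maps keys either to data or to tasks, where a task is a tuple whose first element is a callable and whose remaining elements may be keys of other entries. The sketch below builds such a dict for the tree sum by hand and evaluates it with a tiny recursive get. Both tree_graph and get are hypothetical illustrations, not Dask's API or scheduler, and newer Dask releases may represent tasks differently internally.

```python
from operator import add

def tree_graph(n):
    """Build a Dask-style dict graph that tree-sums the inputs 0..n-1."""
    graph = {("x", i): i for i in range(n)}  # leaves: literal data
    keys = [("x", i) for i in range(n)]
    level = 0
    while len(keys) > 1:
        level += 1
        new_keys = []
        for j in range(0, len(keys), 2):
            if j + 1 < len(keys):
                key = ("sum", level, j // 2)
                graph[key] = (add, keys[j], keys[j + 1])  # task: (callable, *arg_keys)
                new_keys.append(key)
            else:
                new_keys.append(keys[j])  # odd key carried up unchanged
        keys = new_keys
    return graph, keys[0]

def get(graph, key):
    """Evaluate one key by recursively evaluating its dependencies (illustration only)."""
    task = graph[key]
    if isinstance(task, tuple) and task and callable(task[0]):
        func, *arg_keys = task
        return func(*(get(graph, k) for k in arg_keys))
    return task  # plain data

graph, root = tree_graph(8)
print(root, get(graph, root))  # ('sum', 3, 0) 28
```

Nothing runs while tree_graph builds the dict; work only happens when get walks it, which is the same separation between building a graph and executing it that delayed and visualize rely on.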