Because not all problems are dataframes
# Launch a Dask cluster on Kubernetes with 10 workers and connect a client to it.
from dask_kubernetes import KubeCluster
cluster = KubeCluster(n_workers=10)
cluster  # displaying the object renders the cluster widget in a notebook
from dask.distributed import Client, progress
c = Client(cluster)  # every subsequent c.submit() call goes through this client
c  # displaying the client shows the dashboard link and worker summary
from time import sleep
def inc(x):
    """Return ``x + 1`` after pausing for a random sub-second interval.

    The artificial delay stands in for real work so task scheduling
    is visible on the cluster dashboard.
    """
    import random
    sleep(random.random())
    return x + 1
def double(x):
    """Return ``2 * x`` after a short random pause (simulated work)."""
    import random
    sleep(random.random())
    return 2 * x
def add(x, y):
    """Return ``x + y`` after a short random pause (simulated work)."""
    import random
    sleep(random.random())
    return x + y
# Run locally first for comparison, then submit the same call to the cluster.
inc(1)
# c.submit schedules the call on a worker and hands back a Future right away.
future = c.submit(inc, 1)  # returns immediately with pending future
future
future  # scheduler and client talk constantly; re-display to see status change
future.result()  # block until the task finishes, then fetch its value
We submit many tasks that depend on each other in a normal Python for loop.
%%time
# Submit a three-step pipeline per input; futures can be passed directly to
# later submit() calls, so the task graph is built on the scheduler.
zs = []
for i in range(256):
    x = c.submit(inc, i)     # x = inc(i)
    y = c.submit(double, x)  # y = double(x)
    z = c.submit(add, x, y)  # z = add(x, y)
    zs.append(z)
total = c.submit(sum, zs)  # final aggregation task over all 256 results
total.result()
As an example of a non-trivial algorithm, consider the classic tree reduction. We accomplish this with a nested for loop and a bit of normal Python logic.
finish total single output
^ / \
| c1 c2 neighbors merge
| / \ / \
| b1 b2 b3 b4 neighbors merge
^ / \ / \ / \ / \
start a1 a2 a3 a4 a5 a6 a7 a8 many inputs
# Tree reduction over the futures in zs: repeatedly sum neighboring pairs
# until one future remains.  Assumes each level has even length (256 inputs
# here) -- an odd-length level would raise IndexError at L[i + 1].
L = zs
while len(L) > 1:
    new_L = []
    for i in range(0, len(L), 2):
        future = c.submit(add, L[i], L[i + 1])  # add neighbors
        new_L.append(future)
    L = new_L  # swap old list for new
# NOTE(review): dropping our last reference -- presumably lets the scheduler
# release the computed results; confirm against dask.distributed docs.
del L
from dask import delayed, visualize
# Same tree reduction built lazily with dask.delayed: nothing executes here;
# we only construct the task graph, then render it with visualize().
L = zs
while len(L) > 1:
    new_L = []
    for i in range(0, len(L), 2):
        future = delayed(add)(L[i], L[i + 1])  # add neighbors (lazy, not submitted)
        new_L.append(future)
    L = new_L  # swap old list for new
visualize(*L)  # draw the task graph of the final (single) delayed object