 Because not all problems are dataframes

In [ ]:
from dask_gateway import Gateway

gateway = Gateway()
cluster = gateway.new_cluster()
cluster.scale(10)
cluster

In [ ]:
from dask.distributed import Client, progress
c = Client(cluster)
c

In [ ]:
from time import sleep

def inc(x):
    from random import random
    sleep(random())  # simulate a task that takes a variable amount of time
    return x + 1

def double(x):
    from random import random
    sleep(random())
    return 2 * x

def add(x, y):
    from random import random
    sleep(random())
    return x + y

In [ ]:
inc(1)

In [ ]:
future = c.submit(inc, 1)  # returns immediately with pending future
future

In [ ]:
future  # scheduler and client talk constantly

In [ ]:
future.result()
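The submit/result pattern above mirrors the standard library's `concurrent.futures` interface, which `dask.distributed` follows closely. As a local, cluster-free sketch (an analogue, not Dask itself), a `ThreadPoolExecutor` behaves the same way: `submit` returns a pending future immediately, and `result()` blocks until the value is ready.

```python
from concurrent.futures import ThreadPoolExecutor
from random import random
from time import sleep

def inc(x):
    sleep(random() / 10)  # shorter random delay than the notebook, to keep the sketch quick
    return x + 1

with ThreadPoolExecutor(max_workers=4) as pool:
    future = pool.submit(inc, 1)   # returns immediately with a pending future
    print(future.done())           # usually False: the task is still sleeping
    value = future.result()        # blocks until inc(1) has finished
    print(future.done(), value)    # True 2
```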


We submit many tasks that depend on each other from a normal Python `for` loop, passing the futures returned by one call as arguments to the next.

In [ ]:
%%time
zs = []
for i in range(256):
    x = c.submit(inc, i)     # x = inc(i)
    y = c.submit(double, x)  # y = double(x)
    z = c.submit(add, x, y)  # z = add(x, y)
    zs.append(z)

total = c.submit(sum, zs)

In [ ]:
total.result()
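In `dask.distributed`, a future can be passed directly as an argument to another `submit`; the scheduler runs the dependent task only once its inputs are ready and substitutes their values. The standard library's executors have no such feature, so the sketch below uses a hypothetical helper, `submit_with_deps`, that instead blocks inside a worker thread until its inputs are done. It is a simplified local analogue, not how Dask implements dependencies, and it relies on every dependency being submitted before its dependents (true for the loop above), so the executor's first-in, first-out work queue cannot deadlock. It also checks the expected value of `total`: each `z` equals `(i + 1) + 2 * (i + 1) = 3 * (i + 1)`, so the sum over `i = 0..255` is `3 * 256 * 257 / 2 = 98688`.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from random import random
from time import sleep

def inc(x):
    sleep(random() / 100)  # much shorter delays than the notebook
    return x + 1

def double(x):
    sleep(random() / 100)
    return 2 * x

def add(x, y):
    sleep(random() / 100)
    return x + y

def submit_with_deps(pool, fn, *args):
    """Hypothetical helper: submit fn(*args), resolving any Future arguments first."""
    def run():
        # Replace each top-level Future argument with its result (blocks this thread)
        resolved = [a.result() if isinstance(a, Future) else a for a in args]
        return fn(*resolved)
    return pool.submit(run)

with ThreadPoolExecutor(max_workers=8) as pool:
    zs = []
    for i in range(256):
        x = submit_with_deps(pool, inc, i)
        y = submit_with_deps(pool, double, x)
        z = submit_with_deps(pool, add, x, y)
        zs.append(z)
    # Only top-level arguments are resolved, so unpack zs rather than passing the list
    total = submit_with_deps(pool, lambda *vals: sum(vals), *zs)
    result = total.result()

print(result)  # 98688, i.e. 3 * (1 + 2 + ... + 256)
```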


### Custom computation: Tree summation

As an example of a non-trivial algorithm, consider the classic tree reduction. We accomplish this with a `for` loop nested inside a `while` loop and a bit of normal Python logic.

finish           total             single output
    ^          /        \
    |        c1          c2        neighbors merge
    |       /  \        /  \
    |     b1    b2    b3    b4     neighbors merge
    ^    / \   / \   / \   / \
start   a1 a2 a3 a4 a5 a6 a7 a8    many inputs
In [ ]:
L = zs
while len(L) > 1:
    new_L = []
    for i in range(0, len(L), 2):
        future = c.submit(add, L[i], L[i + 1])  # add neighbors
        new_L.append(future)
    L = new_L                                   # swap old list for new
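Pairing neighbors this way assumes the list length halves evenly at every level, i.e. is a power of two (true here, since `zs` holds 256 futures). A plain-Python sketch of the same level-by-level reduction, using a hypothetical `tree_reduce` helper that carries an unpaired element up to the next level unchanged, shows the structure without a cluster. In the notebook, each `fn(a, b)` call would become `c.submit(add, a, b)`.

```python
from operator import add

def tree_reduce(fn, items):
    """Hypothetical helper: merge neighbors pairwise, level by level.

    Returns (final_value, number_of_levels).
    """
    level = list(items)
    if not level:
        raise ValueError("tree_reduce() of empty sequence")
    depth = 0
    while len(level) > 1:
        new_level = []
        for i in range(0, len(level) - 1, 2):
            new_level.append(fn(level[i], level[i + 1]))  # merge neighbors
        if len(level) % 2:               # odd count: carry the last item up unchanged
            new_level.append(level[-1])
        level = new_level                # swap old level for new
        depth += 1
    return level[0], depth

print(tree_reduce(add, range(1, 9)))  # (36, 3): 8 inputs, 3 levels as in the diagram
print(tree_reduce(add, range(1, 6)))  # (15, 3): 5 -> 3 -> 2 -> 1 items
```

The carry-up step keeps the tree shallow (about `log2(n)` levels) for any input length, at the cost of one slightly unbalanced branch when a level has an odd number of items.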


In [ ]:
del L


### Visualize Computation

In [ ]:
from dask import delayed, visualize

L = zs
while len(L) > 1:
    new_L = []
    for i in range(0, len(L), 2):