This notebook walks through the TileDB Cloud Delayed APIs.
We start with the high-level Delayed API and finish with a quick look at the lower-level DAG functionality.
First, import the necessary packages:
# Import base packages
import tiledb
import tiledb.cloud
from tiledb.cloud.compute import Delayed, DelayedSQL, DelayedArrayUDF
import numpy
Any Python function can be wrapped in a Delayed object, which makes the function executable as a future.
x = Delayed(numpy.median)
# You can see the type is now `Delayed`
x
<tiledb.cloud.compute.delayed.Delayed at 0x7fa03c00b978>
Calling the Delayed object with parameters stores them for later; the function itself is executed lazily.
x([1,2,3,4,5])
<tiledb.cloud.compute.delayed.Delayed at 0x7fa03c00b978>
To force execution, call compute():
x.compute()
3.0
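The wrap, call, compute pattern above can be sketched in plain Python. This is only an illustration of deferred execution under simple assumptions: `LazyCall` is a hypothetical class, not the TileDB Cloud `Delayed` implementation, and it runs everything locally using the standard library.

```python
from statistics import median


class LazyCall:
    """Hypothetical illustration of deferred execution; not TileDB Cloud's Delayed."""

    def __init__(self, func):
        self.func = func
        self.args = ()
        self.kwargs = {}

    def __call__(self, *args, **kwargs):
        # Calling the wrapper only records the arguments; nothing runs yet.
        self.args = args
        self.kwargs = kwargs
        # Return the same object, mirroring the identical repr shown above.
        return self

    def compute(self):
        # The wrapped function runs only when a result is requested.
        return self.func(*self.args, **self.kwargs)


lazy_median = LazyCall(median)
same_object = lazy_median([1, 2, 3, 4, 5])  # stores the arguments
result = lazy_median.compute()              # executes median now
print(same_object is lazy_median, result)
```

Separating "describe the call" from "run the call" is what lets a scheduler decide later where and when each function executes.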
Besides arbitrary Python functions, serverless SQL queries and array-based UDFs can also be called with the Delayed API.
# SQL
y = DelayedSQL("select AVG(`a`) FROM `tiledb://TileDB-Inc/quickstart_sparse`")
# Run query
y.compute()
| | AVG(`a`) |
|---|---|
| 0 | 2 |
# Array
z = DelayedArrayUDF("tiledb://TileDB-Inc/quickstart_sparse", lambda x: numpy.average(x["a"]))([(1, 4), (1, 4)])
# Run the udf on the array
z.compute()
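Before sending a UDF to the service, it can help to check its logic on a small in-memory sample. The sketch below assumes, as the `x["a"]` lookup in the lambda above suggests, that the UDF receives an object indexable by attribute name. The sample values are made up for illustration and are not read from TileDB.

```python
# Hypothetical stand-in for the data a UDF receives: a mapping from
# attribute name to the values selected by the query ranges.
sample_cells = {"a": [1, 2, 3]}  # made-up values, not read from any array


def average_a(cells):
    """Average of attribute "a", written without numpy so it runs anywhere."""
    values = cells["a"]
    return sum(values) / len(values)


local_result = average_a(sample_cells)
print(local_result)
```

Once the function behaves as expected on the sample, the same body can be passed to `DelayedArrayUDF` in place of the lambda.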
Lastly, a generic Python function can be wrapped as delayed but run locally instead of serverlessly. This is useful for testing or for saving final results to your local machine, e.g. saving an image.
local = Delayed(numpy.median, local=True)([1,2,3])
local.compute()
2.0
Delayed objects can be combined into a task graph. Output from one function or query can be passed into another, and dependencies are automatically determined.
# Build several delayed objects to combine into a graph
local = Delayed(lambda x: x * 2, local=True)(100)
array_apply = DelayedArrayUDF("tiledb://TileDB-Inc/quickstart_sparse", lambda x: numpy.sum(x["a"]), name="array_apply")([(1, 4), (1, 4)])
sql = DelayedSQL("select SUM(`a`) as a from `{}`".format("tiledb://TileDB-Inc/quickstart_dense"), name="sql")
# Custom function that averages all the results passed in
def mean(local, array_apply, sql):
    import numpy
    # sql arrives as a DataFrame; take the scalar in its first row and column
    return numpy.mean([local, array_apply, sql.iloc[0, 0]])
res = Delayed(func_exec=mean, name="node_exec")(local, array_apply, sql)
A live graph can show the status of the Task Graph
res.visualize(force_plotly=True)
FigureWidget({ 'data': [{'hoverinfo': 'none', 'line': {'color': '#888', 'width': 0.5}, …
res.compute()
114.0
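One way to picture how dependencies are "automatically determined" is that any argument which is itself a delayed node becomes an upstream dependency. The sketch below illustrates that idea with a hypothetical `LazyNode` class; it is not the TileDB Cloud implementation. The stand-in inputs assume the quickstart arrays hold the values 1, 2, 3 (sparse) and 1 through 16 (dense), which is consistent with the 114.0 shown above.

```python
class LazyNode:
    """Hypothetical delayed node; not the TileDB Cloud Delayed class."""

    def __init__(self, func, *args, name=None):
        self.func = func
        self.args = args
        self.name = name
        self._done = False
        self._result = None

    @property
    def dependencies(self):
        # Any argument that is itself a LazyNode is an upstream dependency.
        return [a for a in self.args if isinstance(a, LazyNode)]

    def compute(self):
        if not self._done:
            # Resolve upstream nodes first, then call this node's function.
            resolved = [a.compute() if isinstance(a, LazyNode) else a
                        for a in self.args]
            self._result = self.func(*resolved)
            self._done = True
        return self._result


# Local stand-ins for the three inputs (assumed values, see lead-in).
local = LazyNode(lambda x: x * 2, 100, name="local")             # 200
array_sum = LazyNode(sum, [1, 2, 3], name="array_apply")         # 6
sql_sum = LazyNode(sum, range(1, 17), name="sql")                # 136

res = LazyNode(lambda *vals: sum(vals) / len(vals),
               local, array_sum, sql_sum, name="node_exec")
result = res.compute()
print([d.name for d in res.dependencies], result)
```

Under those assumptions the arithmetic is (200 + 6 + 136) / 3 = 114.0.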
Several features are exposed to support more complex use cases.
There might be cases where a function relies on another function but does not take its result as an argument. An example would be a function that manipulates data on S3 but does not return anything.
# A few base functions:
import random
node_1 = Delayed(numpy.median, local=True, name="node_1")([1, 2, 3])
node_2 = Delayed(lambda x: x * 2, local=True, name="node_2")(node_1)
node_3 = Delayed(lambda x: x * 2, local=True, name="node_3")(node_2)
nodes_by_name = {'node_1': node_1, 'node_2': node_2, 'node_3': node_3}
# Function that sleeps for some time so we can see the graph in different states
def f():
    import time
    import random
    x = random.randrange(0, 30)
    time.sleep(x)
    return x
# Randomly add 96 other nodes to the graph. All of these will use the sleep function
for i in range(4, 100):
    name = "node_{}".format(i)
    node = Delayed(f, local=True, name=name)()
    # Randomly pick one earlier node to depend on
    dep = random.randrange(1, i-1)
    node_dep = nodes_by_name["node_{}".format(dep)]
    # Force the dependency to be set
    node.depends_on(node_dep)
    nodes_by_name[name] = node
node_1.visualize()
Visualize(value='{"nodes": ["node_1", "node_2", "node_4", "node_11", "node_20", "node_61", "node_3", "node_5",…
node_99 = nodes_by_name["node_99"]
node_99.compute()
2.0
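The idea behind an explicit dependency, ordering without any data passing between tasks, can be sketched with the standard library's `graphlib` (Python 3.9+). This is an illustration only and does not use the TileDB Cloud API: `fake_bucket`, `upload` and `count_rows` are hypothetical names, with a dict standing in for S3.

```python
from graphlib import TopologicalSorter

fake_bucket = {}  # hypothetical stand-in for an S3 bucket
run_order = []


def upload():
    # Side effect only; returns nothing for a downstream task to consume.
    fake_bucket["report.csv"] = "a,b\n1,2\n"
    run_order.append("upload")


def count_rows():
    # Takes no arguments, yet only works after upload() has run.
    run_order.append("count_rows")
    return len(fake_bucket["report.csv"].splitlines()) - 1


tasks = {"upload": upload, "count_rows": count_rows}
# Map each task to the set of tasks that must finish before it starts.
graph = {"count_rows": {"upload"}, "upload": set()}

results = {}
for name in TopologicalSorter(graph).static_order():
    results[name] = tasks[name]()

print(run_order, results["count_rows"])
```

Here the edge from `count_rows` to `upload` plays the same role as `node.depends_on(node_dep)` above: it constrains execution order even though no value flows along it.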