Async/Await and Non-Blocking Execution

Dask integrates natively with concurrent applications built on the Tornado or asyncio frameworks, and supports Python's async and await keywords.

This notebook shows a small example of how to start a Dask Client in asynchronous mode.

The asynchronous=True parameter

Dask LocalCluster and Client objects can operate in async-await mode if you pass the asynchronous=True parameter.

In [ ]:
from dask.distributed import Client
client = await Client(asynchronous=True)
In [ ]:
def inc(x: int) -> int:
    return x + 1

future = client.submit(inc, 10)
future
In [ ]:
await future
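
The cells above let Client create its own cluster. Since LocalCluster also accepts asynchronous=True, the two objects can be created explicitly and used together. The following is a minimal sketch, not the only way to do this: it assumes a single in-process worker (n_workers=1, processes=False) is enough for a demonstration, and it uses asyncio.run so that it also works outside a notebook. Inside Jupyter you would await the coroutine directly instead.

```python
import asyncio
from dask.distributed import Client, LocalCluster


def inc(x: int) -> int:
    return x + 1


async def main() -> int:
    # Assumption: one threaded, in-process worker is enough for a demo
    async with LocalCluster(
        n_workers=1, processes=False, asynchronous=True
    ) as cluster:
        # Connect an asynchronous client to the cluster created above
        async with Client(cluster, asynchronous=True) as client:
            future = client.submit(inc, 10)  # returns immediately
            return await future              # yields until the result is ready


result = asyncio.run(main())
print(result)
```

Both context managers shut down cleanly when the block exits, so no explicit close calls are needed.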

Collections

Note that blocking operations like the .compute() method aren't ok to use in asynchronous mode, because they stall the event loop until they finish. Instead, use the Client.compute method, which returns a future that you can await.

In [ ]:
import dask
df = dask.datasets.timeseries()
df
In [ ]:
df = df.persist()             # persist is non-blocking, so it's ok
In [ ]:
total = df[['x', 'y']].sum()  # lazy computations are also ok
In [ ]:
# total.compute()             # but compute is bad, because compute blocks until done
In [ ]:
future = client.compute(total)
future
In [ ]:
await future
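
To see why a blocking call is a problem in asynchronous mode, here is a rough sketch that uses only the standard library, not Dask. The helper names (heartbeat, blocking_step, awaiting_step, max_gap) are made up for illustration: time.sleep stands in for a blocking call such as .compute(), and asyncio.sleep stands in for awaiting a future. A background "heartbeat" coroutine records timestamps, and the largest gap between them shows how long the event loop was stalled.

```python
import asyncio
import time


async def heartbeat(ticks: list) -> None:
    # Stand-in for any other work sharing the event loop
    for _ in range(5):
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)


async def blocking_step() -> None:
    # Blocks the whole event loop, as a blocking .compute() call would
    time.sleep(0.2)


async def awaiting_step() -> None:
    # Yields control to the loop, as awaiting a future would
    await asyncio.sleep(0.2)


async def max_gap(step) -> float:
    # Run the heartbeat next to the given step and report the longest pause
    ticks: list = []
    await asyncio.gather(heartbeat(ticks), step())
    return max(later - earlier for earlier, later in zip(ticks, ticks[1:]))


blocked_gap = asyncio.run(max_gap(blocking_step))
awaited_gap = asyncio.run(max_gap(awaiting_step))
print(f"largest heartbeat gap while blocking: {blocked_gap:.2f}s")
print(f"largest heartbeat gap while awaiting: {awaited_gap:.2f}s")
```

The blocking variant should show a gap of roughly the full sleep duration, while the awaiting variant keeps the heartbeat close to its normal interval.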

Within a script

Running async/await code in Jupyter is a bit atypical. Jupyter already has an event loop running, so you can use async/await syntax directly in a cell. In a normal Python script this isn't the case: you have to start the event loop yourself. Here is an example that should run within a normal Python interpreter or as a script.

import asyncio
from dask.distributed import Client


def inc(x: int) -> int:
    return x + 1


async def f():
    async with Client(asynchronous=True) as client:
        future = client.submit(inc, 10)
        result = await future
        print(result)


if __name__ == '__main__':
    # asyncio.run creates the event loop, runs f() to completion, and closes the loop
    asyncio.run(f())
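
The same script pattern extends to several tasks at once. The sketch below is one possible variation, not part of the original example: it submits a batch of tasks with client.map and collects all results with a single awaited client.gather call. It assumes processes=False (threaded, in-process workers) so that the script does not spawn worker processes and can call asyncio.run at module level.

```python
import asyncio
from dask.distributed import Client


def inc(x: int) -> int:
    return x + 1


async def main() -> list:
    # Assumption: in-process workers keep the demo lightweight
    async with Client(asynchronous=True, processes=False) as client:
        futures = client.map(inc, range(5))  # one future per input
        # In asynchronous mode, gather must be awaited
        return await client.gather(futures)


results = asyncio.run(main())
print(results)
```

Gathering the futures in one call avoids awaiting them one by one and preserves the order of the inputs.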