Tasks provides a solution for large-scale computation within the Descartes Labs platform. Tasks can be used to train models, deploy algorithms over large regions of interest, or programmatically retrieve data from sources such as Scenes. Task nodes only consume resources when in use and autoscale up and down as needed. Tasks is best suited for highly parallel workflows where each unit of work is small.
The general terminology of Tasks: a Task group is a pool of workers that runs your code in a shared environment; a task is a single invocation of your function within that group; and a CloudFunction is the local handle used to submit tasks to the group.
Let's say we were interested in estimating the percentage of farmland that has crops. A simple metric that is indicative of vegetation is NDVI, the normalized difference between the near-infrared (NIR) and red bands.
$$NDVI = \frac{N - R}{N + R}$$

An NDVI value above 0.3 generally indicates the presence of vegetation.
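As a quick sanity check on the formula, here is a minimal sketch with hypothetical reflectance values (not real NAIP data):

```python
def ndvi(nir, red):
    """Normalized difference vegetation index for a single pixel."""
    return (nir - red) / (nir + red)

# A vegetated pixel reflects strongly in NIR and weakly in red
print(ndvi(0.6, 0.2))   # 0.5 -> above the 0.3 vegetation threshold
# Bare soil reflects NIR and red roughly equally
print(ndvi(0.3, 0.25))  # ~0.09 -> below the threshold
```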
import descarteslabs as dl
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
For this analysis we will use the NAIP dataset which contains high-resolution aerial imagery over the US.
def get_image(dltile_key):
    tile = dl.scenes.DLTile.from_key(dltile_key)
    sc, ctx = dl.scenes.search(aoi=tile, products='usda:naip:rgbn:v1',
                               start_datetime='2015-01-01', end_datetime='2016-01-01')
    im = sc.mosaic(bands='red green blue nir', ctx=ctx)
    return im
def vegetation_mask(im):
    # NDVI from the NIR (last) and red (first) bands;
    # the epsilon guards against division by zero
    n = im[-1].astype(float)
    r = im[0].astype(float)
    ndvi = (n - r) / (n + r + 1e-8)
    return ndvi
Let's look at an example crop field in Kansas.
lon, lat = -99.35249805450438, 39.38670997057946
tile = dl.scenes.DLTile.from_latlon(lat=lat, lon=lon, resolution=1, tilesize=256, pad=0)
im = get_image(tile.key)
veg_mask = vegetation_mask(im)
fig, ax = plt.subplots(1, 3, figsize=(12, 5))
ax[0].imshow(np.moveaxis(im[0:3], 0, -1))
ax[1].imshow(veg_mask > 0.3)
ax[2].imshow(np.moveaxis(im[0:3], 0, -1))
ax[2].imshow(veg_mask > 0.3, alpha=0.3, clim=(0, 1))
ax[0].set_title('Raw Imagery')
ax[1].set_title('NDVI Mask')
ax[2].set_title('Masked Image')
pass
Our simple mask does a good job of distinguishing crops.
If we wanted to deploy our vegetation model over a wider area, we would need an efficient way to distribute the work across multiple machines. Tasks makes this simple by providing a way to send local Python functions into a cloud-based execution environment.
Let's define a self-contained function that takes an area of interest and returns the percentage of it covered by vegetation: it pulls imagery from the product, runs the analysis, and returns the result. The function takes a DLTile key, which acts as a hash to a unique tile in space and can be used to regenerate that DLTile on the fly.
def vegetation_percentage(dltile_key):
    im = get_image(dltile_key)
    mask = vegetation_mask(im)
    # Fraction of pixels whose NDVI exceeds the vegetation threshold
    return round(np.sum(mask > 0.3) / (mask.shape[0] * mask.shape[1]), 5)
vegetation_percentage(tile.key)
0.24503
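The returned value is simply the fraction of pixels whose NDVI exceeds the threshold. A minimal sketch of that computation on a synthetic 2x2 NDVI grid (not real imagery):

```python
import numpy as np

ndvi = np.array([[0.5, 0.1],
                 [0.4, 0.2]])  # synthetic NDVI values
# 2 of the 4 pixels exceed the 0.3 threshold
frac = np.sum(ndvi > 0.3) / (ndvi.shape[0] * ndvi.shape[1])
print(frac)  # 0.5
```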
The field we looked at before is about a quarter covered by vegetation. To deploy this function over a larger region, let's make a new Task group using the create_function method. The method will serialize our local code and put it into a Docker container for execution. Descartes Labs provides publicly available images with many of the geospatial and scientific Python packages preinstalled. They can be found here. Make sure to choose an image that matches your local Python version (from here on we will assume you are using Python 3.6; if not, choose the corresponding image).
# Python 3.5
# docker_image = 'us.gcr.io/dl-ci-cd/images/tasks/public/py3.5/default:v2019.08.08-8-g0948e514'
# Python 3.6
docker_image = 'us.gcr.io/dl-ci-cd/images/tasks/public/py3.6/default:v2019.06.04-4-g4c7c582e'
# Python 3.7
# docker_image = 'us.gcr.io/dl-ci-cd/images/tasks/public/py3.7/default:v2019.08.08-7-g062b0653'
tasks = dl.Tasks()
async_func = tasks.create_function(
    f=vegetation_percentage,
    name='vegetation-demo',
    image=docker_image,
    maximum_concurrency=10,
)
The status of the Tasks group can be found on the Monitor page.
Once the status changes from pending to running the Task group will start scaling your computation.
We have now instantiated a Task group. It returned a CloudFunction, which we can call to submit tasks to the group. Tasks are put into a queue and scheduled for execution.
async_func
<descarteslabs.client.services.tasks.tasks.CloudFunction at 0x7fe0b8545748>
Say we wanted to run this model over more crop fields. We would define a region of interest, which can be as large as we like.
aoi = {"type":"Feature","geometry":{"type":"Polygon","coordinates":[[[-99.32275771466082,39.510715877324],[-99.17530059814455,39.510715877324],[-99.17530059814455,39.537992699478735],[-99.32275771466082,39.537992699478735],[-99.32275771466082,39.510715877324]]]}, "properties":{}}
from ipyleaflet import Map, GeoJSON, basemaps
m = Map(center=(39.52371, -99.25272), zoom=13, basemap=basemaps.Esri.WorldImagery)
m.add_layer(GeoJSON(data=aoi))
m
Map(basemap={'url': 'http://server.arcgisonline.com/ArcGIS/rest/services/World_Imagery/MapServer/tile/{z}/{y}/…
Once we have a region of interest, scenes.DLTile.from_shape will let us break that region into chunks that can be analyzed in parallel.
tiles = dl.scenes.DLTile.from_shape(aoi, resolution=1, tilesize=512, pad=0)
len(tiles)
182
There are 182 tiles in this region of interest. We can easily launch our tasks by iterating over the tiles and calling the CloudFunction object as if we were calling a local function.
for tile in tiles:
    async_func(tile.key)
We can check the status of each task using the Monitor page. We can also get the results back from the tasks via the client. We can retrieve the id for a Task group using get_group_by_name (wait for the Task group to start before running this).
group_info = tasks.get_group_by_name('vegetation-demo')
group_info.id
'66fe1e3b'
We can retrieve metadata about finished tasks by using get_task_results.
results = tasks.get_task_results(group_info.id, include=['arguments', 'result_url'])
results['results'][0]
{'arguments': '{"args": ["512:0:1.0:14:-54:8544"], "kwargs": {}}',
 'created': '2019-08-14T00:34:05.462810',
 'exception_name': None,
 'failure_type': None,
 'group': '66fe1e3b',
 'id': '681750730057786',
 'labels': None,
 'log_size_bytes': 0,
 'peak_memory_usage': 91025408,
 'result_key': '66fe1e3b_681750730057786',
 'result_size_bytes': 7,
 'result_type': 'json',
 'result_url': 'https://storage.googleapis.com/storage-4c8...z6AEtoYCD7FBjlFvmQo9I0EPwNadNznQiEFQg%3D%3D',
 'runtime': 0.7154245376586914,
 'status': 'SUCCESS',
 'trunc_arguments': '"512:0:1.0:14:-54:8544"',
 'updated': '2019-08-14T00:34:05.462815',
 'webhook': None}
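Each record's arguments field is itself a JSON string. As a standalone sketch (using the key format from the record above), the DLTile key can be recovered like this:

```python
import json

# Same shape as the 'arguments' field in a task result record
arguments = '{"args": ["512:0:1.0:14:-54:8544"], "kwargs": {}}'
args_dict = json.loads(arguments)
dltile_key = args_dict['args'][0]
print(dltile_key)  # 512:0:1.0:14:-54:8544
```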
Let's load the information and result for each task.
import json
import requests

result_dict = {}
for result in results['results']:
    if result.status == 'SUCCESS':
        r = json.loads(requests.get(result.result_url).text)
        args_dict = json.loads(result.arguments)
        key = args_dict['args'][0]
        result_dict[key] = r
Now let's look at the vegetation values for a few of our tiles.
fig, ax = plt.subplots(1, 3, figsize=(15, 5))
for i, key in enumerate(result_dict):
    im = get_image(key)
    p = result_dict[key]
    ax[i].imshow(np.moveaxis(im[0:3], 0, -1))
    ax[i].set_title(f'Vegetation percentage: {round(p * 100)}%')
    if i >= 2:
        break
Because Task groups scale down when not in use, there is no cost-related reason to delete one. A Task group can instead be put in a terminated state, which removes it from the Monitor view; new jobs cannot be submitted to a terminated group.
tasks.delete_group_by_id(group_info.id)
{'ack_deadline': 300,
 'build_log_size_bytes': None,
 'cpu': 1,
 'created': '2019-08-14T00:32:59.470979',
 'function_python_version': '3.6.8',
 'function_type': 'py_pickle',
 'gpu': 0,
 'id': '66fe1e3b',
 'image': 'us.gcr.io/dl-ci-cd/images/tasks/public/py3.6/default:v2019.06.04-4-g4c7c582e',
 'labels': {},
 'maximum_concurrency': 10,
 'mem': 2147483648,
 'minimum_concurrency': None,
 'minimum_seconds': None,
 'name': 'vegetation-demo',
 'queue': {'failures': '0', 'pending': 0, 'successes': '182'},
 'status': 'terminated',
 'updated': '2019-08-14T00:37:37.188660',
 'upload_url': None,
 'worker_timeout': 1800,
 'workers': {'failed': 0, 'pending': 2, 'running': 7, 'succeeded': 0, 'unknown': 0}}
In the previous sections we deployed a simple model in Tasks. In the following section, we explore more advanced uses for Tasks.
For many kinds of machine learning workflows, GPUs can greatly accelerate computation. GPUs can be requested when launching a Task group using the gpus argument. The function below returns the name of a local GPU device if one exists.
def try_gpu():
    from tensorflow.python.client import device_lib
    devices = device_lib.list_local_devices()
    for d in devices:
        if d.device_type == 'GPU':
            return d.physical_device_desc
To make sure the GPU can be utilized, the correct version of CUDA and drivers must be installed in the Task image. Descartes Labs provides GPU-ready public images to use. Once again, it is important that the image version and the local Python version match.
# Python 3.5
# gpu_image = 'us.gcr.io/dl-ci-cd/images/tasks/public/py3.5-gpu/default:v2019.08.08-8-g0948e514'
# Python 3.6
gpu_image = 'us.gcr.io/dl-ci-cd/images/tasks/public/py3.6-gpu/default:v2019.08.08-8-g0948e514'
gpu_test_func = dl.Tasks().create_function(
    f=try_gpu,
    name='gpu-demo',
    image=gpu_image,
    maximum_concurrency=10,
    gpus=1,
)
Let's see what GPUs are available on Tasks. Because GPUs are provisioned on the fly, it may take a minute or two for the task to start.
task = gpu_test_func()
task.result
Tasks is easy to manage for small, self-contained functions, but what about larger projects? There are two methods to encapsulate and run larger projects within Tasks.
Creating Docker containers may be cumbersome, so the Descartes Labs platform provides some options to speed up this process. When creating a task function, three optional arguments modify the environment the task function runs in:

include_modules: include local Python modules. These must be on the Python path of the launching function.
include_data: include data files such as CSVs or JSONs.
requirements: a list of pip-installable packages.

With these arguments, we can modify the public images for our own use. For example, if we wanted to train a neural network model with the newest version of PyTorch, we could install it as part of building the environment. Because Tasks rebuilds the environment, this may take a few minutes.
def try_torch():
    import torch
    return torch.__version__
torch_version = dl.Tasks().create_function(
    f=try_torch,
    name='pytorch-demo',
    image='us.gcr.io/dl-ci-cd/images/tasks/public/py3.6/default:v2019.06.04-4-g4c7c582e',
    maximum_concurrency=10,
    requirements=['torch>=1.0'],
)
r = torch_version()
r.result
We have deployed a simple model of crop coverage over a region of interest with Tasks. In the process we have:
Used tasks.create_function to create a Task group.
Tiled regions of interest with scenes.DLTile.
Called the async function to deploy tasks.
Retrieved Task groups using tasks.get_group_by_name.
Gotten results back using tasks.get_task_results.
Terminated a Task group with tasks.delete_group_by_id.
Used GPUs in Tasks.
Built our own custom environment in Tasks.