flower REST API

This document shows how to use the flower REST API.

We will use requests for accessing the API. (See here on how to install it.)

Code

We'll use the following code throughout the documentation.

tasks.py

In [43]:
from celery import Celery
from time import sleep

celery = Celery()
celery.config_from_object({
    'BROKER_URL': 'amqp://localhost',
    'CELERY_RESULT_BACKEND': 'amqp://',
    'CELERYD_POOL_RESTARTS': True,  # Required for /worker/pool/restart API
})


@celery.task
def add(x, y):
    return x + y


@celery.task
def sub(x, y):
    sleep(30)  # Simulate work
    return x -  y

Running

You'll need a celery worker instance and a flower instance running. In one terminal window run

celery worker --loglevel INFO -A proj -E --autoscale 10,3

and in another terminal run

celery flower -A proj

Tasks API

The tasks API is async, meaning calls will return immediatly and you'll need to poll on task status.

In [3]:
# Done once for the whole docs
import requests, json
api_root = 'http://localhost:5555/api'
task_api = '{}/task'.format(api_root)

async-apply

In [6]:
args = {'args': [1, 2]}
url = '{}/async-apply/tasks.add'.format(task_api)
print(url)
resp = requests.post(url, data=json.dumps(args))
reply = resp.json()
reply
http://localhost:5555/api/task/async-apply/tasks.add
Out[6]:
{u'state': u'PENDING', u'task-id': u'f4a53407-30f3-42af-869f-b7f8f4fbd684'}

We can see that we created a new task and it's pending. Note that the API is async, meaning it won't wait until the task finish.

apply

For create task and wait results you can use 'apply' API.

In [7]:
args = {'args': [1, 2]}
url = '{}/apply/tasks.add'.format(task_api)
print(url)
resp = requests.post(url, data=json.dumps(args))
reply = resp.json()
reply
http://localhost:5555/api/task/apply/tasks.add
Out[7]:
{u'result': 3,
 u'state': u'SUCCESS',
 u'task-id': u'ced6fd57-419e-4b8e-8d99-0770be717cb4'}

result

Gets the task result. This is async and will return immediatly even if the task didn't finish (with state 'PENDING')

In [5]:
url = '{}/result/{}'.format(task_api, reply['task-id'])
print(url)
resp = requests.get(url)
resp.json()
http://localhost:5555/api/task/result/ced6fd57-419e-4b8e-8d99-0770be717cb4
Out[5]:
{u'result': 3,
 u'state': u'SUCCESS',
 u'task-id': u'ced6fd57-419e-4b8e-8d99-0770be717cb4'}

revoke

Revoke a running task.

In [7]:
# Run a task
args = {'args': [1, 2]}
resp = requests.post('{}/async-apply/tasks.sub'.format(task_api), data=json.dumps(args))
reply = resp.json()

# Now revoke it
url = '{}/revoke/{}'.format(task_api, reply['task-id'])
print(url)
resp = requests.post(url, data='terminate=True')
resp.json()
http://localhost:5555/api/task/revoke/bcb4ac2e-cb2d-4a4b-a402-8eb3a3b0c8e8
Out[7]:
{u'message': u"Revoked 'bcb4ac2e-cb2d-4a4b-a402-8eb3a3b0c8e8'"}

rate-limit

Update rate limit for a task.

In [20]:
worker = 'miki-manjaro'  # You'll need to get the worker name from the worker API (seel below)
url = '{}/rate-limit/{}'.format(task_api, worker)
print(url)
resp = requests.post(url, params={'taskname': 'tasks.add', 'ratelimit': '10'})
resp.json()
http://localhost:5555/api/task/rate-limit/miki-manjaro
Out[20]:
{u'message': u'new rate limit set successfully'}

timeout

Set timeout (both hard and soft) for a task.

In [22]:
url = '{}/timeout/{}'.format(task_api, worker)
print(url)
resp = requests.post(url, params={'taskname': 'tasks.add', 'hard': '3.14', 'soft': '3'})  # You can omit soft or hard
resp.json()
http://localhost:5555/api/task/timeout/miki-manjaro
Out[22]:
{u'message': u'time limits set successfully'}

Worker API

In [55]:
# Once for the documentation
worker_api = '{}/worker'.format(api_root)

workers

List workers.

In [25]:
url = '{}/workers'.format(api_root)  # Only one not under /worker
print(url)
resp = requests.get(url)
workers = resp.json()
workers
http://localhost:5555/api/workers
Out[25]:
{u'miki-manjaro': {u'completed_tasks': 0,
  u'concurrency': 1,
  u'queues': [u'celery'],
  u'running_tasks': 0,
  u'status': True}}

pool/shutdown

Shutdown a worker.

In [30]:
worker = workers.keys()[0]
url = '{}/shutdown/{}'.format(worker_api, worker)
print(url)
resp = requests.post(url)
resp.json()
http://localhost:5555/api/worker/shutdown/miki-manjaro
Out[30]:
{u'message': u'Shutting down!'}

pool/restart

Restart a worker pool, you need to have CELERYD_POOL_RESTARTS enabled in your configuration).

In [43]:
pool_api = '{}/pool'.format(worker_api)
url = '{}/restart/{}'.format(pool_api, worker)
print(url)
resp = requests.post(url)
resp.json()
http://localhost:5555/api/worker/pool/restart/miki-manjaro
Out[43]:
{u'message': u"Restarting 'miki-manjaro' worker's pool"}

pool/grow

Grows worker pool.

In [53]:
url = '{}/grow/{}'.format(pool_api, worker)
print(url)
resp = requests.post(url, params={'n': '10'})
resp.json()
http://localhost:5555/api/worker/pool/grow/miki-manjaro
Out[53]:
{u'message': u"Growing 'miki-manjaro' worker's pool"}

pool/shrink

Shrink worker pool.

In [54]:
url = '{}/shrink/{}'.format(pool_api, worker)
print(url)
resp = requests.post(url, params={'n': '3'})
resp.json()
http://localhost:5555/api/worker/pool/shrink/miki-manjaro
Out[54]:
{u'message': u"Shrinking 'miki-manjaro' worker's pool"}

pool/autoscale

Autoscale a pool.

In [58]:
url = '{}/autoscale/{}'.format(pool_api, worker)
print(url)
resp = requests.post(url, params={'min': '3', 'max': '10'})
resp.json()
http://localhost:5555/api/worker/pool/autoscale/miki-manjaro
Out[58]:
{u'message': u"Autoscaling 'miki-manjaro' worker"}

queue/add-consumer

Add a consumer to a queue.

In [62]:
queue_api = '{}/queue'.format(worker_api)
url = '{}/add-consumer/{}'.format(queue_api, worker)
print(url)
resp = requests.post(url, params={'queue': 'jokes'})
resp.json()
http://localhost:5555/api/worker/queue/add-consumer/miki-manjaro
Out[62]:
{u'message': u"add consumer u'jokes'"}

queue/cancel-consumer

Cancel a consumer queue.

In [63]:
url = '{}/cancel-consumer/{}'.format(queue_api, worker)
print(url)
resp = requests.post(url, params={'queue': 'jokes'})
resp.json()
http://localhost:5555/api/worker/queue/cancel-consumer/miki-manjaro
Out[63]:
{u'message': u'no longer consuming from jokes'}

Queue API

We assume that we've two queues; the default one 'celery' and 'all'

In [7]:
url = '{}/queues/length'.format(api_root)
print(url)
resp = requests.get(url)
resp.json()
http://localhost:5555/api/queues/length
Out[7]:
{u'active_queues': [{u'messages': 2, u'name': u'all'},
  {u'messages': 1, u'name': u'celery'}]}