#!/usr/bin/env python
# coding: utf-8

# # flower REST API
#
# This document shows how to use the flower [REST API](https://github.com/mher/flower#api).
#
# We will use [requests](http://www.python-requests.org/en/latest/) for accessing the API.
# (See [here](http://www.python-requests.org/en/latest/user/install/) on how to install it.)

# # Code
# We'll use the following code throughout the documentation.

# ## tasks.py

# In[43]:

from celery import Celery
from time import sleep

# Minimal Celery app used by every example below.
celery = Celery()
celery.config_from_object({
    'BROKER_URL': 'amqp://localhost',
    'CELERY_RESULT_BACKEND': 'amqp://',
    'CELERYD_POOL_RESTARTS': True,  # Required for /worker/pool/restart API
})


@celery.task
def add(x, y):
    """Return the sum of x and y (finishes quickly)."""
    return x + y


@celery.task
def sub(x, y):
    """Return x - y after a 30s delay, to demo revoking a long task."""
    sleep(30)  # Simulate work
    return x - y


# ## Running
# You'll need a celery worker instance and a flower instance running.
# In one terminal window run
#
#     celery worker --loglevel INFO -A proj -E --autoscale 10,3
#
# and in another terminal run
#
#     celery flower -A proj

# # Tasks API
# The tasks API is *async*, meaning calls will return immediately and you'll
# need to poll on task status.

# In[3]:

# Done once for the whole docs
import requests
import json

api_root = 'http://localhost:5555/api'
task_api = '{}/task'.format(api_root)

# ## async-apply

# In[6]:

args = {'args': [1, 2]}
url = '{}/async-apply/tasks.add'.format(task_api)
print(url)
resp = requests.post(url, data=json.dumps(args))
reply = resp.json()
reply

# We can see that we created a new task and it's pending. Note that the API is
# *async*, meaning it won't wait until the task finishes.

# ## apply
# To create a task and wait for its result, use the 'apply' API.

# In[7]:

args = {'args': [1, 2]}
url = '{}/apply/tasks.add'.format(task_api)
print(url)
resp = requests.post(url, data=json.dumps(args))
reply = resp.json()
reply

# ## result
# Gets the task result.
# This is *async* and will return immediately even if the task didn't finish
# (with state 'PENDING')

# In[5]:

url = '{}/result/{}'.format(task_api, reply['task-id'])
print(url)
resp = requests.get(url)
resp.json()

# ## revoke
# Revoke a running task.

# In[7]:

# Run a task
args = {'args': [1, 2]}
resp = requests.post('{}/async-apply/tasks.sub'.format(task_api), data=json.dumps(args))
reply = resp.json()

# Now revoke it
url = '{}/revoke/{}'.format(task_api, reply['task-id'])
print(url)
resp = requests.post(url, data='terminate=True')
resp.json()

# ## rate-limit
# Update the [rate limit](https://docs.celeryq.dev/en/latest/userguide/tasks.html#Task.rate_limit)
# for a task.

# In[20]:

worker = 'miki-manjaro'  # You'll need to get the worker name from the worker API (see below)
url = '{}/rate-limit/{}'.format(task_api, worker)
print(url)
resp = requests.post(url, params={'taskname': 'tasks.add', 'ratelimit': '10'})
resp.json()

# ## timeout
# Set timeout (both [hard](https://docs.celeryq.dev/en/latest/userguide/tasks.html#Task.time_limit)
# and [soft](https://docs.celeryq.dev/en/latest/userguide/tasks.html#Task.soft_time_limit))
# for a task.

# In[22]:

url = '{}/timeout/{}'.format(task_api, worker)
print(url)
# You can omit soft or hard
resp = requests.post(url, params={'taskname': 'tasks.add', 'hard': '3.14', 'soft': '3'})
resp.json()

# # Worker API

# In[55]:

# Once for the documentation
worker_api = '{}/worker'.format(api_root)

# ## workers
# List workers.

# In[25]:

url = '{}/workers'.format(api_root)  # Only one not under /worker
print(url)
resp = requests.get(url)
workers = resp.json()
workers

# ## pool/shutdown
# Shutdown a worker.

# In[30]:

# Pick the first worker name. NOTE: dict views aren't subscriptable in
# Python 3, so `workers.keys()[0]` would raise TypeError; use next(iter(...)).
worker = next(iter(workers))
url = '{}/shutdown/{}'.format(worker_api, worker)
print(url)
resp = requests.post(url)
resp.json()

# ## pool/restart
# Restart a worker pool. You need to have
# [CELERYD_POOL_RESTARTS](https://docs.celeryq.dev/en/latest/configuration.html#std:setting-CELERYD_POOL_RESTARTS)
# enabled in your configuration.
# In[43]:

pool_api = '{}/pool'.format(worker_api)
url = '{}/restart/{}'.format(pool_api, worker)
print(url)
resp = requests.post(url)
resp.json()

# ## pool/grow
# Grow the worker pool.

# In[53]:

url = '{}/grow/{}'.format(pool_api, worker)
print(url)
resp = requests.post(url, params={'n': '10'})
resp.json()

# ## pool/shrink
# Shrink the worker pool.

# In[54]:

url = '{}/shrink/{}'.format(pool_api, worker)
print(url)
resp = requests.post(url, params={'n': '3'})
resp.json()

# ## pool/autoscale
# [Autoscale](https://docs.celeryq.dev/en/latest/userguide/workers.html#autoscaling) a pool.

# In[58]:

url = '{}/autoscale/{}'.format(pool_api, worker)
print(url)
resp = requests.post(url, params={'min': '3', 'max': '10'})
resp.json()

# ## queue/add-consumer
# [Add a consumer](https://docs.celeryq.dev/en/latest/userguide/workers.html#std:control-add_consumer)
# to a queue.

# In[62]:

queue_api = '{}/queue'.format(worker_api)
url = '{}/add-consumer/{}'.format(queue_api, worker)
print(url)
resp = requests.post(url, params={'queue': 'jokes'})
resp.json()

# ## queue/cancel-consumer
# [Cancel a consumer](https://docs.celeryq.dev/en/latest/userguide/workers.html#queues-cancelling-consumers)
# for a queue.

# In[63]:

url = '{}/cancel-consumer/{}'.format(queue_api, worker)
print(url)
resp = requests.post(url, params={'queue': 'jokes'})
resp.json()

# # Queue API
#
# We assume that we have two queues: the default one 'celery' and 'all'

# In[7]:

url = '{}/queues/length'.format(api_root)
print(url)
resp = requests.get(url)
resp.json()