Not really
RQ is Python specific, messages use pickle
.
Redis is a key value store, not a highly available message broker.
RQ Workers fork new child processes for every task.
But is quick and easy!
pip install rq
Real Quick
Take any Python function
import requests
def count_words_at_url(url):
resp = requests.get(url)
return len(resp.text.split())
Create a queue and enqueue the function reference and arguments
from redis import Redis
from rq import Queue
from count import count_words_at_url
q = Queue(connection=Redis())
q.enqueue(
count_words_at_url, 'http://uberhip.com'
)
Run the worker on the command line
rqworker
Run the client
rq1.py
Worker logs show
Remember, all we did was enqueue a job, specifying only the function and its arguments.
q = Queue(connection=Redis())
q.enqueue(
count_words_at_url, 'http://uberhip.com'
)
No queue names, no priorities, no handling of the return values.
To enqueue something on a specific queue, instantiate Queue() with the queue name as an argument.
low_q = Queue('low', connection=Redis())
low_q.enqueue(
count_words_at_url, 'http://uberhip.com'
)
You can use any queue name.
Workers can be started on specific queues by providing queue names as command line arguments to rqworker
.
rqworker low
Other Queue
keyword arguments include: timeout
, result_ttl
, ttl
, depends_on
, job_id
, and at_font
. See http://python-rq.org/docs/ for details.
The task function passed to enqueue
can be EITHER a function reference OR a string. So the Workers can be implemented in a separate code base from the enqueueing
code.
Queue()
has the following methods and attributes:
len(q)
- Number of jobs in queueq.job_ids
- List of enqueued job IDsq.jobs
- List of enqueued job instancesjob = q.fetch_job('my_id')
- Returns job having ID: my_idConfig file
Burst Mode
Lifecycle: Boot
-> Reg
-> Listen
-> Prep Exec
-> Fork()
-> Process
-> Cleanup
-> GOTO 3
Custom Workers
Read the docs, we don't have time for this!
Want results from your job? enqueue
returns a job reference, save it, ask it for results before the default results_ttl
of 500s expires.
q = Queue(connection=Redis())
job = q.enqueue(
count_words_at_url, 'http://uberhip.com'
)
time.sleep(2)
print job.result
What, if we don't have time for Workers, we certainly don't have time for jobs!
Well, other than to emphasize there are two times-to-live
result_ttl
- result time to livettl
- job time to liveAnd ... there's a meta
property to which you can attach arbitrary data using dictionary syntax.
Read the docs: http://python-rq.org/docs/jobs/
rqinfo
rq-dashboard
DEMO!!!!!!!
(sounds a little fancy, maybe you want to try a different queueing system)
failed
queue, rq-dashboard
is handy for reviewing this.SimpleWorker
otherwise you might encounter trouble with fork()
.depends_on
@job
decoratorttl
s and runtimes appropriately. The default job ttl
is 180s, if your jobs runtime is expected to exceed 3min you should be changing the ttl
.