I am new to RQ and I am trying to use it within my Flask app. The main goal of one of my routes is to update values in a database. To set up my worker I am using the following:
from rq import Worker, Queue, Connection
import redis
import os
@app.before_first_request
def start_worker():
    """Launch an RQ worker on a background thread before the first request.

    NOTE(review): `listen` and `ThreadPoolExecutor` are not defined or
    imported in the visible code -- confirm they are provided elsewhere.
    """

    def runworker():
        # Fall back to the bare 'redis://' URL when REDIS_URL is unset or empty.
        url = os.environ.get("REDIS_URL") or 'redis://'
        connection = redis.from_url(url)
        with Connection(connection):
            # One Queue object per name in `listen`.
            queues = [Queue(name) for name in listen]
            Worker(queues).work()

    executor = ThreadPoolExecutor()
    executor.submit(runworker)
def get_redis_connection():
    """Return a Redis connection, reusing the one stored on ``g`` if present.

    The connection URL is read from the ``REDIS_URL`` environment variable,
    falling back to ``'redis://'`` when it is unset or empty.

    Returns:
        The Redis client created by ``redis.from_url``.
    """
    redis_connection = getattr(g, '_redis_connection', None)
    if redis_connection is None:
        redis_url = os.environ.get('REDIS_URL') or 'redis://'
        redis_connection = redis.from_url(redis_url)
        # Store the client on `g`; without this the lookup above never
        # succeeds and a new client is created on every call.
        g._redis_connection = redis_connection
    return redis_connection
@app.before_request
def push_rq_connection():
    """Push a Redis connection onto RQ's connection stack before each request."""
    connection = get_redis_connection()
    push_connection(connection)
@app.teardown_request
def pop_rq_connection(exception=None):
    """Pop the connection pushed for this request off RQ's connection stack.

    Args:
        exception: Passed in by the teardown hook; not used here.
    """
    # Mirrors the push done in the before-request hook.
    pop_connection()
Then the update route queues the update job:
@app.route('/update')
def update_db():
    """Enqueue the database update job and return its id.

    Returns:
        A ``(body, status, headers)`` tuple: a dict holding the job id,
        HTTP status 201, and a JSON content-type header.

    NOTE(review): `conn` and `parameters` are not defined in the visible
    code -- confirm they exist at module level (or build them here).
    """
    q = Queue(connection=conn)
    # `Queue` exposes `enqueue` directly; the former `q.q.enqueue` would
    # raise AttributeError because a Queue has no `q` attribute.
    job = q.enqueue('app.tasks.update_task', parameters)
    job_id = job.get_id()
    return {"job_id": job_id}, 201, {"Content-Type": 'application/json'}
Finally, the worker runs the updating function:
def update_task(parameters):
    """Update the database; the real body is omitted from this excerpt."""
    # script to update DB
I know the worker setup (almost) works because if I switch the update_task function to something simple like:
def update_task(seconds):
    """Print a counter once per second for `seconds` seconds.

    Args:
        seconds: Number of one-second ticks to print and wait for.

    Returns:
        The string "Hello world" once all ticks have elapsed.
    """
    greeting = "Hello world"
    for tick in range(seconds):
        print(tick)
        time.sleep(1)
    return greeting
it works. However, with the real function I keep running into the problem that my environment variables are not defined: when I run the actual update, I get exceptions saying my variables are None, or something similar.
Does anyone know how to handle environment variables within RQ? Should I declare them again somewhere, such as in a config file?
OK, so it turns out it's better to use multiprocessing:
from multiprocessing import Process
def runworker():
    """Start an RQ worker that listens on the 'default' queue.

    The Redis URL comes from the ``REDIS_URL`` environment variable, falling
    back to ``"redis://"`` when it is unset or empty.
    """
    listen = ['default']
    url = os.environ.get("REDIS_URL") or "redis://"
    connection = redis.from_url(url)
    with Connection(connection):
        # One Queue object per name in `listen`.
        queues = [Queue(name) for name in listen]
        Worker(queues).work()
# Run the RQ worker in its own process; it is started as soon as this
# module-level code executes.
pp_worker = Process(target=runworker)
pp_worker.start()
@app.teardown_appcontext
def teardown_rq(self):
    """Terminate the worker process when the application context is torn down.

    Args:
        self: The value Flask passes to teardown handlers (the exception,
            if any); the name is kept for compatibility and it is not used.

    NOTE(review): Flask runs `teardown_appcontext` handlers whenever an
    application context is popped, which normally happens at the end of
    each request -- confirm that stopping the worker at that point is
    intended rather than only at process shutdown.
    """
    # The decorator was misspelled `teardow_appcontext`, which raises
    # AttributeError when the module is imported.
    pp_worker.terminate()
And that does the trick. Posting this in case anyone else runs into the same problem.