
How to create a ``depends_on`` relationship between scheduled and queued jobs in python-rq


I have a web service (Python 3.7, Flask 1.0.2) with a workflow consisting of 3 steps:

The remote compute job can run for an arbitrary time (anywhere between seconds and days), and each step depends on the completion of the previous one:

with Connection(redis.from_url(current_app.config['REDIS_URL'])):
    q = Queue()
    job1 = q.enqueue(step1)
    job2 = q.enqueue(step2, depends_on=job1)
    job3 = q.enqueue(step3, depends_on=job2)

However, eventually all 4 workers end up polling (step 2 of 4 client requests), when they should instead keep picking up step 1 of other incoming requests and step 3 of those workflows that have successfully passed step 2.

Workers should be released after each poll. They should periodically come back to step 2 for the next poll (at most once every 61 seconds per job) and, if the remote compute job poll does not return "DONE", re-queue the poll job.
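The desired behavior (poll once, release the worker, re-queue unless the answer is "DONE") can be sketched without Redis or rq. This is only an in-memory simulation under stated assumptions: `run_polls`, `POLL_INTERVAL` and the job names are hypothetical, time is simulated rather than slept, and each status list is assumed to end with "DONE".

```python
from collections import deque

POLL_INTERVAL = 61  # seconds between two polls of the same job, as in the text


def run_polls(statuses, start=0.0):
    """Simulate one worker serving poll jobs.

    `statuses` maps a (hypothetical) job name to the answers the remote
    portal would give on successive polls. Returns a list of
    (simulated_time, job_name, status) tuples.
    """
    clock = start
    pending = deque((name, start) for name in statuses)  # (job, not_before)
    answers = {name: iter(seq) for name, seq in statuses.items()}
    log = []
    while pending:
        name, not_before = pending.popleft()
        # The worker only waits if the job at the head of the queue is not due yet
        clock = max(clock, not_before)
        status = next(answers[name])
        log.append((clock, name, status))
        if status != "DONE":
            # Not finished: the worker is released and the poll is re-queued
            pending.append((name, clock + POLL_INTERVAL))
    return log


log = run_polls({"req-a": ["RUN", "DONE"], "req-b": ["DONE"]})
for entry in log:
    print(entry)
```

In this run the worker polls `req-a`, moves straight on to `req-b`, and only comes back to `req-a` once its 61 seconds are up.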


At this point I started using rq-scheduler (because its interval and re-queueing features sounded promising):

with Connection(redis.from_url(current_app.config['REDIS_URL'])):
    q = Queue()
    s = Scheduler('default')

    job1 = q.enqueue(step1, REQ_ID)

    job2 = Job.create(step2, (REQ_ID,), depends_on=job1)
    job2.meta['interval'] = 61
    job2.origin = 'default'
    job2.save()
    s.enqueue_job(job2)

    job3 = q.enqueue(step3, REQ_ID, depends_on=job2)

job2 is created correctly (including the depends_on relationship to job1), but s.enqueue_job() executes it straight away, ignoring its relationship to job1. (The docstring of q.enqueue_job() actually says that it is executed immediately ...)

How can I create the depends_on relationship between job1, job2 and job3 when job2 is put in the scheduler and not in the queue? (Or: how can I hand job2 to the scheduler so that it waits for job1 to finish instead of being executed straight away?)


For testing purposes the steps look like this:

import time
from datetime import datetime


# req_id receives the REQ_ID passed at enqueue time; it defaults to None so
# that the argument-less q.enqueue(step1) call from the first snippet also works
def step1(req_id=None):
    print(f'*** --> [{datetime.utcnow()}] JOB [ 1 ] STARTED...', flush=True)
    time.sleep(20)
    print(f'    <-- [{datetime.utcnow()}] JOB [ 1 ] FINISHED', flush=True)
    return True

def step2(req_id=None):
    print(f'    --> [{datetime.utcnow()}] POLL JOB [ 2 ] STARTED...', flush=True)
    time.sleep(10)
    print(f'    <-- [{datetime.utcnow()}] POLL JOB [ 2 ] FINISHED', flush=True)
    return True

def step3(req_id=None):
    print(f'    --> [{datetime.utcnow()}] JOB [ 3 ] STARTED...', flush=True)
    time.sleep(10)
    print(f'*** <-- [{datetime.utcnow()}] JOB [ 3 ] FINISHED', flush=True)
    return True

And the output I receive is this:

worker_1     | 14:44:57 default: project.server.main.tasks.step1(1) (d40256a2-904f-4ce3-98da-6e49b5d370c9)
worker_2     | 14:44:57 default: project.server.main.tasks.step2(1) (3736909c-f05d-4160-9a76-01bb1b18db58)
worker_2     |     --> [2019-11-04 14:44:57.341133] POLL JOB [ 2 ] STARTED...
worker_1     | *** --> [2019-11-04 14:44:57.342142] JOB [ 1 ] STARTED...
...

job2 is not waiting for job1 to complete ...


# requirements.txt
Flask==1.0.2
Flask-Bootstrap==3.3.7.1
Flask-Testing==0.7.1
Flask-WTF==0.14.2
redis==3.3.11
rq==0.13
rq_scheduler==0.9.1

Solution

  • My solution to this problem uses rq only (and no longer rq_scheduler):

    1. Upgrade to the latest python-rq package:

      # requirements.txt
      ...
      rq==1.1.0
      
    2. Create a dedicated queue for the polling jobs, and enqueue jobs accordingly (with the depends_on relationship):

      with Connection(redis.from_url(current_app.config['REDIS_URL'])):
          q = Queue('default')
          p = Queue('pqueue')
          job1 = q.enqueue(step1)
          job2 = p.enqueue(step2, depends_on=job1)  # step2 enqueued in polling queue
          job3 = q.enqueue(step3, depends_on=job2)
      
    3. Derive a dedicated worker for the polling queue. It inherits from the standard Worker class:

      import time
      from datetime import datetime

      import rq


      class PWorker(rq.worker.Worker):
          def execute_job(self, *args, **kwargs):
              seconds_between_polls = 65
              job = args[0]  # the job is the first positional argument
              if 'lastpoll' in job.meta:
                  # throttle: wait until the interval since the last poll is over
                  job_timedelta = (datetime.utcnow() - job.meta["lastpoll"]).total_seconds()
                  if job_timedelta < seconds_between_polls:
                      sleep_period = seconds_between_polls - job_timedelta
                      time.sleep(sleep_period)
              # record the time of this poll before running it
              job.meta['lastpoll'] = datetime.utcnow()
              job.save_meta()

              super().execute_job(*args, **kwargs)
      

      PWorker extends the execute_job method by storing a timestamp under 'lastpoll' in the job's metadata.

      If a poll job comes in with a lastpoll timestamp, the worker checks whether at least 65 seconds have passed since lastpoll. If so, it writes the current time to 'lastpoll' and executes the poll. If not, it sleeps until the 65 seconds are up, then writes the current time to 'lastpoll' and executes the poll. A job coming in without a lastpoll timestamp is polling for the first time, so the worker creates the timestamp and executes the poll.

    4. Create a dedicated exception (to be thrown by the task function) and an exception handler to deal with it:

      # exceptions.py
      
      class PACError(Exception):
          pass
      
      class PACJobRun(PACError):
          pass
      
      class PACJobExit(PACError):
          pass
      
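      # exception_handlers.py

      from rq.job import requeue_job
      # PACJobRun is the exception class defined in exceptions.py above

      def poll_exc_handler(job, exc_type, exc_value, traceback):
          if exc_type is PACJobRun:
              # remote job still running: put the poll job back on its queue
              requeue_job(job.get_id(), connection=job.connection)
              return False  # no further exception handling
          return True  # further exception handling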
      
      # tasks.py
      
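      def step2():
          # GET request to remote compute job portal API for status
          # (omitted here, so the check below is left commented out)
          # if response == "RUN":
          raise PACJobRun  # belongs inside the `if` above: job still running
          return True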
      

      When the custom exception handler catches the custom exception (which means the remote compute job is still running), it requeues the job in the polling queue.

    5. Slot the custom exception handler into the exception handling hierarchy:

      # manage.py
      
      @cli.command('run_pworker')
      def run_pworker():
          redis_url = app.config['REDIS_URL']
          redis_connection = redis.from_url(redis_url)
          with rq.connections.Connection(redis_connection):
              pworker = PWorker(app.config['PQUEUE'], exception_handlers=[poll_exc_handler])
              pworker.work()
      

    The nice thing about this solution is that it extends the standard functionality of python-rq with only a few lines of extra code. On the other hand, there is the added complexity of an extra queue and worker …
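The two decisions made in steps 3 and 4 (how long to wait before a poll, and whether the handler chain continues) can be checked in isolation. The following is a minimal sketch that needs neither Redis nor rq. `seconds_to_wait`, `run_handlers`, `poll_handler` and `fallback_handler` are hypothetical names. `run_handlers` only mimics the documented convention that a handler returning a falsy value other than None stops the chain; it is not rq's own code, and calling the handlers in list order is an assumption of this sketch.

```python
from datetime import datetime, timedelta

SECONDS_BETWEEN_POLLS = 65  # same interval as in PWorker


def seconds_to_wait(meta, now, interval=SECONDS_BETWEEN_POLLS):
    """Return how long to sleep before the next poll (0.0 if it is due)."""
    last = meta.get("lastpoll")
    if last is None:
        return 0.0  # first poll: no timestamp yet
    elapsed = (now - last).total_seconds()
    return max(0.0, interval - elapsed)


class PACJobRun(Exception):
    """Stand-in for the exception of the same name in exceptions.py."""


requeued = []  # stand-in for the polling queue


def poll_handler(job, exc_type, exc_value, tb):
    if exc_type is PACJobRun:
        requeued.append(job)  # the real handler calls requeue_job(...) here
        return False  # stop the chain
    return True  # let later handlers run


def fallback_handler(job, exc_type, exc_value, tb):
    return True


def run_handlers(handlers, job, exc_type, exc_value, tb=None):
    """Call handlers in list order and return the names of those called."""
    called = []
    for handler in handlers:
        called.append(handler.__name__)
        fallthrough = handler(job, exc_type, exc_value, tb)
        if fallthrough is None:
            fallthrough = True  # no return value counts as "continue"
        if not fallthrough:
            break
    return called


now = datetime(2019, 11, 4, 14, 45, 0)
print(seconds_to_wait({}, now))
print(seconds_to_wait({"lastpoll": now - timedelta(seconds=20)}, now))
print(seconds_to_wait({"lastpoll": now - timedelta(seconds=90)}, now))

handlers = [poll_handler, fallback_handler]
still_running = run_handlers(handlers, "job2", PACJobRun, PACJobRun())
other_error = run_handlers(handlers, "job2", ValueError, ValueError())
print(still_running, other_error, requeued)
```

Keeping the waiting rule in a pure function like this makes the 65-second throttle testable without actually sleeping. Only the PACJobRun case re-queues the job; any other exception falls through to the remaining handlers.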