ruby-on-rails sidekiq aasm

sidekiq perform_in(delay) from within the worker ignores the delay


Users in my app create Transactions. When a user doesn't respond within a certain time, a job should change the transaction's state to ignored. I need these jobs to cancel themselves if the user performs a pay action in time.

In one example, the model enqueues a worker with perform_async after the state changes to approved; the transaction should then be ignored if the user does not respond in time:

class Transaction < ApplicationRecord
 #when approved
 def create_worker
  MyWorker.perform_async(self.id)
 end

 #if user responds in time, cancel the jobs and update the record to `paid` etc
 def cancel_worker
  jid = MyWorker.perform_async(self.id)
  MyWorker.cancel! jid
 end
end

As suggested here and here, I'm putting additional functionality about when to cancel inside the worker. It looks something like this:

class MyWorker
 include Sidekiq::Worker

 def perform(transaction_id)
  return if paid?
  transaction = Transaction.find transaction_id
  self.class.perform_in(1.minutes, transaction.ignore!)
 end

 def paid?
  Sidekiq.redis { |c| c.exists("paid-#{jid}") }
 end

 def self.cancel! jid
  Sidekiq.redis { |c| c.setex("paid-#{jid}", 86400, 1) }
 end
end

This code results in the following terminal output:

2018-12-16T01:40:50.645Z 30530 TID-oxm547nes MyWorker JID-6c97e448fe30998235dee95d INFO: start
Changing transaction 4 approved to ignored (event: ignore!)
2018-12-16T01:40:50.884Z 30530 TID-oxm547nes MyWorker JID-6c97e448fe30998235dee95d INFO: done: 0.239 sec
2018-12-16T01:41:56.122Z 30530 TID-oxm547oag MyWorker JID-b46bb3b002e00f480a04be16 INFO: start
2018-12-16T01:41:56.125Z 30530 TID-oxm547oag MyWorker JID-b46bb3b002e00f480a04be16 INFO: fail: 0.003 sec
2018-12-16T01:41:56.126Z 30530 TID-oxm547oag WARN: {"context":"Job raised exception","job":{"class":"MyWorker","args":[true],"retry":true,"queue":"default","jid":"b46bb3b002e00f480a04be16","created_at":1544924450.884224,"enqueued_at":1544924516.107598,"error_message":"Couldn't find Transaction with 'id'=true","error_class":"ActiveRecord::RecordNotFound","failed_at":1544924516.125679,"retry_count":0},"jobstr":"{\"class\":\"MyWorker\",\"args\":[true],\"retry\":true,\"queue\":\"default\",\"jid\":\"b46bb3b002e00f480a04be16\",\"created_at\":1544924450.884224,\"enqueued_at\":1544924516.107598}"}

So this creates two jobs: one with a jid of 6c97e448fe30998235dee95d, which immediately sets the Transaction to ignored, and one with a jid of b46bb3b002e00f480a04be16, which blows right past the early return in the worker's perform method (because it doesn't use the same jid as the first job).

My guess as to why this does not work the way I intend is that the call to MyWorker.cancel! cannot get the jid of the worker I want to cancel unless I first create a db migration to hold that jid.

Is creating a db migration to store the worker's jid the preferred way to make sure that jid is accessible between actions? And how is id=true getting in there? As the error above says: "Couldn't find Transaction with 'id'=true"


Solution

  • OK, let's go piece by piece.

    1. This code:

      self.class.perform_in(1.minute, transaction.ignore!)
      

      is passing the return value of the ignore! method (in this case, true) as the argument for the job, which causes the exception.

      You should make sure to pass the right arguments:

      self.class.perform_in(1.minute, transaction.tap(&:ignore!).id)
      
    2. Every time you call MyWorker.perform_async (or any other performing class method) you are creating a new job, so it’s not surprising that you are not getting the same jid.

      You should, as suggested, store the initial jid in the transactions table and retrieve it upon payment to cancel the job; otherwise the job id is lost. An alternative is to keep the paid flag in the same Redis but key it by the transaction id instead of the jid, e.g. c.exists("paid-#{transaction.id}").

    3. Your code does not wait 1 minute to ignore the transaction. It ignores the transaction right away and then schedules itself to run again in 1 minute.

      You probably want to call

      jid = MyWorker.perform_in(1.minute, transaction.id)
      

      directly from the create_worker method.
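
Points 1 and 3 both come down to Ruby evaluating a method's arguments before the call happens. Here is a minimal sketch of that, using stand-ins so it runs without Sidekiq, Redis or a database. FakeWorker and FakeTransaction are hypothetical names, not part of Sidekiq or Rails; they only mimic the shape of perform_in and ignore!.

```ruby
# Stand-in for a Sidekiq worker: records what would be enqueued instead of
# talking to Redis. Hypothetical, for illustration only.
class FakeWorker
  @jobs = []

  class << self
    attr_reader :jobs

    # Mimics the shape of perform_in(interval, *args): by the time this
    # method runs, the arguments have already been evaluated.
    def perform_in(delay_seconds, *args)
      @jobs << { delay: delay_seconds, args: args }
      "jid-#{@jobs.size}"
    end
  end
end

# Stand-in for the model; ignore! returns true, like the event in the question.
class FakeTransaction
  attr_reader :id, :state

  def initialize(id)
    @id = id
    @state = :approved
  end

  def ignore!
    @state = :ignored
    true
  end
end

# What the question does: ignore! runs immediately and `true` is enqueued.
wrong = FakeTransaction.new(4)
FakeWorker.perform_in(60, wrong.ignore!)

# What point 3 suggests: enqueue only the id and leave the state alone.
right = FakeTransaction.new(5)
FakeWorker.perform_in(60, right.id)
```

After this runs, the first recorded job carries `true` as its argument and its transaction is already ignored, while the second carries the id and its transaction is still approved.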


    UPDATE

    If, as I imagine, you are using some kind of persistent state machine, it's even easier to just "ignore unless complete" and forget about cancelling the job:

    class Transaction
      # I'm inventing a DSL here
      include SomeStateMachine
    
      state :accepted do
        event :ignore, to: :ignored
        event :confirm, to: :confirmed
      end
      state :ignored
      state :confirmed
    
      def create_worker
        # no need to track it
        MyWorker.perform_in(1.minute, id)
      end
    end
    
    class MyWorker
      include Sidekiq::Worker
    
      def perform(id)
        transaction = Transaction.find(id)
        transaction.ignore! if transaction.can_ignore?
      end
    end
    

    You can let your job run, and it will happily skip any non-ignorable transaction.
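
To see the guard in action without a state machine gem or Sidekiq, here is a small self-contained sketch. SketchTransaction and run_timeout_job are hypothetical stand-ins: the former hand-rolls the accepted/ignored/confirmed states from the DSL above, the latter is the body of perform without the database lookup.

```ruby
# Hand-rolled stand-in for a state machine; a real app would likely use a
# gem for this. All names here are hypothetical.
class SketchTransaction
  TRANSITIONS = {
    ignore:  { from: :accepted, to: :ignored },
    confirm: { from: :accepted, to: :confirmed }
  }.freeze

  attr_reader :state

  def initialize
    @state = :accepted
  end

  # True only while the ignore event is still allowed.
  def can_ignore?
    state == TRANSITIONS[:ignore][:from]
  end

  def ignore!
    fire(:ignore)
  end

  def confirm!
    fire(:confirm)
  end

  private

  # Applies the event or raises if the current state does not allow it.
  def fire(event)
    rule = TRANSITIONS.fetch(event)
    raise "cannot #{event} from #{state}" unless state == rule[:from]

    @state = rule[:to]
    true
  end
end

# Body of the worker's perform, minus Sidekiq and Transaction.find.
def run_timeout_job(transaction)
  transaction.ignore! if transaction.can_ignore?
end

paid = SketchTransaction.new
paid.confirm!            # the user paid before the delay elapsed
run_timeout_job(paid)    # the job still runs but leaves the record alone

unpaid = SketchTransaction.new
run_timeout_job(unpaid)  # nobody paid, so the job ignores the transaction
```

Because the guard lives in the job, running it late or more than once is harmless, which is why no jid needs to be stored.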