Tags: ruby, ruby-on-rails-5, rufus-scheduler

Rufus Scheduler - Help Starting / Stopping Jobs in Model


After reading the Rufus Scheduler docs about 100 times, going through every StackOverflow question I could find, looking at the source code, and finally reading how to report bugs effectively twice, I have come to the point where I need your help, @jmettraux:

The Application

I am writing an internal application that allows my team to create metrics that periodically run code that saves values to a database. I have the following setup.

schema.rb

create_table "metrics", force: :cascade do |t|
    t.string   "frequency"
    t.string   "name"
    t.text     "data"
    t.datetime "created_at", null: false
    t.datetime "updated_at", null: false
    t.boolean  "active"
end

initializers/scheduler.rb

require 'rufus-scheduler'

class Rufus::Scheduler::Job
  def report
    logger = Logger.new(STDOUT)
    logger.info "Job: #{@id} Tags: #{@tags} Frequency: #{@frequency}"
  end
end

SCHEDULER = Rufus::Scheduler.new(:lockfile => ".rufus-scheduler.lock")

unless defined?(Rails::Console) || File.split($0).last == 'rake'
  # Launch all Metric jobs that are active
  @metrics = Metric.active

  unless SCHEDULER.down?
    @metrics.each_with_index do | metric, index |
      SCHEDULER.every metric.frequency, :tags => metric.id, :overlap => false, :timeout => metric.try(:timeout) || '3m' do | job |
        metric.add_value
        job.report
      end
    end
  end
end

What Works

The code above works flawlessly and Rufus initializes properly and all the jobs start as they should.

The Problem

Where I am running into a problem is that once the app is running I need to be able to unschedule jobs and reschedule them when a Metric is saved. I use the Rufus tags to assign a tag with the ID of the Metric when they are created and I'm using that to load them for unscheduling. Here is my current metric model with the relevant code.

models/metric.rb

class Metric < ApplicationRecord

  ...

  after_save :update_job

  private
  def update_job
    if self.changed.present?
      job = SCHEDULER.jobs(tag: self.id).first
      if job.present?
        logger.info "Job #{job.id}, for Metric #{self.id} unscheduled."
        SCHEDULER.unschedule(job.id)
      end

      if self.active
        metric = self
        new_job = SCHEDULER.every self.frequency, :tags => self.id, :overlap => false, :timeout => self.try(:timeout) || '3m' do | job |
          metric.add_value
        end
        logger.info "Job #{new_job} for Metric #{self.id} scheduled."
      end
    end
  end

end

The Actual Question!

Scenario 1:

  • Application Starts
  • The scheduler initializer loads all active metrics
  • The scheduler initializer starts all active metrics as 'every' jobs
  • User edits an active metric, setting it to inactive
  • The update_job method is run after the metric is saved
  • Inside the update_job method the job for the current metric is found successfully
  • The unschedule method is called on the job
  • PROBLEM HERE: The job runs again even after being unscheduled

Scenario 2:

  • Application Starts
  • The scheduler initializer loads all active metrics
  • The scheduler initializer starts all active metrics as 'every' jobs
  • User edits an inactive metric, setting it to active
  • The update_job method is run after the metric is saved
  • Inside the update_job method a new 'every' job is scheduled for the metric
  • PROBLEM HERE: The job never gets started

What I have tried

System/Application Info

Phase 2 Logs - 12/20/2016

Scenario 1:

  xxx SCHEDULER started: 70168058449780
  xxx trigger (scheduled in initializer) Job: Rufus::Scheduler::EveryJob every_1482273018.01852_1684784125738549751 (70168045492220) Tags: ["2"] Frequency: 20.0
  xxx Metric 2 #update_job active: false
  xxx Metric 2 #update_job 0 SCHEDULER Rufus::Scheduler 3.2.2 70168058449780 down? false at_jobs: 0 in_jobs: 0 every_jobs: 2 interval_jobs: 0 cron_jobs: 0
  xxx Metric 2 found Job Job: Rufus::Scheduler::EveryJob every_1482273018.01852_1684784125738549751 (70168045492220) Tags: ["2"] Frequency: 20.0
  xxx Metric 2 Job every_1482273018.01852_1684784125738549751 unscheduled
  xxx Metric 2 #update_job 1 SCHEDULER Rufus::Scheduler 3.2.2 70168058449780 down? false at_jobs: 0 in_jobs: 0 every_jobs: 1 interval_jobs: 0 cron_jobs: 0
  xxx Metric 2 #update_job 2 SCHEDULER Rufus::Scheduler 3.2.2 70168058449780 down? false at_jobs: 0 in_jobs: 0 every_jobs: 1 interval_jobs: 0 cron_jobs: 0
  xxx trigger (scheduled in initializer) Job: Rufus::Scheduler::EveryJob every_1482273018.01852_1684784125738549751 (70168045492220) Tags: ["2"] Frequency: 20.0
  xxx trigger (scheduled in initializer) Job: Rufus::Scheduler::EveryJob every_1482273018.01852_1684784125738549751 (70168045492220) Tags: ["2"] Frequency: 20.0

Scenario 2:

  xxx SCHEDULER started: 70272158805840
  xxx Metric 2 #update_job active: true
  xxx Metric 2 #update_job 0 SCHEDULER Rufus::Scheduler 3.2.2 70272158805840 down? false at_jobs: 0 in_jobs: 0 every_jobs: 1 interval_jobs: 0 cron_jobs: 0
  xxx Metric 2 #update_job 1 SCHEDULER Rufus::Scheduler 3.2.2 70272158805840 down? false at_jobs: 0 in_jobs: 0 every_jobs: 1 interval_jobs: 0 cron_jobs: 0
  xxx Metric 2 Job every_1482273159.799382_1489585858424637616 scheduled
  xxx Metric 2 #update_job 2 SCHEDULER Rufus::Scheduler 3.2.2 70272158805840 down? false at_jobs: 0 in_jobs: 0 every_jobs: 2 interval_jobs: 0 cron_jobs: 0

Phase 3 Logs - 12/20/2016

Scenario 1:

xxx SCHEDULER started: 70321240613760
xxx sc Rufus::Scheduler 3.2.2 70321240613760
xxx sc down? false
xxx sc Process.pid 39975
xxx sc jobs:
xxx sc   0: j: Rufus::Scheduler::EveryJob i: "every_1482335616.453129_4198855636205920237" oi: 70321198587560 ts: ["2"] frq: 20.0 ua: nil c: 0 nt: 2016-12-21 09:53:56 -0600
xxx initializer over.
xxx Metric 2 Job every_1482335616.453129_4198855636205920237 scheduled in initializer for Metric 2
xxx Metric 2 Job every_1482335616.453129_4198855636205920237 scheduled in initializer for Metric 2
xxx Metric 2 Job every_1482335616.453129_4198855636205920237 scheduled in initializer for Metric 2
xxx Metric 2 #update_job changed: ["active", "updated_at"] active: false
xxx Metric 2 sc #update_job 0 Rufus::Scheduler 3.2.2 70321240613760
xxx Metric 2 sc #update_job 0 down? false
xxx Metric 2 sc #update_job 0 Process.pid 40037
xxx Metric 2 sc #update_job 0 jobs:
xxx Metric 2 sc #update_job 0   0: j: Rufus::Scheduler::EveryJob i: "every_1482335616.453129_4198855636205920237" oi: 70321198587560 ts: ["2"] frq: 20.0 ua: nil c: 0 nt: 2016-12-21 09:53:56 -0600
xxx Metric 2 found Job j: Rufus::Scheduler::EveryJob i: "every_1482335616.453129_4198855636205920237" oi: 70321198587560 ts: ["2"] frq: 20.0 ua: nil c: 0 nt: 2016-12-21 09:53:56 -0600 first
xxx Metric 2 Job every_1482335616.453129_4198855636205920237 unscheduled
xxx Metric 2 Job every_1482335616.453129_4198855636205920237 scheduled? false
xxx Metric 2 sc #update_job 1 Rufus::Scheduler 3.2.2 70321240613760
xxx Metric 2 sc #update_job 1 down? false
xxx Metric 2 sc #update_job 1 Process.pid 40037
xxx Metric 2 sc #update_job 1 jobs:
xxx Metric 2 Job every_1482335616.453129_4198855636205920237 scheduled in initializer for Metric 2

Scenario 2:

xxx SCHEDULER started: 70152969118240
xxx sc Rufus::Scheduler 3.2.2 70152969118240
xxx sc down? false
xxx sc Process.pid 40811
xxx sc jobs:
xxx initializer over.
xxx Metric 2 #update_job changed: ["active", "updated_at"] active: true
xxx Metric 2 sc #update_job 0 Rufus::Scheduler 3.2.2 70152969118240
xxx Metric 2 sc #update_job 0 down? false
xxx Metric 2 sc #update_job 0 Process.pid 40872
xxx Metric 2 sc #update_job 0 jobs:
xxx Metric 2 sc #update_job 1 Rufus::Scheduler 3.2.2 70152969118240
xxx Metric 2 sc #update_job 1 down? false
xxx Metric 2 sc #update_job 1 Process.pid 40872
xxx Metric 2 sc #update_job 1 jobs:
xxx Metric 2 Job every_1482336051.9823449_3069051954259909338 scheduled in #update_job
xxx Metric 2 sc #update_job 2 Rufus::Scheduler 3.2.2 70152969118240
xxx Metric 2 sc #update_job 2 down? false
xxx Metric 2 sc #update_job 2 Process.pid 40872
xxx Metric 2 sc #update_job 2 jobs:
xxx Metric 2 sc #update_job 2   0: j: Rufus::Scheduler::EveryJob i: "every_1482336051.9823449_3069051954259909338" oi: 70152971745840 ts: ["2"] frq: 20.0 ua: nil c: 0 nt: 2016-12-21 10:01:11 -0600

Solution

  • When the above code runs for an active metric going inactive, I successfully get the Logger messages and load the right job. Also, when I use better_errors to inspect the code at that point, the job itself sets the @unscheduled_at attribute correctly; however, the job keeps getting rescheduled to run again.

    On the other hand, when the update_job method runs and tries to reschedule the job, the job never starts.

    So are you trying to say that the Job gets rescheduled but that new, rescheduled, Job is never triggering?

    If I naively look at your code (rehashed it for a better understanding):

    def update_job
    
      if self.changed.present?
        job = SCHEDULER.jobs(tag: self.id).first
        if job.present?
          logger.info "Job #{job.id}, for Metric #{self.id} unscheduled."
          job.unschedule
        end
    
        if self.active
          metric = self
          new_job_id =
            SCHEDULER.every(
              self.frequency,
              :tags => self.id,
              :overlap => false,
              :timeout => self.try(:timeout) || '3m'
            ) { |job| metric.add_value }
          logger.info "Job #{new_job_id} for Metric #{self.id} scheduled."
        end
      end
    end
    

    Could the symptom you describe be summed up as "the application never enters the reschedule block of the update_job method"?

    The Metric with ID 2 has a frequency of 20s. It runs twice then gets unscheduled correctly so I know it's working properly outside of the model code.

    So it's something in your model code. There is only a small chance that it's related to rufus-scheduler.

    Please rephrase your "actual question" carefully. In its current version, it confuses me.

    phase 2 - 2016-12-21

    Could you please try with the code below?

    Warning: it is not tested, it may contain errors you'll certainly spot and fix to make it usable.

    Then try your two scenarios and report back with a log grep of the "xxx " lines this code emits. It might tell us what is going wrong.

    Thanks in advance.

    # initializers/scheduler.rb
    
    require 'rufus-scheduler'
    
    class Rufus::Scheduler
      def to_report_s
        a = []
        a << self.class
        a << Rufus::Scheduler::VERSION
        a << self.object_id
        a << "down? #{self.down?}"
        %w[ at in every interval cron ].each do |flav|
          m = "#{flav}_jobs".to_sym
          a << "#{m}: #{self.send(m).size}"
        end
        a.collect(&:to_s).join(' ')
      end
    end
    class Rufus::Scheduler::Job
      def to_report_s
        "Job: #{self.class} #{@id} (#{self.object_id}) " +
        "Tags: #{@tags.inspect} Frequency: #{@frequency}"
      end
    end
    
    #SCHEDULER = Rufus::Scheduler.new(:lockfile => ".rufus-scheduler.lock")
    SCHEDULER = Rufus::Scheduler.new
    
    # Add "global" error handler to the rufus-scheduler instance
    #
    def SCHEDULER.on_error(job, error)
    
      Rails.logger.error(
        "xxx err#{error.object_id} rufus-scheduler intercepted #{error.inspect}" +
        " in job #{job.to_report_s}")
      error.backtrace.each_with_index do |line, i|
        Rails.logger.error(
          "xxx err#{error.object_id} #{i}: #{line}")
      end
    end
    
    Rails.logger.info("xxx SCHEDULER started: #{SCHEDULER.object_id}")
    
    unless (
      defined?(Rails::Console) || File.split($0).last == 'rake' ||
      SCHEDULER.down?
    )
      # launch all Metric jobs that are active
    
      Metric.active.each do |metric|
    
        SCHEDULER.every(
          metric.frequency,
          :tags => metric.id,
          :overlap => false,
          :timeout => metric.try(:timeout) || '3m'
        ) do |job|
          Rails.logger.info "xxx trigger (scheduled in initializer) #{job.to_report_s}"
          metric.add_value
        end
      end
    end
    

    and

    # models/metric.rb
    
    class Metric < ApplicationRecord
    
      after_save :update_job
    
      private
    
      def update_job
    
        lip = "Metric #{id}" # logger info prefix
        logger.info("xxx #{lip} #update_job active: #{self.active.inspect}")
    
        if self.changed.present?
    
          logger.info("xxx #{lip} #update_job 0 SCHEDULER #{SCHEDULER.to_report_s}")
    
          #job = SCHEDULER.jobs(tag: self.id).first
          #if job.present?
          if job = SCHEDULER.jobs(tag: self.id).first
            logger.info "xxx #{lip} found Job #{job.to_report_s}"
            #SCHEDULER.unschedule(job.id)
            job.unschedule
            logger.info "xxx #{lip} Job #{job.id} unscheduled"
          end
    
          logger.info("xxx #{lip} #update_job 1 SCHEDULER #{SCHEDULER.to_report_s}")
    
          if self.active
            metric = self
            job_id = SCHEDULER.every(
              self.frequency,
              :tags => self.id,
              :overlap => false,
              :timeout => self.try(:timeout) || '3m'
            ) do |job|
              logger.info "xxx trigger (scheduled in #update_job) #{job.to_report_s}"
              metric.add_value
            end
            logger.info "xxx #{lip} Job #{job_id} scheduled"
          end
    
          logger.info("xxx #{lip} #update_job 2 SCHEDULER #{SCHEDULER.to_report_s}")
        end
      end
    end
    

    phase 3 - 2016-12-21

    Could you please retry with the code below:

    # initializers/scheduler.rb
    
    require 'rufus-scheduler'
    
    class Rufus::Scheduler
      def to_report_a
        a = []
        a << "#{self.class} #{Rufus::Scheduler::VERSION} #{object_id}"
        a << "down? #{self.down?}"
        a << "Process.pid #{Process.pid}"
        a << "jobs:"
        jobs.each_with_index { |job, i| a << "  #{i}: #{job.to_report_s}" }
        a.collect(&:to_s)
      end
    end
    class Rufus::Scheduler::Job
      def to_report_s
        {
          j: self.class, i: @id, oi: object_id, ts: @tags,
          frq: @frequency, ua: @unscheduled_at, c: count, nt: next_time
        }
          .collect { |k, v| "#{k}: #{v.inspect}" }
          .join(' ')
      end
    end
    
    #SCHEDULER = Rufus::Scheduler.new(:lockfile => ".rufus-scheduler.lock")
    SCHEDULER = Rufus::Scheduler.new
    
    # Add "global" error handler to the rufus-scheduler instance
    #
    def SCHEDULER.on_error(job, error)
      lep = "err#{error.object_id}" # logger error prefix ("xxx " is prepended below)
      Rails.logger.error(
        "xxx #{lep} rufus-scheduler intercepted #{error.inspect}" +
        " in job #{job.to_report_s}")
      error.backtrace.each_with_index do |line, i|
        Rails.logger.error(
          "xxx #{lep} #{i}: #{line}")
      end
      Rails.logger.error("xxx #{lep} scheduler:")
      SCHEDULER.to_report_a.each { |l| Rails.logger.error("xxx #{lep} #{l}") }
    end
    
    Rails.logger.info("xxx SCHEDULER started: #{SCHEDULER.object_id}")
    
    unless (
      defined?(Rails::Console) || File.split($0).last == 'rake' ||
      SCHEDULER.down?
    )
      # launch all Metric jobs that are active
    
      Metric.active.each do |metric|
    
        lip = "Metric #{metric.id}" # logger info prefix
    
        job_id =
          SCHEDULER.every(
            metric.frequency,
            :tags => metric.id,
            :overlap => false,
            :timeout => metric.try(:timeout) || '3m'
          ) do |job|
            Rails.logger.info(
              "xxx #{lip} trigger (scheduled in initializer) " +
              "#{job.to_report_s}")
            metric.add_value
          end
        Rails.logger.info(
          "xxx #{lip} Job #{job_id} scheduled in initializer " +
          "for Metric #{metric.id}")
      end
    
      SCHEDULER.to_report_a.each { |l| Rails.logger.info("xxx sc #{l}") }
      Rails.logger.info("xxx initializer over.")
    end
    

    and

    # models/metric.rb
    
    class Metric < ApplicationRecord
    
      after_save :update_job
    
      private
    
      def update_job
    
        lip = "Metric #{id}" # logger info prefix
        logger.info(
          "xxx #{lip} #update_job " +
          "changed: #{self.changed.inspect} active: #{self.active.inspect}")
    
        return unless self.changed.present?
    
        SCHEDULER.to_report_a
          .each { |l| logger.info("xxx #{lip} sc #update_job 0 #{l}") }
    
        jobs = SCHEDULER.jobs(tag: self.id)
    
        jobs.each_with_index do |job, i|
          logger.info(
            "xxx #{lip} found Job #{job.to_report_s} #{i == 0 ? 'first' : ''}")
        end
    
        if job = jobs.first
          #SCHEDULER.unschedule(job.id)
          job.unschedule
          logger.info "xxx #{lip} Job #{job.id} unscheduled"
          logger.info "xxx #{lip} Job #{job.id} scheduled? #{SCHEDULER.scheduled?(job.id)}"
        end
    
        SCHEDULER.to_report_a
          .each { |l| logger.info("xxx #{lip} sc #update_job 1 #{l}") }
    
        return unless self.active
    
        metric = self
        job_id = SCHEDULER.every(
          self.frequency,
          :tags => self.id,
          :overlap => false,
          :timeout => self.try(:timeout) || '3m'
        ) do |job|
          logger.info "xxx trigger (scheduled in #update_job) #{job.to_report_s}"
          metric.add_value
        end
        logger.info "xxx #{lip} Job #{job_id} scheduled in #update_job"
    
        SCHEDULER.to_report_a
          .each { |l| logger.info("xxx #{lip} sc #update_job 2 #{l}") }
      end
    end
    

    phase 4 - 2016-12-22

    Added phase 3 logs to question. It looks like somehow a new scheduler process is subsequently created and then destroyed inside the model code. Thanks again for your diligence on this!

    Is that really happening in the model code? Your logs tell us that it happens in another process. Your initial Ruby process instantiates rufus-scheduler then your HTTP requests are served in worker processes which are forks of your initial process (without the threads, in other words with inactive schedulers).
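To make the "forks without the threads" point concrete, here is a minimal plain-Ruby sketch (no Rails, Puma or rufus-scheduler involved). The `ticker` thread is only a stand-in for the scheduler's background thread, and `alive_after_fork?` is a helper name invented for this illustration. It assumes a platform where `fork` is available (Linux, macOS).

```ruby
# Returns true if `thread` is still alive inside a forked child process.
# (Hypothetical helper, written only for this illustration.)
def alive_after_fork?(thread)
  reader, writer = IO.pipe
  pid = fork do
    reader.close
    # Only the thread that called fork exists in the child.
    writer.write(thread.alive? ? 'alive' : 'dead')
    writer.close
    exit!(0) # leave the child without running inherited at_exit handlers
  end
  writer.close
  answer = reader.read
  reader.close
  Process.wait(pid)
  answer == 'alive'
end

# Stand-in for the scheduler thread started in the initial process.
ticker = Thread.new { loop { sleep 0.05 } }
sleep 0.1

alive_in_parent = ticker.alive?
alive_in_child  = alive_after_fork?(ticker)

ticker.kill
ticker.join

puts "parent: #{alive_in_parent}, child: #{alive_in_child}"
```

The child inherits the Thread object but not the running thread, which matches the logs: the scheduler object has the same object_id in both processes, yet the Process.pid differs and the jobs scheduled in the worker never trigger.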

    You're using Puma in clustered mode. I should have immediately asked you about your configuration.

    Read its documentation carefully at https://github.com/puma/puma#configuration

    An easy fix would be not to use the clustered mode so that there is only one Ruby process involved, serving all the HTTP requests.
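As a rough sketch of what that could look like, assuming the application is configured through a `config/puma.rb` file (the thread counts are placeholders, not values from the question):

```ruby
# config/puma.rb (sketch, values are assumptions)
workers 0     # 0 workers = single mode: no forked worker processes
threads 1, 5  # requests are still served concurrently, by threads in one process
```

With a single process, the scheduler created in the initializer and the one seen by `update_job` are the same live instance.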

    On the other hand, if you need the clustered mode, you have to change your way of thinking. You probably don't want one rufus-scheduler instance per worker process. You could focus on having the core (live) rufus-scheduler in the main process. It could have a "management" job that checks recently updated metrics and unschedules/schedules jobs.

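    SCHEDULER.every '10s', overlap: false do
      # Metric.recently_updated is a scope you would have to define yourself
      Metric.recently_updated.each do |metric|
        SCHEDULER.jobs(tag: metric.id).each(&:unschedule)
        next unless metric.active # inactive metrics stay unscheduled
        SCHEDULER.every(metric.frequency, tags: metric.id) { metric.add_value }
      end
    end
      # or something like that...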
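The reconcile step of such a management job can be tried out in isolation. The sketch below runs without Rails or rufus-scheduler: `FakeScheduler`, `FakeJob`, `MetricRow`, `recently_updated` and `reconcile` are all hypothetical names made up for this illustration. `FakeScheduler` is not the rufus-scheduler API; it only mimics the three calls used in this thread (`every`, `jobs(tag:)`, `unschedule`).

```ruby
# Stand-ins so the sketch is self-contained. NOT the rufus-scheduler API.
FakeJob = Struct.new(:id, :tags, :frequency)

class FakeScheduler
  def initialize
    @jobs = {}
    @seq = 0
  end

  # Registers a job and returns its id.
  def every(frequency, tags:)
    @seq += 1
    id = "every_#{@seq}"
    @jobs[id] = FakeJob.new(id, Array(tags).map(&:to_s), frequency)
    id
  end

  # All jobs, or only those carrying the given tag.
  def jobs(tag: nil)
    return @jobs.values if tag.nil?
    @jobs.values.select { |job| job.tags.include?(tag.to_s) }
  end

  def unschedule(job_id)
    @jobs.delete(job_id)
  end
end

# Hypothetical plain-Ruby row; the real app uses the ActiveRecord Metric model.
MetricRow = Struct.new(:id, :frequency, :active, :updated_at)

# Hypothetical replacement for a `recently_updated` scope.
def recently_updated(metrics, since)
  metrics.select { |metric| metric.updated_at >= since }
end

# Drop the jobs of every recently updated metric, then reschedule
# only the metrics that are still active.
def reconcile(scheduler, metrics, since)
  recently_updated(metrics, since).each do |metric|
    scheduler.jobs(tag: metric.id).each { |job| scheduler.unschedule(job.id) }
    next unless metric.active
    scheduler.every(metric.frequency, tags: metric.id)
  end
end

now = Time.now
scheduler = FakeScheduler.new
scheduler.every('20s', tags: 2) # metric 2 was scheduled at boot

metrics = [
  MetricRow.new(2, '20s', false, now),      # just set to inactive
  MetricRow.new(3, '1m',  true,  now),      # just set to active
  MetricRow.new(4, '5m',  true,  now - 3600) # not touched recently
]

reconcile(scheduler, metrics, now - 10)
remaining_tags = scheduler.jobs.map(&:tags)
puts remaining_tags.inspect
```

In the real application the cutoff passed as `since` would have to be at least as long as the management job's interval, otherwise an update could fall between two runs and be missed.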
    

    Have fun!