After reading the rufus-scheduler docs about 100 times, every StackOverflow question I could find, and the source code, and after reading how to report bugs effectively twice, I have come to the point where I need your help, @jmettraux:
I am writing an internal application that lets my team create metrics: each metric periodically runs code that saves values to a database. I have the following setup:
schema.rb
create_table "metrics", force: :cascade do |t|
  t.string   "frequency"
  t.string   "name"
  t.text     "data"
  t.datetime "created_at", null: false
  t.datetime "updated_at", null: false
  t.boolean  "active"
end
initializers/scheduler.rb
require 'rufus-scheduler'

class Rufus::Scheduler::Job
  def report
    logger = Logger.new(STDOUT)
    logger.info "Job: #{@id} Tags: #{@tags} Frequency: #{@frequency}"
  end
end

SCHEDULER = Rufus::Scheduler.new(:lockfile => ".rufus-scheduler.lock")

unless defined?(Rails::Console) || File.split($0).last == 'rake'
  # Launch all Metrics jobs that are active
  @metrics = Metric.active
  unless SCHEDULER.down?
    @metrics.each_with_index do |metric, index|
      SCHEDULER.every metric.frequency, :tags => metric.id, :overlap => false, :timeout => metric.try(:timeout) || '3m' do |job|
        metric.add_value
        job.report
      end
    end
  end
end
The code above works flawlessly: rufus-scheduler initializes properly and all the jobs start as they should.
Where I run into a problem is that, once the app is running, I need to be able to unschedule and reschedule jobs when a Metric is saved. When the jobs are created, I use rufus-scheduler tags to tag each job with the ID of its Metric, and I use that tag to look the job up for unscheduling. Here is my current Metric model with the relevant code:
models/metric.rb
class Metric < ApplicationRecord
  ...
  after_save :update_job

  private

  def update_job
    if self.changed.present?
      job = SCHEDULER.jobs(tag: self.id).first
      if job.present?
        logger.info "Job #{job.id}, for Metric #{self.id} unscheduled."
        SCHEDULER.unschedule(job.id)
      end
      if self.active
        metric = self
        new_job = SCHEDULER.every self.frequency, :tags => self.id, :overlap => false, :timeout => self.try(:timeout) || '3m' do |job|
          metric.add_value
        end
        logger.info "Job #{new_job} for Metric #{self.id} scheduled."
      end
    end
  end
end
Scenario 1:
- Application starts
- scheduler initializer loads all active metrics
- scheduler initializer starts all active metrics as 'every' jobs
- User edits an active metric, setting it to inactive
- The update_job method runs after the metric is saved
- Inside the update_job method, the job for the current metric is found successfully
- The unschedule method is called on the job
- PROBLEM HERE: the job runs again even after being unscheduled
Scenario 2:
- Application starts
- scheduler initializer loads all active metrics
- scheduler initializer starts all active metrics as 'every' jobs
- User edits an inactive metric, setting it to active
- The update_job method runs after the metric is saved
- Inside the update_job method, a new 'every' job is scheduled for the metric
- PROBLEM HERE: the job never gets started
To check that the unschedule method works at all in my app, I tried adding the following code to the bottom of the initializer:
sleep 50
job = SCHEDULER.jobs(tag: 2).first
SCHEDULER.unschedule(job)
The Metric with ID 2 has a frequency of 20s. It runs twice and then gets unscheduled correctly, so I know unscheduling works outside of the model code.
Scenario 1 (logs from the phase 2 code):
xxx SCHEDULER started: 70168058449780
xxx trigger (scheduled in initializer) Job: Rufus::Scheduler::EveryJob every_1482273018.01852_1684784125738549751 (70168045492220) Tags: ["2"] Frequency: 20.0
xxx Metric 2 #update_job active: false
xxx Metric 2 #update_job 0 SCHEDULER Rufus::Scheduler 3.2.2 70168058449780 down? false at_jobs: 0 in_jobs: 0 every_jobs: 2 interval_jobs: 0 cron_jobs: 0
xxx Metric 2 found Job Job: Rufus::Scheduler::EveryJob every_1482273018.01852_1684784125738549751 (70168045492220) Tags: ["2"] Frequency: 20.0
xxx Metric 2 Job every_1482273018.01852_1684784125738549751 unscheduled
xxx Metric 2 #update_job 1 SCHEDULER Rufus::Scheduler 3.2.2 70168058449780 down? false at_jobs: 0 in_jobs: 0 every_jobs: 1 interval_jobs: 0 cron_jobs: 0
xxx Metric 2 #update_job 2 SCHEDULER Rufus::Scheduler 3.2.2 70168058449780 down? false at_jobs: 0 in_jobs: 0 every_jobs: 1 interval_jobs: 0 cron_jobs: 0
xxx trigger (scheduled in initializer) Job: Rufus::Scheduler::EveryJob every_1482273018.01852_1684784125738549751 (70168045492220) Tags: ["2"] Frequency: 20.0
xxx trigger (scheduled in initializer) Job: Rufus::Scheduler::EveryJob every_1482273018.01852_1684784125738549751 (70168045492220) Tags: ["2"] Frequency: 20.0
Scenario 2 (logs from the phase 2 code):
xxx SCHEDULER started: 70272158805840
xxx Metric 2 #update_job active: true
xxx Metric 2 #update_job 0 SCHEDULER Rufus::Scheduler 3.2.2 70272158805840 down? false at_jobs: 0 in_jobs: 0 every_jobs: 1 interval_jobs: 0 cron_jobs: 0
xxx Metric 2 #update_job 1 SCHEDULER Rufus::Scheduler 3.2.2 70272158805840 down? false at_jobs: 0 in_jobs: 0 every_jobs: 1 interval_jobs: 0 cron_jobs: 0
xxx Metric 2 Job every_1482273159.799382_1489585858424637616 scheduled
xxx Metric 2 #update_job 2 SCHEDULER Rufus::Scheduler 3.2.2 70272158805840 down? false at_jobs: 0 in_jobs: 0 every_jobs: 2 interval_jobs: 0 cron_jobs: 0
Scenario 1 (logs from the phase 3 code):
xxx SCHEDULER started: 70321240613760
xxx sc Rufus::Scheduler 3.2.2 70321240613760
xxx sc down? false
xxx sc Process.pid 39975
xxx sc jobs:
xxx sc 0: j: Rufus::Scheduler::EveryJob i: "every_1482335616.453129_4198855636205920237" oi: 70321198587560 ts: ["2"] frq: 20.0 ua: nil c: 0 nt: 2016-12-21 09:53:56 -0600
xxx initializer over.
xxx Metric 2 Job every_1482335616.453129_4198855636205920237 scheduled in initializer for Metric 2
xxx Metric 2 Job every_1482335616.453129_4198855636205920237 scheduled in initializer for Metric 2
xxx Metric 2 Job every_1482335616.453129_4198855636205920237 scheduled in initializer for Metric 2
xxx Metric 2 #update_job changed: ["active", "updated_at"] active: false
xxx Metric 2 sc #update_job 0 Rufus::Scheduler 3.2.2 70321240613760
xxx Metric 2 sc #update_job 0 down? false
xxx Metric 2 sc #update_job 0 Process.pid 40037
xxx Metric 2 sc #update_job 0 jobs:
xxx Metric 2 sc #update_job 0 0: j: Rufus::Scheduler::EveryJob i: "every_1482335616.453129_4198855636205920237" oi: 70321198587560 ts: ["2"] frq: 20.0 ua: nil c: 0 nt: 2016-12-21 09:53:56 -0600
xxx Metric 2 found Job j: Rufus::Scheduler::EveryJob i: "every_1482335616.453129_4198855636205920237" oi: 70321198587560 ts: ["2"] frq: 20.0 ua: nil c: 0 nt: 2016-12-21 09:53:56 -0600 first
xxx Metric 2 Job every_1482335616.453129_4198855636205920237 unscheduled
xxx Metric 2 Job every_1482335616.453129_4198855636205920237 scheduled? false
xxx Metric 2 sc #update_job 1 Rufus::Scheduler 3.2.2 70321240613760
xxx Metric 2 sc #update_job 1 down? false
xxx Metric 2 sc #update_job 1 Process.pid 40037
xxx Metric 2 sc #update_job 1 jobs:
xxx Metric 2 Job every_1482335616.453129_4198855636205920237 scheduled in initializer for Metric 2
Scenario 2 (logs from the phase 3 code):
xxx SCHEDULER started: 70152969118240
xxx sc Rufus::Scheduler 3.2.2 70152969118240
xxx sc down? false
xxx sc Process.pid 40811
xxx sc jobs:
xxx initializer over.
xxx Metric 2 #update_job changed: ["active", "updated_at"] active: true
xxx Metric 2 sc #update_job 0 Rufus::Scheduler 3.2.2 70152969118240
xxx Metric 2 sc #update_job 0 down? false
xxx Metric 2 sc #update_job 0 Process.pid 40872
xxx Metric 2 sc #update_job 0 jobs:
xxx Metric 2 sc #update_job 1 Rufus::Scheduler 3.2.2 70152969118240
xxx Metric 2 sc #update_job 1 down? false
xxx Metric 2 sc #update_job 1 Process.pid 40872
xxx Metric 2 sc #update_job 1 jobs:
xxx Metric 2 Job every_1482336051.9823449_3069051954259909338 scheduled in #update_job
xxx Metric 2 sc #update_job 2 Rufus::Scheduler 3.2.2 70152969118240
xxx Metric 2 sc #update_job 2 down? false
xxx Metric 2 sc #update_job 2 Process.pid 40872
xxx Metric 2 sc #update_job 2 jobs:
xxx Metric 2 sc #update_job 2 0: j: Rufus::Scheduler::EveryJob i: "every_1482336051.9823449_3069051954259909338" oi: 70152971745840 ts: ["2"] frq: 20.0 ua: nil c: 0 nt: 2016-12-21 10:01:11 -0600
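One quick way to read logs like these is to collect the distinct Process.pid values they report; more than one PID means the initializer and #update_job did not run in the same process. A minimal plain-Ruby sketch, assuming the log lines are available as strings (the distinct_pids helper is hypothetical, not part of Rails or rufus-scheduler):

```ruby
# Hypothetical helper: keeps only the "xxx " debug lines, extracts the
# number following "Process.pid", and returns each PID once, in order.
def distinct_pids(lines)
  lines
    .grep(/xxx /)
    .map { |line| line[/Process\.pid (\d+)/, 1]&.to_i }
    .compact
    .uniq
end

# A few lines taken from the scenario 1 (phase 3) log above.
sample = [
  "xxx sc Process.pid 39975",
  "xxx initializer over.",
  "xxx Metric 2 sc #update_job 0 Process.pid 40037",
  "xxx Metric 2 sc #update_job 1 Process.pid 40037"
]

p distinct_pids(sample) # => [39975, 40037]
```

Two PIDs in one run is the tell-tale sign that the scheduler seen by #update_job is a copy living in another process.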
When update_job runs for an active metric that is being set to inactive, I get the logger messages and the right job is loaded. Also, when I use better_errors to inspect the code at that point, the job's @unscheduled_at attribute is set correctly; however, the job keeps getting rescheduled to run again.
Conversely, when the update_job method runs and tries to schedule the job again, the new job never starts.
So are you saying that the job gets rescheduled, but that the new, rescheduled job never triggers?
If I naively look at your code (rehashed for better understanding):
def update_job
  if self.changed.present?
    job = SCHEDULER.jobs(tag: self.id).first
    if job.present?
      logger.info "Job #{job.id}, for Metric #{self.id} unscheduled."
      job.unschedule
    end
    if self.active
      metric = self
      new_job_id =
        SCHEDULER.every(
          self.frequency,
          :tags => self.id,
          :overlap => false,
          :timeout => self.try(:timeout) || '3m'
        ) { |job| metric.add_value }
      logger.info "Job #{new_job_id} for Metric #{self.id} scheduled."
    end
  end
end
Could the symptom you describe be summarized as "the application never enters the reschedule block of the update_job method"?
The Metric with ID 2 has a frequency of 20s. It runs twice and then gets unscheduled correctly, so I know unscheduling works outside of the model code.
So it's something in your model code. There is only a small chance that it's related to rufus-scheduler.
Please carefully rephrase your actual question. In its current version, it confuses me.
phase 2 - 2016-12-21
Could you please try the code below?
Warning: it is untested; it may contain errors, which you'll certainly spot and fix to make it usable.
Then try your two scenarios and report back with a grep of the "xxx " lines this code writes to the log. It might tell us what is going wrong.
Thanks in advance.
# initializers/scheduler.rb

require 'rufus-scheduler'

class Rufus::Scheduler
  def to_report_s
    a = []
    a << self.class
    a << Rufus::Scheduler::VERSION
    a << self.object_id
    a << "down? #{self.down?}"
    %w[ at in every interval cron ].each do |flav|
      m = "#{flav}_jobs".to_sym
      a << "#{m}: #{self.send(m).size}"
    end
    a.collect(&:to_s).join(' ')
  end
end

class Rufus::Scheduler::Job
  def to_report_s
    "Job: #{self.class} #{@id} (#{self.object_id}) " +
    "Tags: #{@tags.inspect} Frequency: #{@frequency}"
  end
end

#SCHEDULER = Rufus::Scheduler.new(:lockfile => ".rufus-scheduler.lock")
SCHEDULER = Rufus::Scheduler.new

# Add "global" error handler to the rufus-scheduler instance
#
def SCHEDULER.on_error(job, error)
  Rails.logger.error(
    "xxx err#{error.object_id} rufus-scheduler intercepted #{error.inspect}" +
    " in job #{job.to_report_s}")
  error.backtrace.each_with_index do |line, i|
    Rails.logger.error(
      "xxx err#{error.object_id} #{i}: #{line}")
  end
end

# at the top level of an initializer there is no `logger`, use Rails.logger
Rails.logger.info("xxx SCHEDULER started: #{SCHEDULER.object_id}")

unless (
  defined?(Rails::Console) || File.split($0).last == 'rake' ||
  SCHEDULER.down?
)
  # launch all Metrics jobs that are active
  Metric.active.each do |metric|
    SCHEDULER.every(
      metric.frequency,
      :tags => metric.id,
      :overlap => false,
      :timeout => metric.try(:timeout) || '3m'
    ) do |job|
      Rails.logger.info "xxx trigger (scheduled in initializer) #{job.to_report_s}"
      metric.add_value
    end
  end
end
and
# models/metric.rb

class Metric < ApplicationRecord
  after_save :update_job

  private

  def update_job
    lip = "Metric #{id}" # logger info prefix
    logger.info("xxx #{lip} #update_job active: #{self.active.inspect}")
    if self.changed.present?
      logger.info("xxx #{lip} #update_job 0 SCHEDULER #{SCHEDULER.to_report_s}")
      #job = SCHEDULER.jobs(tag: self.id).first
      #if job.present?
      if job = SCHEDULER.jobs(tag: self.id).first
        logger.info "xxx #{lip} found Job #{job.to_report_s}"
        #SCHEDULER.unschedule(job.id)
        job.unschedule
        logger.info "xxx #{lip} Job #{job.id} unscheduled"
      end
      logger.info("xxx #{lip} #update_job 1 SCHEDULER #{SCHEDULER.to_report_s}")
      if self.active
        metric = self
        job_id = SCHEDULER.every(
          self.frequency,
          :tags => self.id,
          :overlap => false,
          :timeout => self.try(:timeout) || '3m'
        ) do |job|
          logger.info "xxx trigger (scheduled in #update_job) #{job.to_report_s}"
          metric.add_value
        end
        logger.info "xxx #{lip} Job #{job_id} scheduled"
      end
      logger.info("xxx #{lip} #update_job 2 SCHEDULER #{SCHEDULER.to_report_s}")
    end
  end
end
phase 3 - 2016-12-21
Could you please retry with the code below:
# initializers/scheduler.rb

require 'rufus-scheduler'

class Rufus::Scheduler
  def to_report_a
    a = []
    a << "#{self.class} #{Rufus::Scheduler::VERSION} #{object_id}"
    a << "down? #{self.down?}"
    a << "Process.pid #{Process.pid}"
    a << "jobs:"
    jobs.each_with_index { |job, i| a << "  #{i}: #{job.to_report_s}" }
    a.collect(&:to_s)
  end
end

class Rufus::Scheduler::Job
  def to_report_s
    {
      j: self.class, i: @id, oi: object_id, ts: @tags,
      frq: @frequency, ua: @unscheduled_at, c: count, nt: next_time
    }
    .collect { |k, v| "#{k}: #{v.inspect}" }
    .join(' ')
  end
end

#SCHEDULER = Rufus::Scheduler.new(:lockfile => ".rufus-scheduler.lock")
SCHEDULER = Rufus::Scheduler.new

# Add "global" error handler to the rufus-scheduler instance
#
def SCHEDULER.on_error(job, error)
  lep = "xxx err#{error.object_id}" # logger error prefix, already has "xxx "
  Rails.logger.error(
    "#{lep} rufus-scheduler intercepted #{error.inspect}" +
    " in job #{job.to_report_s}")
  error.backtrace.each_with_index do |line, i|
    Rails.logger.error("#{lep} #{i}: #{line}")
  end
  Rails.logger.error("#{lep} scheduler:")
  SCHEDULER.to_report_a.each { |l| Rails.logger.error("#{lep} #{l}") }
end

# at the top level of an initializer there is no `logger`, use Rails.logger
Rails.logger.info("xxx SCHEDULER started: #{SCHEDULER.object_id}")

unless (
  defined?(Rails::Console) || File.split($0).last == 'rake' ||
  SCHEDULER.down?
)
  # launch all Metrics jobs that are active
  Metric.active.each do |metric|
    lip = "Metric #{metric.id}" # logger info prefix
    job_id =
      SCHEDULER.every(
        metric.frequency,
        :tags => metric.id,
        :overlap => false,
        :timeout => metric.try(:timeout) || '3m'
      ) do |job|
        Rails.logger.info(
          "xxx #{lip} trigger (scheduled in initializer) " +
          "#{job.to_report_s}")
        metric.add_value
      end
    Rails.logger.info(
      "xxx #{lip} Job #{job_id} scheduled in initializer " +
      "for Metric #{metric.id}")
  end
  SCHEDULER.to_report_a.each { |l| Rails.logger.info("xxx sc #{l}") }
  Rails.logger.info("xxx initializer over.")
end
and
# models/metric.rb

class Metric < ApplicationRecord
  after_save :update_job

  private

  def update_job
    lip = "Metric #{id}" # logger info prefix
    logger.info(
      "xxx #{lip} #update_job " +
      "changed: #{self.changed.inspect} active: #{self.active.inspect}")
    return unless self.changed.present?
    SCHEDULER.to_report_a
      .each { |l| logger.info("xxx #{lip} sc #update_job 0 #{l}") }
    jobs = SCHEDULER.jobs(tag: self.id)
    jobs.each_with_index do |job, i|
      logger.info(
        "xxx #{lip} found Job #{job.to_report_s} #{i == 0 ? 'first' : ''}")
    end
    if job = jobs.first
      #SCHEDULER.unschedule(job.id)
      job.unschedule
      logger.info "xxx #{lip} Job #{job.id} unscheduled"
      logger.info "xxx #{lip} Job #{job.id} scheduled? #{SCHEDULER.scheduled?(job.id)}"
    end
    SCHEDULER.to_report_a
      .each { |l| logger.info("xxx #{lip} sc #update_job 1 #{l}") }
    return unless self.active
    metric = self
    job_id = SCHEDULER.every(
      self.frequency,
      :tags => self.id,
      :overlap => false,
      :timeout => self.try(:timeout) || '3m'
    ) do |job|
      logger.info "xxx trigger (scheduled in #update_job) #{job.to_report_s}"
      metric.add_value
    end
    logger.info "xxx #{lip} Job #{job_id} scheduled in #update_job"
    SCHEDULER.to_report_a
      .each { |l| logger.info("xxx #{lip} sc #update_job 2 #{l}") }
  end
end
phase 4 - 2016-12-22
Added the phase 3 logs to the question. It looks like, somehow, a new scheduler process is created and then destroyed inside the model code. Thanks again for your diligence on this!
Is that really happening in the model code? Your logs tell us that it happens in another process: in scenario 1 the initializer reports Process.pid 39975 while #update_job reports Process.pid 40037. Your initial Ruby process instantiates rufus-scheduler, then your HTTP requests are served by worker processes, which are forks of your initial process (without its threads, in other words with inactive schedulers).
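This is easy to reproduce without Rails, Puma, or rufus-scheduler. The plain-Ruby sketch below (it needs fork, so Linux or macOS) starts a background thread as a stand-in for the scheduler's own thread, then forks: the child has its own PID and contains only the thread that called fork, so nothing in it would ever trigger jobs. It only illustrates Ruby's fork semantics; it is not how Puma or rufus-scheduler are implemented.

```ruby
# Stand-in for the scheduler's background thread in the parent process.
ticker = Thread.new { loop { sleep 0.05 } }
parent_pid = Process.pid
parent_thread_count = Thread.list.size # main thread + ticker (at least 2)

reader, writer = IO.pipe
child_pid = fork do
  reader.close
  # Only the thread that called fork exists in the child; ticker is gone.
  writer.puts Process.pid
  writer.puts Thread.list.size
  writer.close
  exit!(0) # skip at_exit handlers inherited from the parent
end

writer.close
reported_pid = reader.gets.to_i
child_thread_count = reader.gets.to_i
reader.close
Process.wait(child_pid)
ticker.kill

puts "parent #{parent_pid}: #{parent_thread_count} threads"
puts "child  #{reported_pid}: #{child_thread_count} thread(s)"
```

A Puma worker is in the same position as this child: it still holds the SCHEDULER object, so SCHEDULER.jobs and unschedule seem to work, but the thread that would trigger those jobs stayed behind in the parent.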
You're using Puma in clustered mode. I should have immediately asked you about your configuration.
Read Puma's configuration documentation carefully: https://github.com/puma/puma#configuration
An easy fix would be not to use the clustered mode so that there is only one Ruby process involved, serving all the HTTP requests.
On the other hand, if you need clustered mode, you have to change your way of thinking. You probably don't want one rufus-scheduler instance per worker process. You could focus on having the core (live) rufus-scheduler in the main process. It could have a "management" job that checks recently updated metrics and unschedules/reschedules their jobs.
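# Metric.recently_updated is a scope you would have to define yourself
SCHEDULER.every '10s', overlap: false do
  Metric.recently_updated.each do |metric|
    SCHEDULER.jobs(tags: metric.id).each(&:unschedule)
    next unless metric.active # deactivated metrics stay unscheduled
    SCHEDULER.every(metric.frequency, tags: metric.id) { metric.add_value }
  end
end
# or something like that...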
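At its core, such a management job compares what each recently updated metric wants with what is currently scheduled under its tag. Here is a plain-Ruby sketch of that comparison, where a Hash of tag => frequency stands in for the scheduler's jobs (MetricRow and reconcile are hypothetical names, not rufus-scheduler API):

```ruby
# Hypothetical stand-in for a Metric row.
MetricRow = Struct.new(:id, :active, :frequency)

# scheduled: Hash of tag (metric id as a String, as rufus-scheduler stores
# tags) => frequency of the job currently scheduled under that tag.
# Returns [tags to unschedule, metrics to schedule].
def reconcile(metrics, scheduled)
  to_unschedule = []
  to_schedule = []
  metrics.each do |m|
    tag = m.id.to_s
    current = scheduled[tag]
    wanted = m.active ? m.frequency : nil
    next if current == wanted            # already in the desired state
    to_unschedule << tag if current      # drop the stale job, if any
    to_schedule << m if wanted           # (re)create it if still active
  end
  [to_unschedule, to_schedule]
end

scheduled = { "1" => "20s", "2" => "20s" }
metrics = [
  MetricRow.new(1, true, "20s"),  # unchanged
  MetricRow.new(2, false, "20s"), # deactivated
  MetricRow.new(3, true, "1m")    # newly activated
]
unschedule, schedule = reconcile(metrics, scheduled)
p unschedule         # => ["2"]
p schedule.map(&:id) # => [3]
```

Inside the management job, each returned tag would be unscheduled via SCHEDULER.jobs(tag: ...) and each returned metric would get a fresh SCHEDULER.every job. Metrics whose state already matches are left alone, so their jobs keep their timing instead of being restarted every 10 seconds.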
Have fun!