I'm trying to understand (and accomplish) "interweaving" concurrency using Ruby fibers. To do that, I tried to implement my own scheduler, but I hit a roadblock.
Here's the code to test:
f1 = Fiber.new do
  puts "1: start"
  sleep 2
  puts "1: almost"
  sleep 2
  puts "1: done"
end.resume

f2 = Fiber.new do
  puts "2: start"
  sleep 2
  puts "2: almost"
  sleep 2
  puts "2: done"
end.resume
The theory is: without a scheduler, the fibers run sequentially, and the whole program takes 8s.
If I manage to implement a scheduler that passes control to another fiber on IO (sleep in this case), the fibers should wait together, and the whole program should take ~4s.
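The sequential half of that theory can be checked with a scaled-down copy of the test. This is a sketch assuming Ruby 3.x with no scheduler installed; the 0.1s durations are arbitrary and only keep the run short:

```ruby
# Scaled-down copy of the test: 2 fibers x 2 sleeps x 0.1s, no scheduler.
log = []
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)

[1, 2].each do |id|
  Fiber.new do
    log << "#{id}: start"
    sleep 0.1 # no scheduler installed, so this blocks the whole thread
    log << "#{id}: almost"
    sleep 0.1
    log << "#{id}: done"
  end.resume
end

elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
puts log
puts format("elapsed: %.2fs", elapsed) # the four 0.1s sleeps run back to back
```

Fiber 2 only starts after fiber 1 has finished, so the sleeps add up instead of overlapping.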
So, I partially implemented my silly scheduler:
class S
  def initialize
    @fibers = []
  end

  def block(blocker, timeout = nil)
    fiber = Fiber.current
    puts "block", blocker.inspect, timeout.inspect, fiber.inspect
    @fibers << fiber
  end

  def unblock(blocker, fiber)
    puts "unblock", blocker.inspect, fiber.inspect
    @fibers.reject! { _1 == fiber } # remove the fiber
    fiber.resume
  end

  def fiber(&block)
    Fiber.new(blocking: false, &block).tap(&:resume)
  end

  def kernel_sleep(duration = nil)
    @fibers << Fiber.current
  end

  def io_wait(io, events, timeout)
    puts "io_wait not implemented"
  end
end
I added this part before my test fibers:
Fiber.set_scheduler(S.new) if ENV['SCHEDULER'] == 'yes'
And the results completely surprised me: The fibers are not sleeping at all!
time SCHEDULER=yes ruby fiber-non-blocking-io.rb
1: start
1: almost
1: done
2: start
2: almost
2: done
SCHEDULER=yes ruby fiber-non-blocking-io.rb 0.05s user 0.03s system 99% cpu 0.083 total
What is going on here? I clearly don't understand how the scheduler is supposed to work.
My mental model was something like this (let's focus on kernel_sleep/sleep for simplicity):
1. kernel_sleep is called, and I store it for later
2a. (I didn't get to implementing Fiber.yield to pass back the control)
(The fact that the puts in the scheduler's code is not showing up also does not help. Clearly, the scheduler does something, because the behavior changed, but I can't see those outputs.)
OK, I think I managed to improve my understanding.
What is going on here?
One can (conceptually) think of kernel_sleep as an inline replacement for sleep() inside the fiber. So if kernel_sleep does not itself wait, or arrange for the fiber to be resumed later, the fiber does not sleep at all.
Thanks to Max's answer (please give it an upvote).
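The "inline replacement" behaviour can be seen in isolation with a scheduler whose kernel_sleep only records the requested duration and returns, much like the question's version, which stores the fiber and returns. This is a minimal sketch assuming Ruby 3.0+ (endless method definitions); RecordingScheduler is a hypothetical name, and it implements the hooks the interface expects (block, unblock, kernel_sleep, io_wait) so that Fiber.set_scheduler accepts it:

```ruby
# Hypothetical RecordingScheduler: kernel_sleep records the duration and
# returns immediately, so no actual waiting ever happens.
class RecordingScheduler
  attr_reader :sleeps

  def initialize
    @sleeps = []
  end

  def kernel_sleep(duration = nil)
    @sleeps << duration # no waiting happens here
  end

  # Not exercised in this demo.
  def block(blocker, timeout = nil) = raise(NotImplementedError)
  def unblock(blocker, fiber) = raise(NotImplementedError)
  def io_wait(io, events, timeout) = raise(NotImplementedError)
end

scheduler = RecordingScheduler.new
Fiber.set_scheduler(scheduler)

start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
# Fiber.new creates a non-blocking fiber by default, so sleep inside it
# is routed to scheduler.kernel_sleep instead of Kernel's own sleep.
Fiber.new { sleep 5 }.resume
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start

Fiber.set_scheduler(nil) # uninstall the scheduler again
puts scheduler.sleeps.inspect # the requested 5 seconds were recorded...
puts format("%.3fs", elapsed) # ...but the fiber never actually slept
```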
So, a better implementation looks like this:
def kernel_sleep(duration = nil)
  wait_till = Time.now + duration
  fiber = Fiber.current
  debug "kernel_sleep(#{duration.inspect})", fiber.inspect
  # Store the fiber together with the timestamp when it should be resumed
  @sleeping << [fiber, wait_till]
  # Pause this fiber: control goes back to whoever resumed it
  # (the main fiber at first, later the run loop)
  Fiber.yield
end
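Fiber.yield is what actually pauses the fiber: it suspends the current fiber and makes the pending resume call return to its caller. A minimal sketch of that handoff with plain fibers (no scheduler involved):

```ruby
# Plain Fiber handoff: Fiber.yield suspends the current fiber and returns
# control to whoever called #resume on it.
log = []

worker = Fiber.new do
  log << "worker: before yield"
  Fiber.yield # pause here; control goes back to the resumer
  log << "worker: after yield"
end

worker.resume # runs the fiber up to Fiber.yield
log << "main: worker is paused"
worker.resume # continues right after Fiber.yield

puts log
```

In the scheduler, the "resumer" is the main fiber on the first run and the run loop afterwards.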
I clearly don't understand how the scheduler is supposed to work.
So my mental block was the idea that Ruby would somehow "circle back" later to all those paused fibers; it will not. It is entirely up to us (the scheduler implementation) to ensure every paused fiber is resumed enough times to run to completion.
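A quick way to convince yourself of this with plain fibers: a fiber that has yielded just stays suspended (alive? is true) until some code calls resume on it again.

```ruby
# Plain fibers, no scheduler: nothing resumes a paused fiber automatically.
log = []

paused = Fiber.new do
  log << :started
  Fiber.yield
  log << :finished # runs only if someone calls resume again
end

paused.resume
puts paused.alive? # => true  (suspended at Fiber.yield)
puts log.inspect   # => [:started]

paused.resume      # this second resume is the "circling back"
puts paused.alive? # => false (the block ran to the end)
puts log.inspect   # => [:started, :finished]
```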
So, adding another method to the scheduler like this one:
def run
  while !@sleeping.empty?
    ready_index = @sleeping.find_index { |_fiber, time| time <= Time.now }
    next unless ready_index
    ready = @sleeping.delete_at(ready_index)[0]
    debug("resume #{ready}")
    ready.resume
  end
end
And then calling it after the sleeping fibers have been resumed for the first time is the part that makes it work as expected:
Fiber.set_scheduler(S.new) if ENV['SCHEDULER'] == 'yes'

f1 = Fiber.new do
  debug "1: start"
  sleep 2
  debug "1: almost"
  sleep 2
  debug "1: done"
end.resume

f2 = Fiber.new do
  debug "2: start"
  sleep 2
  debug "2: almost"
  sleep 2
  debug "2: done"
end.resume

# Fiber.scheduler is nil unless SCHEDULER=yes, hence the safe navigation
Fiber.scheduler&.run
The whole program now runs for 4 seconds, and each fiber sleeps for a total of 4 seconds:
time SCHEDULER=yes ruby fiber-non-blocking-io.rb
1: start
kernel_sleep(2)
#<Fiber:0x000078af88f5f590 fiber-non-blocking-io.rb:43 (resumed)>
2: start
kernel_sleep(2)
#<Fiber:0x000078af88f5ef00 fiber-non-blocking-io.rb:51 (resumed)>
resume #<Fiber:0x000078af88f5f590 fiber-non-blocking-io.rb:43 (suspended)>
1: almost
kernel_sleep(2)
#<Fiber:0x000078af88f5f590 fiber-non-blocking-io.rb:43 (resumed)>
resume #<Fiber:0x000078af88f5ef00 fiber-non-blocking-io.rb:51 (suspended)>
2: almost
kernel_sleep(2)
#<Fiber:0x000078af88f5ef00 fiber-non-blocking-io.rb:51 (resumed)>
resume #<Fiber:0x000078af88f5f590 fiber-non-blocking-io.rb:43 (suspended)>
1: done
resume #<Fiber:0x000078af88f5ef00 fiber-non-blocking-io.rb:51 (suspended)>
2: done
SCHEDULER=yes ruby fiber-non-blocking-io.rb 4.04s user 0.02s system 99% cpu 4.066 total
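Note the 4.04s of user CPU time for 4.07s of wall time: the run loop polls, because when no fiber is due yet, next jumps straight back to the top and calls Time.now again, keeping a core busy for the whole wait. Below is a sketch of an alternative that blocks until the earliest deadline instead. SleepScheduler and now are hypothetical names; it swaps Time.now for the monotonic clock, raises on sleep without a duration (an assumption; the original would fail on Time.now + nil anyway), and assumes run is called from the main fiber, which is blocking, so the sleep inside run is the ordinary Kernel#sleep. This only works because sleep is the only hook handled; a scheduler that also implements io_wait would need to wait with something like IO.select instead.

```ruby
# Hypothetical SleepScheduler: waits for the earliest deadline instead of
# polling in a tight loop.
class SleepScheduler
  def initialize
    @sleeping = [] # [fiber, wake_at] pairs
  end

  def now = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  def kernel_sleep(duration = nil)
    raise ArgumentError, "sleep without a duration is not supported" if duration.nil?

    @sleeping << [Fiber.current, now + duration]
    Fiber.yield # hand control back to whoever resumed this fiber
  end

  def run
    until @sleeping.empty?
      # Take the entry with the earliest wake-up time.
      index = @sleeping.each_index.min_by { |i| @sleeping[i][1] }
      fiber, wake_at = @sleeping.delete_at(index)
      delay = wake_at - now
      # Called from the blocking main fiber: a real sleep, not kernel_sleep.
      sleep(delay) if delay > 0
      fiber.resume
    end
  end

  # Not exercised in this demo.
  def block(blocker, timeout = nil) = raise(NotImplementedError)
  def unblock(blocker, fiber) = raise(NotImplementedError)
  def io_wait(io, events, timeout) = raise(NotImplementedError)
end

log = []
scheduler = SleepScheduler.new
Fiber.set_scheduler(scheduler)

start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
[1, 2].each do |id|
  Fiber.new do
    log << "#{id}: start"
    sleep 0.1
    log << "#{id}: almost"
    sleep 0.1
    log << "#{id}: done"
  end.resume
end
scheduler.run
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start

Fiber.set_scheduler(nil)
puts log
puts format("elapsed: %.2fs", elapsed)
```

With the durations scaled down to 0.1s, the two fibers interleave the same way as in the output above, and the whole run takes roughly the length of one fiber's sleeps rather than both.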
Here's the full example (well, not quite full: it works only with sleep, not with any other blocking IO):
require 'fiber'

def debug(...)
  # return unless ENV['DEBUG'] == 'yes'
  puts(...)
end

class S
  def initialize
    @sleeping = []
  end

  # The following are required by the interface, but don't matter in this example
  def block(blocker, timeout = nil) = raise "Not implemented"
  def unblock(blocker, fiber) = raise "Not implemented"
  def fiber(&block) = Fiber.new(blocking: false, &block).tap(&:resume)

  def kernel_sleep(duration = nil)
    wait_till = Time.now + duration
    fiber = Fiber.current
    debug "kernel_sleep(#{duration.inspect})", fiber.inspect
    @sleeping << [fiber, wait_till]
    Fiber.yield
  end

  def io_wait(io, events, timeout)
    raise "io_wait not implemented"
  end

  def run
    while !@sleeping.empty?
      ready_index = @sleeping.find_index { |_fiber, time| time <= Time.now }
      next unless ready_index
      ready = @sleeping.delete_at(ready_index)[0]
      debug("resume #{ready}")
      ready.resume
    end
  end
end

Fiber.set_scheduler(S.new) if ENV['SCHEDULER'] == 'yes'

f1 = Fiber.new do
  debug "1: start"
  sleep 2
  debug "1: almost"
  sleep 2
  debug "1: done"
end.resume

f2 = Fiber.new do
  debug "2: start"
  sleep 2
  debug "2: almost"
  sleep 2
  debug "2: done"
end.resume

# Fiber.scheduler is nil unless SCHEDULER=yes, hence the safe navigation
Fiber.scheduler&.run
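The fiber hook in this example is never actually used, because the test code builds its fibers with Fiber.new ... .resume. The hook is what Ruby's Fiber.schedule calls: Fiber.schedule { ... } hands the block to the current scheduler's fiber method. With S installed, Fiber.schedule do ... end should therefore behave like the Fiber.new(...).resume calls above. A small sketch confirming the delegation (FiberHookScheduler is a hypothetical name; Ruby 3.0+ assumed):

```ruby
# Hypothetical FiberHookScheduler: the answer's fiber hook plus a counter.
class FiberHookScheduler
  attr_reader :fiber_calls

  def initialize
    @fiber_calls = 0
  end

  # Same body as S#fiber: create a non-blocking fiber and start it
  # right away. Fiber.schedule delegates to this hook.
  def fiber(&block)
    @fiber_calls += 1
    Fiber.new(blocking: false, &block).tap(&:resume)
  end

  # Not exercised in this demo.
  def kernel_sleep(duration = nil) = raise(NotImplementedError)
  def block(blocker, timeout = nil) = raise(NotImplementedError)
  def unblock(blocker, fiber) = raise(NotImplementedError)
  def io_wait(io, events, timeout) = raise(NotImplementedError)
end

log = []
scheduler = FiberHookScheduler.new
Fiber.set_scheduler(scheduler)

Fiber.schedule { log << "1: start" }
Fiber.schedule { log << "2: start" }

Fiber.set_scheduler(nil)
puts scheduler.fiber_calls # => 2
puts log.inspect           # => ["1: start", "2: start"]
```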