
Why do Ruby fibers finish instantly when I set my silly scheduler?


I'm trying to understand (and accomplish) "interweaving" concurrency using Ruby fibers. To do that, I tried to implement my own scheduler, but I hit a roadblock.

Here's the code to test:

f1 = Fiber.new do
  puts "1: start"
  sleep 2
  puts "1: almost" 
  sleep 2
  puts "1: done"
end.resume

f2 = Fiber.new do
  puts "2: start"
  sleep 2
  puts "2: almost" 
  sleep 2
  puts "2: done"
end.resume

The theory is: without a scheduler, the fibers run sequentially, and the whole program takes 8 s. If I manage to implement a scheduler that passes control to another fiber on blocking I/O (sleep in this case), the fibers should wait concurrently, and the whole program should take ~4 s.

So, I partially implemented my silly scheduler:

class S
  def initialize
    @fibers = []    
  end

  def block(blocker, timeout = nil) 
    fiber = Fiber.current
    puts "block", blocker.inspect, timeout.inspect, fiber.inspect
    @fibers << fiber
  end

  def unblock(blocker, fiber) 
    puts "unblock", blocker.inspect, fiber.inspect
    @fibers.reject!{ _1 == fiber} # remove the fiber
    fiber.resume
  end

  def fiber(&block)
    Fiber.new(blocking: false, &block).tap(&:resume)
  end

  def kernel_sleep(duration = nil) 
    @fibers << Fiber.current
  end

  def io_wait(io, events, timeout) 
    puts "io_wait not implemented"
  end
end

I added this part before my test fibers:

Fiber.set_scheduler(S.new) if ENV['SCHEDULER'] == 'yes'

And the results completely surprised me: The fibers are not sleeping at all!

time SCHEDULER=yes ruby fiber-non-blocking-io.rb
1: start
1: almost
1: done
2: start
2: almost
2: done
SCHEDULER=yes ruby fiber-non-blocking-io.rb  0.05s user 0.03s system 99% cpu 0.083 total

What is going on here? I clearly don't understand how the scheduler is supposed to work.

My mental model was something like this (let's focus on kernel_sleep/sleep for simplicity):

  1. My fiber is resumed, outputs the first message, and hits sleep
  2. The scheduler's kernel_sleep is called, and I store the fiber for later.
     2a. (I didn't get around to implementing Fiber.yield to pass control back.)
  3. At some point, I should iterate over my sleeping fibers and check if it's time to resume them, but I didn't get to that part.

(It also doesn't help that none of the puts calls in the scheduler's code show up. The scheduler clearly does something, since the behavior changed, but I can't see any of its output.)


Solution

    OK, I think I managed to improve my understanding.

    What is going on here?

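    Conceptually, kernel_sleep is an inline replacement for sleep inside a non-blocking fiber: once a scheduler is set, calling sleep in such a fiber invokes the scheduler's kernel_sleep instead of actually sleeping. So if kernel_sleep doesn't wait (or suspend the fiber) in some other way, nothing sleeps at all. This also explains the missing output: only kernel_sleep was being called, and my version of it printed nothing.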
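To see this in isolation, here is a minimal sketch, assuming Ruby 3.0 or later (the class name RecordingScheduler is hypothetical). Its kernel_sleep only records the requested duration, much like the question's version, so a fiber that calls sleep twice finishes almost immediately:

```ruby
# Hypothetical scheduler whose kernel_sleep, like the question's, never waits.
class RecordingScheduler
  attr_reader :sleeps

  def initialize
    @sleeps = []
  end

  # Ruby calls this instead of sleeping when a non-blocking fiber calls sleep.
  def kernel_sleep(duration = nil)
    @sleeps << duration # record the request; no waiting, no Fiber.yield
  end

  # Fiber.set_scheduler checks that these hooks exist; they are unused here.
  def block(blocker, timeout = nil) = raise("block not implemented")
  def unblock(blocker, fiber) = raise("unblock not implemented")
  def io_wait(io, events, timeout) = raise("io_wait not implemented")
end

scheduler = RecordingScheduler.new

# Schedulers are per-thread, so a separate thread keeps the main thread unaffected.
elapsed = Thread.new do
  Fiber.set_scheduler(scheduler)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  Fiber.new { sleep 2; sleep 2 }.resume # Fiber.new is non-blocking by default
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
end.value

puts "recorded sleeps: #{scheduler.sleeps.inspect}, elapsed: #{elapsed.round(3)}s"
```

It should report two recorded sleeps of 2 and an elapsed time close to zero, which is the same "instantaneous" behavior the question observed.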

    Thanks to Max's answer (please give it an upvote).

    So, a better implementation looks like this:

    def kernel_sleep(duration = nil)
      wait_till = Time.now + duration
      fiber = Fiber.current
      debug "kernel_sleep(#{duration.inspect})", fiber.inspect

      # Store the fiber together with the timestamp when it should be resumed
      @sleeping << [fiber, wait_till]
      # Pause this fiber and hand control back to whoever resumed it
      Fiber.yield
    end
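Note that Fiber.yield does not jump to an arbitrary "other" fiber: it suspends the current fiber and returns control to whoever last called resume on it (the main program on the first run, later whatever loop resumes it). A small standalone sketch with no scheduler involved shows the hand-offs:

```ruby
log = []

worker = Fiber.new do
  log << "worker: before yield"
  Fiber.yield # suspend here; control returns to the caller of resume
  log << "worker: after yield"
end

log << "main: first resume"
worker.resume # runs the worker until its Fiber.yield
log << "main: worker is paused"
worker.resume # continues after Fiber.yield, to the end of the block
log << "main: worker alive? #{worker.alive?}"

puts log
```

Each resume runs the worker only up to its next Fiber.yield (or its end), and the worker reports alive? as false once its block has finished.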
    

    I clearly don't understand how the scheduler is supposed to work.

    My mental block was the assumption that the scheduler would somehow "circle back" later to all those paused fibers on its own. It won't: it's entirely up to us (the scheduler's code) to resume every suspended fiber as many times as it takes for it to finish.
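Ruby does provide one hook for circling back: it calls the scheduler's close method (if defined) when the thread that owns the scheduler exits, or when the scheduler is replaced or cleared with Fiber.set_scheduler. The Fiber::Scheduler documentation suggests running the event loop from close, so pending fibers can finish even without an explicit call like Fiber.scheduler.run. A minimal sketch, assuming a recent Ruby 3.x, that only records whether close was called (ClosingScheduler is a hypothetical name):

```ruby
# Hypothetical scheduler that only records whether Ruby called #close.
class ClosingScheduler
  attr_reader :closed

  def initialize
    @closed = false
  end

  # Invoked by Ruby when the owning thread exits (or the scheduler is replaced).
  # A real scheduler would resume its pending fibers here, e.g. by calling run.
  def close
    @closed = true
  end

  # Remaining hooks are required by Fiber.set_scheduler but unused here.
  def kernel_sleep(duration = nil) = raise("kernel_sleep not implemented")
  def block(blocker, timeout = nil) = raise("block not implemented")
  def unblock(blocker, fiber) = raise("unblock not implemented")
  def io_wait(io, events, timeout) = raise("io_wait not implemented")
end

scheduler = ClosingScheduler.new
Thread.new { Fiber.set_scheduler(scheduler) }.join # the thread exits right away
puts "close called: #{scheduler.closed}"
```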

    So, adding another method to the scheduler like this one:

    def run
      while !@sleeping.empty?
        ready_index = @sleeping.find_index { |_fiber, time| time <= Time.now }
        next unless ready_index # nothing is due yet: loop and check again
        ready = @sleeping.delete_at(ready_index)[0]
        debug("resume #{ready}")
        ready.resume
      end
    end
    

    Then calling it after the sleeping fibers have been resumed for the first time (so each one has started and suspended itself in its first sleep) is the part that makes it work as expected:

    Fiber.set_scheduler(S.new) if ENV['SCHEDULER'] == 'yes'
    
    f1 = Fiber.new do
      debug "1: start"
      sleep 2
      debug "1: almost" 
      sleep 2
      debug "1: done"
    end.resume
    
    f2 = Fiber.new do
      debug "2: start"
      sleep 2
      debug "2: almost" 
      sleep 2
      debug "2: done"
    end.resume
    
    Fiber.scheduler&.run # Fiber.scheduler is nil when SCHEDULER is not "yes"
    

    The whole program now runs for about 4 seconds, and each fiber sleeps for a total of 4 seconds:

    time SCHEDULER=yes ruby fiber-non-blocking-io.rb
    1: start
    kernel_sleep(2)
    #<Fiber:0x000078af88f5f590 fiber-non-blocking-io.rb:43 (resumed)>
    2: start
    kernel_sleep(2)
    #<Fiber:0x000078af88f5ef00 fiber-non-blocking-io.rb:51 (resumed)>
    resume #<Fiber:0x000078af88f5f590 fiber-non-blocking-io.rb:43 (suspended)>
    1: almost
    kernel_sleep(2)
    #<Fiber:0x000078af88f5f590 fiber-non-blocking-io.rb:43 (resumed)>
    resume #<Fiber:0x000078af88f5ef00 fiber-non-blocking-io.rb:51 (suspended)>
    2: almost
    kernel_sleep(2)
    #<Fiber:0x000078af88f5ef00 fiber-non-blocking-io.rb:51 (resumed)>
    resume #<Fiber:0x000078af88f5f590 fiber-non-blocking-io.rb:43 (suspended)>
    1: done
    resume #<Fiber:0x000078af88f5ef00 fiber-non-blocking-io.rb:51 (suspended)>
    2: done
    SCHEDULER=yes ruby fiber-non-blocking-io.rb  4.04s user 0.02s system 99% cpu 4.066 total
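The 4.04s of user time at 99% CPU in that output suggests run is busy-waiting: while no fiber is due, find_index keeps spinning. Below is a sketch of an alternative, assuming Ruby 3.0 or later; SleepScheduler is a hypothetical name, and like the original it supports only sleep with a duration. It sleeps until the earliest deadline instead of spinning, uses the monotonic clock instead of Time.now, and calls run from close so Ruby drains the queue when the thread exits. The sleep inside run is a real sleep, because run executes on the blocking root fiber, where the scheduler hook is not used.

```ruby
# Hypothetical sleep-only scheduler that waits instead of spinning.
class SleepScheduler
  def initialize
    @sleeping = [] # [fiber, wake_at] pairs
  end

  def kernel_sleep(duration = nil)
    raise ArgumentError, "sleep without a duration is not supported" if duration.nil?

    @sleeping << [Fiber.current, now + duration]
    Fiber.yield # pause; run resumes this fiber once it is due
  end

  # Resume fibers in deadline order, sleeping (not spinning) until each is due.
  def run
    until @sleeping.empty?
      entry = @sleeping.min_by { |_fiber, wake_at| wake_at }
      delay = entry[1] - now
      sleep(delay) if delay > 0 # runs on the blocking root fiber: a real sleep
      @sleeping.delete(entry)
      entry[0].resume
    end
  end

  # Ruby calls close when the thread exits, so pending fibers still finish.
  def close = run

  def block(blocker, timeout = nil) = raise("block not implemented")
  def unblock(blocker, fiber) = raise("unblock not implemented")
  def io_wait(io, events, timeout) = raise("io_wait not implemented")

  private

  def now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

log = []
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

Thread.new do
  Fiber.set_scheduler(SleepScheduler.new)
  2.times do |i|
    Fiber.new do
      log << "#{i + 1}: start"
      sleep 0.2
      log << "#{i + 1}: almost"
      sleep 0.2
      log << "#{i + 1}: done"
    end.resume
  end
  # No explicit run call: close (and therefore run) is invoked when this thread exits.
end.join

elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
puts log, "elapsed: #{elapsed.round(2)}s"
```

With the shorter 0.2 s sleeps, the two fibers should interleave exactly as in the output above and finish in roughly 0.4 s rather than 0.8 s, without keeping a CPU core busy while waiting.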
    

    Here's the full example (not really complete: it handles only sleep, not any other blocking I/O):

    require 'fiber' 
    
    def debug(...)
      # return unless ENV['DEBUG'] == 'yes'
      puts(...) 
    end
    
    class S
      def initialize
        @sleeping = []
      end
    
      # The following hooks are part of the scheduler interface but aren't used in this example
      def block(blocker, timeout = nil) = raise("Not implemented")
      def unblock(blocker, fiber) = raise("Not implemented")
      def fiber(&block) = Fiber.new(blocking: false, &block).tap(&:resume)
    
      def kernel_sleep(duration = nil)
        wait_till = Time.now + duration
        fiber = Fiber.current
        debug "kernel_sleep(#{duration.inspect})", fiber.inspect
    
        @sleeping << [fiber, wait_till]
        Fiber.yield
      end
    
      def io_wait(io, events, timeout) 
        raise "io_wait not implemented"
      end
    
      def run
        while !@sleeping.empty? 
          ready_index = @sleeping.find_index { |_fiber, time| time <= Time.now }
          next unless ready_index 
          ready = @sleeping.delete_at(ready_index)[0]
          debug("resume #{ready}")
          ready.resume
        end
      end
    end
    
    Fiber.set_scheduler(S.new) if ENV['SCHEDULER'] == 'yes'
    
    f1 = Fiber.new do
      debug "1: start"
      sleep 2
      debug "1: almost" 
      sleep 2
      debug "1: done"
    end.resume
    
    f2 = Fiber.new do
      debug "2: start"
      sleep 2
      debug "2: almost" 
      sleep 2
      debug "2: done"
    end.resume
    
    Fiber.scheduler&.run # Fiber.scheduler is nil when SCHEDULER is not "yes"