rubyasynchronousconcurrencyfibersruby-3

Ruby 3 collecting results from multiple scheduled fibers


Ruby 3 introduced Fiber.schedule to dispatch async tasks concurrently.

Similar to what's being asked in this question (which is about threaded concurrency) I would like a way to start multiple concurrent tasks on the fiber scheduler and once they have all been scheduled wait for their combined result, sort of equivalent to Promise.all in JavaScript.

I can come up with this naive way:

require 'async'

def io_work(t)
  sleep t
  :ok
end

Async do
  results = []

  [0.1, 0.3, 'cow'].each_with_index do |t, i|
    n = i + 1
    Fiber.schedule do
      puts "Starting fiber #{n}\n"
      result = io_work t
      puts "Done working for #{t} seconds in fiber #{n}"
      results << [n, result]
    rescue
      puts "Execution failed in fiber #{n}"
      results << [n, :error]
    end
  end

  # await combined results
  sleep 0.1 until results.size >= 3

  puts "Results: #{results}"
end

Is there a simpler construct that will do the same?


Solution

  • Since Async tasks are already scheduled I am not sure you need all of that.

    If you just want to wait for all the items to finish you can use an Async::Barrier

    Example:

    require 'async'
    require 'async/barrier'
    
    
    def io_work(t)
      sleep t
      :ok
    end
    
    Async do
      barrier = Async::Barrier.new
      results = []
      [1, 0.3, 'cow'].each.with_index(1) do |data, idx|
        barrier.async do 
          results << begin
            puts "Starting task #{idx}\n"
            result = io_work data
            puts "Done working for #{data} seconds in task #{idx}"
            [idx,result]
          rescue
            puts "Execution failed in task #{idx}"
            [idx, :error]
          end          
        end 
      end
      barrier.wait
      puts "Results: #{results}"
    end
    

    Based on the sleep values this will output

    Starting task 1
    Starting task 2
    Starting task 3
    Execution failed in task 3
    Done working for 0.3 seconds in task 2
    Done working for 1 seconds in task 1
    Results: [[3, :error], [2, :ok], [1, :ok]]
    

    The barrier.wait will wait until all the asynchronous tasks are complete, without it the output would look like

    Starting fiber 1
    Starting fiber 2
    Starting fiber 3
    Execution failed in fiber 3
    Results: [[3, :error]]
    Done working for 0.3 seconds in fiber 2
    Done working for 1 seconds in fiber 1