ruby celluloid ruby-2.2

Celluloid async inside ruby blocks does not work


Implementing Celluloid async in my working example seems to produce weird behavior.

Here is my code:

    class Indefinite
      include Celluloid

      def run!
        loop do
          [1].each do |i|
            async.on_background
          end
        end
      end

      def on_background
        puts "Running in background"
      end
    end

    Indefinite.new.run!

But when I run the above code, I never see the output "Running in background".

However, if I add a sleep, the code seems to work.

    class Indefinite
      include Celluloid

      def run!
        loop do
          [1].each do |i|
            async.on_background
          end
          sleep 0.5
        end
      end

      def on_background
        puts "Running in background"
      end
    end

    Indefinite.new.run!

Any idea why there is such a difference between the two scenarios?

Thanks.


Solution

  • Your main loop is dominating the actor/application's threads.

    All your program is doing is queuing background tasks, but never running them. You need that sleep in the loop purely to give the background tasks a chance to run.

    It is not usually a good idea to have an unconditional loop spawn an unbounded number of background tasks like you have here. There ought to be either a delay or a conditional statement in there; otherwise you just have an infinite loop queuing things that never get invoked.

    Think about it like this: if you put puts "looping" just inside your loop, you will see looping printed over and over and over, even though you never see Running in background.


    Approach #1: Use every or after blocks.

    The best way to fix this is not to use sleep inside a loop, but to use an after or every block, like this:

    every(0.1) {
        on_background
    }
    

    Or best of all, if you want to make sure the process runs completely before running again, use after instead:

    def run_method
      @running ||= false
      unless @running
        @running = true
        on_background
        @running = false
      end
      after(0.1) { run_method }
    end
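
    To see why the after version never overlaps runs, here is a minimal sketch in plain Ruby. ToyTimers is a hypothetical stand-in for an actor's timer queue, not Celluloid's implementation, and it uses a virtual clock so nothing actually sleeps. The point it illustrates is that the job re-arms its own timer only after its work has finished:

```ruby
# Hypothetical stand-in for an actor's timers (NOT Celluloid's real code).
# Time is virtual: run_until jumps straight to each timer's due time.
class ToyTimers
  attr_reader :now

  def initialize
    @now = 0.0
    @timers = [] # pairs of [due_time, block]
  end

  # Schedule a block to fire once, `delay` virtual seconds from now.
  def after(delay, &block)
    @timers << [@now + delay, block]
  end

  # Fire timers in due-time order until none is due at or before `limit`.
  def run_until(limit)
    loop do
      @timers.sort_by!(&:first)
      break if @timers.empty? || @timers.first[0] > limit
      due, block = @timers.shift
      @now = due
      block.call
    end
  end
end

# Returns the virtual times at which the self-rescheduling job ran.
def rearming_run_times(interval, limit)
  timers = ToyTimers.new
  run_times = []
  job = lambda do
    run_times << timers.now.round(3) # the "work" happens here
    timers.after(interval, &job)     # re-arm only once the work is done
  end
  timers.after(interval, &job)
  timers.run_until(limit)
  run_times
end

p rearming_run_times(0.1, 0.35)
```

    Because the next timer is created at the end of the job, a slow job simply delays the next run instead of stacking up a second one alongside it.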
    

    Using a loop is not a good idea with async unless there is some kind of flow control, or a blocking call such as @server.accept. Otherwise it will just peg a CPU core at 100% for no good reason.

    By the way, you can also use now_and_every and now_and_after. These run the block right away, then run it again after the amount of time you want.

    Using every is shown in this gist:


    The ideal situation, in my opinion:

    This is a rough but immediately usable example:


    require 'celluloid/current'
    
    class Indefinite
      include Celluloid
    
      INTERVAL = 0.5
      ONE_AT_A_TIME = true
    
      def self.run!
        puts "000a Instantiating."
        indefinite = new
        indefinite.run
        puts "000b Running forever:"
        sleep
      end
    
      def initialize
        puts "001a Initializing."
        @mutex = Mutex.new if ONE_AT_A_TIME
        @running = false
        puts "001b Interval: #{INTERVAL}"
      end
    
      def run
        puts "002a Running."
        unless ONE_AT_A_TIME && @running
          if ONE_AT_A_TIME
            @mutex.synchronize {
              puts "002b Inside lock."
              @running = true
              on_background
              @running = false
            }
          else
            puts "002b Without lock."
            on_background
          end
        end
        puts "002c Setting new timer."
        after(INTERVAL) { run }
      end
    
    
      def on_background
        if ONE_AT_A_TIME
          puts "003 Running background processor in foreground."
        else
          puts "003 Running in background"
        end
      end
    end
    
    Indefinite.run!
    puts "004 End of application."
    

    This will be its output, if ONE_AT_A_TIME is true:

    000a Instantiating.
    001a Initializing.
    001b Interval: 0.5
    002a Running.
    002b Inside lock.
    003 Running background processor in foreground.
    002c Setting new timer.
    000b Running forever:
    002a Running.
    002b Inside lock.
    003 Running background processor in foreground.
    002c Setting new timer.
    002a Running.
    002b Inside lock.
    003 Running background processor in foreground.
    002c Setting new timer.
    002a Running.
    002b Inside lock.
    003 Running background processor in foreground.
    002c Setting new timer.
    002a Running.
    002b Inside lock.
    003 Running background processor in foreground.
    002c Setting new timer.
    002a Running.
    002b Inside lock.
    003 Running background processor in foreground.
    002c Setting new timer.
    002a Running.
    002b Inside lock.
    003 Running background processor in foreground.
    002c Setting new timer.
    

    And this will be its output if ONE_AT_A_TIME is false:

    000a Instantiating.
    001a Initializing.
    001b Interval: 0.5
    002a Running.
    002b Without lock.
    003 Running in background
    002c Setting new timer.
    000b Running forever:
    002a Running.
    002b Without lock.
    003 Running in background
    002c Setting new timer.
    002a Running.
    002b Without lock.
    003 Running in background
    002c Setting new timer.
    002a Running.
    002b Without lock.
    003 Running in background
    002c Setting new timer.
    002a Running.
    002b Without lock.
    003 Running in background
    002c Setting new timer.
    002a Running.
    002b Without lock.
    003 Running in background
    002c Setting new timer.
    002a Running.
    002b Without lock.
    003 Running in background
    002c Setting new timer.
    

    You need to be more "evented" than "threaded" to properly issue tasks and preserve scope and state, rather than issue commands between threads/actors, and that is what the every and after blocks provide. It is good practice either way, even if you didn't have a Global Interpreter Lock to deal with, because your example doesn't seem to involve a blocking process. If you had a blocking process, then by all means have an infinite loop. But since you would otherwise queue an unbounded number of background tasks before even one is processed, you need to either use a sleep as in your question, or switch strategy altogether and use every and after, which is how Celluloid itself encourages you to operate when handling data on sockets of any kind.


    Approach #2: Use a recursive method call.

    This just came up in the Google Group. The below example code will actually allow execution of other tasks, even though it's an infinite loop.

    This approach is less desirable because it will likely have more overhead, spawning a series of fibers.

    def work
        # ...
        async.work
    end
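
    A rough way to picture why this lets other tasks run: each round of work ends by queuing the next round at the back of the mailbox, so anything already waiting gets its turn first. The sketch below is a toy model in plain Ruby, assuming a mailbox that is drained strictly in order. The names interleaving_demo, mailbox and log are made up for illustration and are not Celluloid APIs:

```ruby
# Toy model (not Celluloid): the mailbox is an array of callables that a
# single loop drains in order, one message at a time.
def interleaving_demo(rounds)
  mailbox = []
  log = []

  work = lambda do |n|
    log << "work #{n}"
    # Model of `async.work`: queue the next round instead of looping.
    mailbox << -> { work.call(n + 1) } if n < rounds
  end

  mailbox << -> { work.call(1) }        # first round of work
  mailbox << -> { log << "other task" } # something else waiting its turn

  mailbox.shift.call until mailbox.empty?
  log
end

puts interleaving_demo(3)
```

    In this model "other task" runs between the first and second rounds of work, which a plain loop inside a single task would never allow.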
    

    Question #2: Thread vs. Fiber behaviors.

    The second question is why the following would work: loop { Thread.new { puts "Hello" } }

    That spawns an endless stream of native threads, which are managed by the Ruby VM and the operating system directly. Even though there is a Global Interpreter Lock in the Ruby VM you are using, that only means one thread executes Ruby code at a time. These are not green threads, and the scheduler still runs each Thread on its own, without waiting for your code to yield. And in the case of the example, each Thread runs very quickly and then dies.
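
    The contrast can be seen with nothing but core Ruby. This is a small sketch, using a bounded count instead of an infinite loop so that it terminates; the method names thread_greetings and fiber_greetings are invented for the example:

```ruby
require 'thread' # Queue; harmless on Rubies where it is already loaded

# Threads: the scheduler runs each one without being asked.
def thread_greetings(count)
  results = Queue.new
  threads = Array.new(count) { |i| Thread.new { results << "Hello #{i}" } }
  threads.each(&:join) # wait only so we can inspect the results
  Array.new(results.size) { results.pop }.sort
end

# Fibers: nothing runs until something explicitly resumes them.
def fiber_greetings(count, resume:)
  results = []
  fibers = Array.new(count) { |i| Fiber.new { results << "Hello #{i}" } }
  fibers.each(&:resume) if resume
  results
end

p thread_greetings(3)
p fiber_greetings(3, resume: false)
p fiber_greetings(3, resume: true)
```

    Every thread gets to run on its own, while the un-resumed fibers leave the result list empty. That is the situation the queued async tasks are in while the loop keeps hold of the actor.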

    An async task, by comparison, runs in a Fiber. So what's happening is this, in the default case:

    1. Process starts.
    2. Actor instantiated.
    3. Method call invokes loop.
    4. Loop invokes async method.
    5. async method adds task to mailbox.
    6. Mailbox is not invoked, and loop continues.
    7. Another async task is added to the mailbox.
    8. This continues infinitely.
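
    The steps above can be modeled in a few lines of plain Ruby. ToyActor is a hypothetical simplification, not how Celluloid is implemented: async only appends to the mailbox, and queued tasks run only when the looping method reaches a suspension point (the role sleep plays in the question):

```ruby
# Toy model of one actor: a mailbox drained only at suspension points.
class ToyActor
  attr_reader :mailbox, :log

  def initialize
    @mailbox = []
    @log = []
  end

  # Model of `async.some_method`: only queues the work.
  def async(&task)
    @mailbox << task
  end

  # Model of a suspension point such as sleep: run whatever is queued.
  def yield_to_mailbox
    @log << @mailbox.shift.call until @mailbox.empty?
  end

  # The "loop": bounded here so the example terminates.
  def run(iterations, cooperative:)
    iterations.times do
      async { :background_ran }
      yield_to_mailbox if cooperative
    end
  end
end

greedy = ToyActor.new
greedy.run(5, cooperative: false)
puts "greedy: #{greedy.log.size} run, #{greedy.mailbox.size} queued"

polite = ToyActor.new
polite.run(5, cooperative: true)
puts "polite: #{polite.log.size} run, #{polite.mailbox.size} queued"
```

    The greedy actor ends with every task still queued and none executed, while the cooperative one executes each task as soon as it yields.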

    This happens because the method running the loop is itself executing in a Fiber which is never suspended (unless a sleep is called!), and therefore the additional tasks added to the mailbox never get to start a new Fiber. A Fiber behaves differently than a Thread. This is a good piece of reference material discussing the differences:


    Question #3: Celluloid vs. Celluloid::ZMQ behavior.

    The third question is why include Celluloid behaves differently than Celluloid::ZMQ ...

    That's because Celluloid::ZMQ uses a reactor-based evented mailbox, versus Celluloid which uses a condition variable based mailbox.

    Read more about pipelining and execution modes:

    That is the difference between the two examples. If you have additional questions about how these mailboxes behave, feel free to post on the Google Group ... the main dynamic you are facing is the unique nature of the GIL interacting with the Fiber vs. Thread vs. Reactor behavior.

    You can read more about the reactor-pattern here:

    And see the specific reactor used by Celluloid::ZMQ here:

    So what's happening in the evented mailbox scenario is that when sleep is hit, it is a blocking call, which causes the reactor to move to the next task in the mailbox.

    But also, and this is unique to your situation, the specific reactor being used by Celluloid::ZMQ is using an external C library, specifically the 0MQ library. That reactor is external to your application and behaves differently than Celluloid::IO or Celluloid itself, and that is also why the behavior differs from what you expected.

    Multi-core Support Alternative

    If maintaining state and scope is not important to you, and you use JRuby or Rubinius, which are not limited to one operating system thread the way MRI is by its Global Interpreter Lock, you can instantiate more than one actor and issue async calls between actors concurrently.

    But my humble opinion is that you would be much better served using a very high frequency timer, such as 0.001 or 0.1 in my example, which will seem instantaneous for all intents and purposes, but also allow the actor thread plenty of time to switch fibers and run other tasks in the mailbox.