multithreading perl rabbitmq fork anyevent

How to deal with AnyEvent, RabbitMQ (heartbeat), and long-running jobs in Perl?


I am implementing a system for distributed cronjob execution (a so-called cron computing cluster). Cronjobs are queued into a message queue (RabbitMQ) when they are due. On the other side, each node/worker of the cluster runs a Perl daemon that uses AnyEvent::RabbitMQ to receive exactly one cronjob/task/message from the queue, process it, then request exactly one more, and so on.

I use RabbitMQ's heartbeat feature, which AnyEvent::RabbitMQ implements, to help RabbitMQ identify broken connections.

The actual value of the heartbeat interval does not matter here. Some of my jobs run for days, so setting the interval to the duration of the longest cronjob is not an option.

The following snippet fetches and executes a cronjob within the Perl daemon worker. It runs inside an AnyEvent->timer so that the worker does not DoS RabbitMQ with requests for messages. I poll this way because management prohibited the use of RabbitMQ's consume.

sub _timer_tick {

  $rabbitmq_channel->get(
    queue      => 'job_queue',
    on_success => sub {
      my ($amqp_method) = @_;
      if ( not $amqp_method->{empty} ) {
        pause_timer();
        progress_job($amqp_method);
        resume_timer();
      }
    },
    on_failure => sub { $quit_programm->send( 'RABBITMQ_ERROR', @_ ) },
  );

  return;
}

progress_job() is where the message is parsed and the job is executed. pause_timer() and resume_timer() control the AnyEvent->timer that triggers _timer_tick().

use Capture::Tiny 'capture';
sub progress_job {
  my ($amqp_method) = @_;
  my $job = decode_json( $amqp_method->{body}->to_raw_payload() );
  my ( $stdout, $stderr, $exit ) = capture {
    system $job->{execute};
  };
  return;
}

When the first long-running jobs went in, the system "crashed" with various error messages. Sometimes it threw 'Unknown channel id: 1', other times 'Channel has already been closed'. So I did some 'dumb debugging' (messing with the config) and found that these errors are thrown whenever the heartbeat interval is shorter than the time spent in progress_job(). After some thought, this makes sense: progress_job() is a blocking subroutine, so AnyEvent cannot send heartbeat frames to RabbitMQ while it runs.
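The blocking effect described above can be reproduced without RabbitMQ. The following is only a minimal sketch: the repeating timer stands in for the heartbeat, the one-shot timer stands in for the callback that runs progress_job(), and sleep stands in for the blocking system call. All variable names here are made up for illustration.

```perl
use strict;
use warnings;
use AnyEvent;

my $ticks = 0;            # how often the stand-in heartbeat has fired
my $ticks_during_job;     # snapshot taken right after the blocking call
my $done = AnyEvent->condvar;

# Stand-in for the heartbeat: supposed to fire every 0.1 s.
my $heartbeat = AnyEvent->timer(
  after    => 0.1,
  interval => 0.1,
  cb       => sub { $ticks++ },
);

# Stand-in for the callback that runs progress_job().
my $job = AnyEvent->timer(
  after => 0.05,
  cb    => sub {
    sleep 1;                    # blocks the process; no AnyEvent callback can run meanwhile
    $ticks_during_job = $ticks;
    $done->send;
  },
);

$done->recv;
print "heartbeat ticks while the job was blocking: $ticks_during_job\n";
```

Although the stand-in heartbeat is due roughly ten times during the one-second sleep, its callback cannot run until the blocking callback returns to the event loop.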

My first quick idea for solving the blocking-heartbeat issue was to fork and run progress_job() in the child process. AnyEvent's documentation on fork points out that it is safe to fork as long as the child does not access the event system (e.g. through AnyEvent). Next thought: OK, the child does not touch the event system, so I can fork. BUT: the timer should resume (resume_timer()) only AFTER progress_job() has returned. With a plain fork, the parent would call resume_timer() right after fork() and not after progress_job() returns. So I stopped my implementation.

My question: How do I solve the last bit? How can I call resume_timer() after progress_job() (in other words, the forked child) returns? I cannot put resume_timer() in the child, because forking and the event system do not mix safely.


Solution

  • AnyEvent (AE) can't process events unless the program is blocked in an AE-aware call. system is not AE-aware. Use run_cmd from AnyEvent::Util instead.
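
A minimal sketch of how the run_cmd suggestion might look, not a drop-in replacement for the daemon. Assumptions: progress_job() here takes the command string and a completion callback ($on_done, a hypothetical parameter) instead of the AMQP message; pause_timer() and resume_timer() are reduced to stubs; and the repeating timer only stands in for the heartbeat. run_cmd returns a condvar that receives the exit status (like $?) once the command has finished, so resume_timer() can be called from that condvar's callback.

```perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::Util 'run_cmd';

# Stubs standing in for the question's timer helpers (assumption).
my $timer_paused = 0;
sub pause_timer  { $timer_paused = 1 }
sub resume_timer { $timer_paused = 0 }

# Stand-in for the heartbeat, to show that events keep flowing.
my $heartbeats = 0;
my $heartbeat  = AnyEvent->timer(
  after    => 0.1,
  interval => 0.1,
  cb       => sub { $heartbeats++ },
);

sub progress_job {
  my ( $command, $on_done ) = @_;
  my ( $stdout, $stderr ) = ( '', '' );

  pause_timer();

  # run_cmd forks and runs the command; the parent stays in the event loop.
  my $cv = run_cmd $command, '>' => \$stdout, '2>' => \$stderr;

  $cv->cb( sub {
    my $status = shift->recv;    # wait status, like $?
    resume_timer();              # only now is the job finished
    $on_done->( $stdout, $stderr, $status >> 8 );
  } );

  return;                        # returns immediately, the job runs in the background
}

my $finished = AnyEvent->condvar;
progress_job( 'sleep 1; echo done', sub { $finished->send(@_) } );
my ( $out, $err, $exit ) = $finished->recv;
print "stdout=$out", "exit=$exit heartbeats=$heartbeats\n";
```

Because progress_job() now returns immediately, the pause_timer()/resume_timer() pair moves inside it: the on_success callback of get would only call progress_job(), and the timer resumes from the completion callback rather than right after the call.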