I use beanstalk_console (https://github.com/ptrofimov/beanstalk_console) to monitor my beanstalkd queue, and at times I get 1 or 2 jobs stuck in the buried state. This is due to how my worker code is set up: I bury a job immediately after reserving it and delete it only if all goes well (otherwise it stays buried). I also use supervisord to manage multiple queue workers. Quick code sample below:
    $pheanstalk = new Pheanstalk();
    $pheanstalk->watch('tube');
    while ($job = $pheanstalk->reserve()) {
        // Bury right away so the job stays buried if processing fails
        $pheanstalk->bury($job);
        $success = false;
        // Process job here. Set $success to true if no errors were encountered
        if ($success) {
            // Only successfully processed jobs are removed from the queue
            $pheanstalk->delete($job);
        }
    }
Now to my problem: when I use beanstalk_console to move my buried jobs back to their designated tubes, there are random times when all my available workers reserve the same job at the same time (interestingly, each with a different job id), leading to duplicate work. They all also get buried at the same time, though beanstalk_console only shows the latest buried job, so I'm not sure what's going on. If I add something like sleep(1) just before I bury my job, then only 1 worker reserves the job and I don't get duplicate work. It's almost as if burying a job immediately after reserving it puts it back into the queue. Is there something wrong with my code? Or is there something odd about how beanstalk_console puts jobs back?
I revisited this problem and now see why it occurs. Beanstalk Console uses a do-while loop to look for buried jobs, and it only exits the loop once an exception is thrown because no buried jobs are left.
https://github.com/ptrofimov/beanstalk_console/blob/1.7.7/lib/include.php#L765
Unfortunately, it looks like the loop can catch jobs that I bury immediately (as in my sample code) well before that exception is thrown, which can lead to the same job being processed more than once. It also doesn't matter whether I have 1 worker or several waiting for jobs; more workers just means more duplicate processing.
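To make the race easier to see, here is a rough in-memory sketch. It is not Beanstalk Console's actual code and does not use Pheanstalk: FakeTube and countPickups are hypothetical names I made up for illustration. I'm also assuming that the console re-queues each buried job as a new job (which would explain the different job ids) and that each worker handles one job at a time.

```php
<?php
// Hypothetical in-memory stand-in for a single beanstalkd tube.
final class FakeTube
{
    private int $nextId = 1;
    private array $ready = [];    // job id => payload
    private array $reserved = []; // job id => payload
    private array $buried = [];   // job id => payload

    public function put(string $payload): int
    {
        $id = $this->nextId++;
        $this->ready[$id] = $payload;
        return $id;
    }

    // Seed a job that is already stuck in the buried state.
    public function putBuried(string $payload): int
    {
        $id = $this->nextId++;
        $this->buried[$id] = $payload;
        return $id;
    }

    // Worker side: take the oldest ready job, or null if there is none.
    public function reserve(): ?int
    {
        if ($this->ready === []) {
            return null;
        }
        $id = array_key_first($this->ready);
        $this->reserved[$id] = $this->ready[$id];
        unset($this->ready[$id]);
        return $id;
    }

    // Worker side: move a reserved job to the buried state.
    public function bury(int $id): void
    {
        $this->buried[$id] = $this->reserved[$id];
        unset($this->reserved[$id]);
    }

    // Console side: id of the oldest buried job; throws when none are left.
    public function peekBuried(): int
    {
        if ($this->buried === []) {
            throw new RuntimeException('NOT_FOUND');
        }
        return array_key_first($this->buried);
    }

    // Console side (assumed behavior): re-queue the payload under a new id.
    public function moveBuried(int $id): int
    {
        $payload = $this->buried[$id];
        unset($this->buried[$id]);
        return $this->put($payload);
    }
}

// Counts how often the one payload is picked up while the console's loop runs.
function countPickups(int $workers, bool $buryImmediately): int
{
    $tube = new FakeTube();
    $tube->putBuried('payload');
    $idleWorkers = $workers;
    $pickups = 0;

    try {
        do {
            // One iteration of the console's loop.
            $tube->moveBuried($tube->peekBuried());
            // An idle worker grabs the fresh copy before the next iteration.
            if ($idleWorkers > 0 && ($id = $tube->reserve()) !== null) {
                $idleWorkers--;
                $pickups++;
                if ($buryImmediately) {
                    $tube->bury($id); // visible to the loop's next peekBuried()
                }
            }
        } while (true);
    } catch (RuntimeException $e) {
        // No buried jobs left: this is where the console's loop exits.
    }

    // Once workers free up, they pick up any copy still sitting in ready.
    while ($tube->reserve() !== null) {
        $pickups++;
    }
    return $pickups;
}

echo countPickups(1, true), PHP_EOL;  // 2
echo countPickups(3, true), PHP_EOL;  // 4
echo countPickups(3, false), PHP_EOL; // 1
```

In this model, burying right after reserve hands the job straight back to the loop, so the payload is picked up once per idle worker plus one more time. Keeping the job in the reserved state while the loop runs (which is roughly what sleep(1) before the bury does) lets the loop hit its exception after a single move.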