php laravel queue horizon

Is it possible to balance queued jobs per user?


User A can trigger a process that dispatches over 1,000 jobs to a queue. If user B triggers the same process, they have to wait until user A's jobs are finished, since it's the same job on the same queue.

Is it possible to balance jobs based on users? In the case above, the queue should process jobs from users A and B simultaneously, slowing down user A's jobs to accommodate user B.

I am using Horizon, but I am not sure how to set up this configuration.

// dispatch 1000 jobs for user A and B on different requests
ProcessCSV::dispatch()->onQueue('default');

// horizon.php
'supervisor' => [
    'connection' => 'redis',
    'queue' => ['default'],
    'balance' => 'auto',
    ...
]

The only way I could think of to accomplish this is to pick a queue at random per request. This is not ideal, however: two requests can draw the same random number, and once more users trigger the process than there are queues, it no longer works as expected.

For example:

// request A dispatch 1000 jobs for user A
$userA = rand(1, 5);
ProcessCSV::dispatch()->onQueue($userA);

// request B dispatch 1000 jobs for user B
$userB = rand(1, 5);
ProcessCSV::dispatch()->onQueue($userB);

// horizon.php
'supervisor' => [
    'connection' => 'redis',
    'queue' => ['1', '2', '3', '4', '5'],
    'balance' => 'auto',
    ...
]
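To put a number on how often two requests land on the same random queue, here is a minimal sketch. It assumes each user picks one of the queues uniformly at random and independently of the others; the function name collisionProbability is made up for this illustration.

```php
<?php

// Probability that at least two of $users concurrent users end up on the
// same queue when each picks one of $queues queues uniformly at random.
// Hypothetical helper, not part of Laravel or Horizon.
function collisionProbability(int $users, int $queues): float
{
    if ($users > $queues) {
        return 1.0; // more users than queues: a shared queue is unavoidable
    }

    // Chance that every user so far picked a queue nobody else has.
    $noCollision = 1.0;
    for ($i = 0; $i < $users; $i++) {
        $noCollision *= ($queues - $i) / $queues;
    }

    return 1.0 - $noCollision;
}

printf("%.2f\n", collisionProbability(2, 5)); // 0.20
printf("%.2f\n", collisionProbability(3, 5)); // 0.52
printf("%.2f\n", collisionProbability(6, 5)); // 1.00
```

With five queues, two simultaneous users already share a queue one time in five, and a sixth user always does.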

Solution

  • In the end I decided to use a combination of Bus::batch and multiple queues. I check which queues have no unfinished batch and pick the first free one; if all are busy, I fall back to the first queue. This works well with progress bars and balances the same jobs across queues.

    $jobs = [];
    foreach ($this->getData() as $data) {
        // Can contain over 1000 entries
        $jobs[] = new ProcessData($data);
    }
    
    Bus::batch($jobs)
        ->onQueue(Queue::getFreeQueue())
        ->dispatch();
    
    
    namespace App\Jobs;
    
    use Illuminate\Bus\Batch;
    use Illuminate\Bus\BatchRepository;
    
    class Queue
    {
        const QUEUES = [
            'queue-1',
            'queue-2',
            'queue-3',
            'queue-4',
            'queue-5',
        ];
    
        public static function getFreeQueue(): string
        {
            $queues = array_flip(self::QUEUES);
    
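            /** @var BatchRepository $repository */
            $repository = app(BatchRepository::class);

            // Only the 10 most recent batches are inspected.
            foreach ($repository->get(limit: 10, before: null) as $batch) {
                /** @var Batch $batch */
                if (!$batch->finished()) {
                    // The name passed to onQueue() is stored in the batch options;
                    // $batch->queue is a protected property and cannot be read here.
                    unset($queues[$batch->options['queue'] ?? '']);
                }
            }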
            $availableQueues = array_keys($queues);
            return $availableQueues[0] ?? self::QUEUES[0];
        }
    
    }
    

    And in horizon.php (importing App\Jobs\Queue at the top of the file):

    'supervisor' => [
        'connection' => 'redis',
        'queue' => Queue::QUEUES,
        'balance' => 'auto',
        ...
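The selection rule described in the solution (first queue without an unfinished batch, otherwise the first queue) can also be tried outside Laravel. The sketch below is a plain-PHP approximation: the function pickFreeQueue and the array shape used for batches are assumptions made for this example, standing in for the Batch objects returned by the batch repository.

```php
<?php

/**
 * Return the first queue that has no unfinished batch, or the first queue
 * overall when every queue is busy. Assumes $queues is non-empty.
 *
 * @param string[] $queues  candidate queue names, in priority order
 * @param array<int, array{queue: string, finished: bool}> $batches
 *        recent batches (hypothetical shape for this sketch)
 */
function pickFreeQueue(array $queues, array $batches): string
{
    // Collect the names of queues that still have work in flight.
    $busy = [];
    foreach ($batches as $batch) {
        if (!$batch['finished']) {
            $busy[$batch['queue']] = true;
        }
    }

    foreach ($queues as $queue) {
        if (!isset($busy[$queue])) {
            return $queue;
        }
    }

    // Every queue is busy: fall back to the first one.
    return $queues[0];
}

$queues = ['queue-1', 'queue-2', 'queue-3'];

// queue-1 is busy, queue-2 only has a finished batch, so queue-2 is picked.
echo pickFreeQueue($queues, [
    ['queue' => 'queue-1', 'finished' => false],
    ['queue' => 'queue-2', 'finished' => true],
]), "\n"; // queue-2
```

Note the fallback: once every queue has an unfinished batch, new batches all go to the first queue and wait behind whatever is already there.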