symfonyqueuesymfony-messenger

Symfony5 messenger, parallel queues for same message handlers


Symfony messenger:

https://symfony.com/doc/current/messenger.html

Problem:

Pool#1 = (user1 creates a Job, Job is splitted to 10 messenger's Message)
Pool#2 = (user2 creates a Job, Job is splitted to 10 messenger's Message)
...
Pool#100 = (user100 creates a Job, Job is splitted to 10 messenger's Message)

Pool#100 will not be executed, until all previous Pools will not be finished.

Goal:

I need parallel queues, that all Pools will be run separately, so each Pool will have personal queue.

Code example:

config/packages/messenger.yaml
framework:
    messenger:
        transports:
            sync: "%env(MESSENGER_TRANSPORT_DSN)%"
        routing:
            'App\Message\Job': sync
src/Message/Job.php
<?php

namespace App\Message;

class Job
{
    private $content;

    public function __construct(string $content)
    {
        $this->content = $content;
    }

    public function getContent(): string
    {
        return $this->content;
    }
}
src/MessageHandler/JobHandler.php
<?php

namespace App\MessageHandler;

use App\Message\Job;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class JobHandler implements MessageHandlerInterface
{
    public function __construct()
    {}

    public function __invoke(Job $message)
    {
        $params = json_decode($message->getContent(), true);
        dump($params);
    }
}
src/Controller/JobController.php
<?php

namespace App\Controller;

use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Routing\Annotation\Route;

/**
 * @Route("/job")
 */
class JobController extends AbstractController
{
    /**
     * @Route("/create", name="app_job_create")
     * @param Request $request
     * @param MessageBusInterface $bus
     * @return JsonResponse
     */
    public function create(Request $request, MessageBusInterface $bus): JsonResponse
    {
        // ...
        $entityId = $entity->getId();
        // ...

        for ($i = 0; $i < 10; $i++) {
            $params['entityId'] = $entityId;
            $params['counter'] = $i;
            $bus->dispatch(new Job(json_encode($params)));
        }

        return new JsonResponse([]);
    }
}

More info:

I would like to continue use this, but can't find easiest solution to pass some unique queue name or id, that then say to worker that he must handle only this Pool of Messages.
I found custom transports https://symfony.com/doc/current/messenger/custom-transport.html, but I'm not sure that it may help. At least I think that only custom transport is not enough.
And I read about Actor models https://www.brianstorti.com/the-actor-model/ but I would like to use Messenger+Redis only, if possible.

Probably here is no solution and this messenger can't handle parallel queues yet. Anyway I'm glad for any help.
Thank you!


Solution

  • I ended up resolving this by using dynamic queue names.
    Unfortunately, I was forced to refuse the messenger.

    Also, the SDK for RabbitMQ https://github.com/php-amqplib/RabbitMqBundle is currently not supported for symfony 5 (June 2020), I'm not sure on 100%, but I used it on the 3rd version, but I couldn’t put it on the 5th. So I used another one.

    Here's a good quick start guide https://blog.programster.org/rabbitmq-job-queue-with-php
    From the example, I changed RABBITMQ_QUEUE_NAME to a dynamic name and it worked fine.

    Then everything is as usual, raise RabbitMQ and configure the supervisor https://symfony.com/doc/current/messenger.html#supervisor-configuration

    I will be glad if this helps someone, thanks!