php laravel queue mqtt

Running an MQTT publish command in a Laravel queue fails with the database driver but works with sync


I am working on an app that has a background job which publishes a command to an MQTT broker. When the queue connection is sync, it works, but when it is database, only 1 out of 2 commands (separately dispatched jobs) is sent. There are no errors in the Laravel log or the Supervisor log.

Been googling various things all day and trying different things but nothing worked.

Here is my MQTT service class:

<?php

namespace App\Services;

use Illuminate\Contracts\Container\BindingResolutionException;
use PhpMqtt\Client\Exceptions\ConfigurationInvalidException;
use PhpMqtt\Client\Exceptions\ConnectingToBrokerFailedException;
use PhpMqtt\Client\Exceptions\ClientNotConnectedToBrokerException;
use PhpMqtt\Client\Exceptions\RepositoryException;
use PhpMqtt\Client\Exceptions\PendingMessageAlreadyExistsException;
use PhpMqtt\Client\Exceptions\DataTransferException;
use PhpMqtt\Client\MqttClient;
use Psr\Container\NotFoundExceptionInterface;
use Psr\Container\ContainerExceptionInterface;
use App\Models\Devices\Device;

class MqttService {

    public function sendCommand(Device $device, string $command, int $qos = 1)
    {
        $host = config('mqtt.host');
        $port = config('mqtt.port');
        $clientId = config('mqtt.client_id');


        $mqtt = new MqttClient($host, $port, $clientId);
        $mqtt->connect();

        $userId = $device->user->id;
        $deviceId = $device->id;
        $uniqueId = $device->unique_id;
        $topic = "v2/$userId/$deviceId/commands";

        if ($device->recognized_by_unique_id) {
            $topic = "v3/$uniqueId/commands";
        }

        $mqtt->publish($topic, $command, $qos);
        $mqtt->disconnect();
    }

    /**
     * Publishes error message for a specific device and user on the Mqtt broker.
     * 
     * @param string|null $userId the user id - first subtopic
     * @param string|null $deviceId the device id - last sub topic
     * @param string|null $uniqueId the device unique id - if it is using unique id instead of user id and device id
     * @param string|null $errorAsJson the json error message to publish on the mqtt broker
     * 
     * @return void 
     * 
     * @throws BindingResolutionException 
     * @throws NotFoundExceptionInterface 
     * @throws ContainerExceptionInterface 
     * @throws ConfigurationInvalidException 
     * @throws ConnectingToBrokerFailedException 
     * @throws ClientNotConnectedToBrokerException 
     * @throws RepositoryException 
     * @throws PendingMessageAlreadyExistsException 
     * @throws DataTransferException 
     */
    public static function sendError(?string $userId = null, ?string $deviceId = null, ?string $uniqueId = null, string $errorAsJson) 
    {
        $host = config('mqtt.host');
        $port = config('mqtt.port');
        $clientId = config('mqtt.client_id');

        if ($uniqueId) {
            $topic = "errors/$uniqueId";
        } else {
            $topic = "errors/$userId/$deviceId";
        }

        $mqtt = new MqttClient($host, $port, $clientId);
        $mqtt->connect();
        $mqtt->publish($topic, $errorAsJson, 0);
        $mqtt->disconnect();
    }

}

Here is my job:

<?php

namespace App\Jobs;

use App\Services\CommandService;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use App\Models\Devices\Device;

class SendCommand implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     */
    public function __construct(public Device $device, public string $command) {}

    /**
     * Execute the job.
     */
    public function handle(): void
    {
        info("Before: {$this->device->unique_id}; {$this->command}");
        CommandService::sendCommands($this->device, $this->command);
        info("After: {$this->device->unique_id}; {$this->command}");
    }
}

Here is also the relevant part of the CommandService class:

public static function sendCommands(Device $device, string $command)
{
    $mqtt = new MqttService;

    $mqtt->sendCommand($device, $command);

    return true;
}

I added the logging calls, and each message is logged twice. The only oddity is that it logs the two "Before:" lines first and then the two "After:" lines, though when I think about it that should be fine, as the jobs are running in parallel.

Which device's command actually gets published seems to be random: sometimes it is device 1, sometimes device 2. I have had cases where both are published, but that is rarer.

I am lost, any ideas?


Solution

  • As per the comments this is likely to be a combination of things:

    This issue contains a detailed response to someone experiencing a similar problem. Potential solutions include:
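One candidate that fits the symptoms, offered here as an assumption and not as a quote from the linked issue: both methods of MqttService connect with the same config('mqtt.client_id'), and the MQTT specification requires a broker to disconnect an existing client when a new one connects with the same client ID. With the sync driver the jobs run one after another, so the IDs never collide; with parallel database workers they can. A minimal sketch of a per-connection client ID follows. The helper name uniqueMqttClientId is hypothetical and not part of php-mqtt/client or the original code.

```php
<?php

/**
 * Hypothetical helper (not part of php-mqtt/client or the original code):
 * builds a client ID that differs per worker process and per connection.
 */
function uniqueMqttClientId(string $baseId): string
{
    // getmypid() separates concurrent queue workers; the random suffix
    // separates repeated connections made by the same worker.
    return sprintf('%s-%d-%s', $baseId, getmypid(), bin2hex(random_bytes(4)));
}

// Assumed usage inside MqttService::sendCommand() and sendError():
// $mqtt = new MqttClient($host, $port, uniqueMqttClientId($clientId));
```

Keep the base ID short if the broker enforces the 23-character client ID limit from older MQTT versions; many brokers accept longer IDs, but that depends on the broker.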