Tags: c++, c, azure, azure-iot-hub, libmosquitto

Libmosquitto publish doesn't deliver all messages to Azure IoT Hub


I'm trying to publish more than 100 messages per second to the Azure IoT Hub built-in event hub, using the libmosquitto 1.6.8 library. I'm on the Free tier of Azure IoT Hub, and I know it has a throttle limit of 100 messages per second, but that is not what this issue is about: I have not been able to get even half of the messages through to Azure IoT Hub.

Basically, I have a multimap holding multiple values that need to be sent. The metric list:

std::multimap< const std::string, std::tuple< const std::string, const std::string, float> > calculatedMetricList;

I iterate through the multimap, construct an object payload from each value, and send it. This means the mosquitto_publish method is called many times.

Following is the code for publishing the messages:

void MosquittoClient::sendDataToUpstreamSystem(){

StatisticalMethod statisticalMethod;
int rc;

MosquittoClient pub_mosq(
    "<IoT Hub Name>.azure-devices.net",
    "<deviceID>", 
    "<username>", 
    "<Password>", 
    "devices/<deviceID>/messages/events/");

printf("Using MQTT to get data payload from host: %s and on port: %d.\r\n", pub_mosq.get_host(), pub_mosq.get_port());
// init the mosquitto lib
mosquitto_lib_init();

// create the mosquito object
struct mosquitto* mosq = mosquitto_new(pub_mosq.get_deviceID(), false, NULL);

// add callback functions
mosquitto_connect_callback_set(mosq, MosquittoClient::connect_callback);
mosquitto_publish_callback_set(mosq, MosquittoClient::publish_callback);
mosquitto_message_callback_set(mosq, MosquittoClient::on_message);
mosquitto_disconnect_callback_set(mosq, MosquittoClient::on_disconnect_callback);

// set mosquitto username, password and options
mosquitto_username_pw_set(mosq, pub_mosq.get_userName(), pub_mosq.get_password());

// specify the certificate to use
std::ifstream infile(pub_mosq.get_certificate());
bool certExists = infile.good();
infile.close();

if (!certExists)
{
    printf("Warning: Could not find file '%s'! The mosquitto loop may fail.\r\n", pub_mosq.get_certificate());
}

printf("Using certificate: %s\r\n", pub_mosq.get_certificate());
mosquitto_tls_set(mosq, pub_mosq.get_certificate(), NULL, NULL, NULL, NULL);

// specify the mqtt version to use
int option = MQTT_PROTOCOL_V311; // stack variable; the original heap allocation was never freed
rc = mosquitto_opts_set(mosq, MOSQ_OPT_PROTOCOL_VERSION, &option);
if (rc != MOSQ_ERR_SUCCESS)
{
    rc = pub_mosq.mosquitto_error(rc, "Error: opts_set protocol version");
}
else
{
    printf("Setting up options OK\r\n");
}

// connect
printf("Connecting...\r\n");
rc = mosquitto_connect_async(mosq, pub_mosq.get_host(), pub_mosq.get_port(), 4);
if (rc != MOSQ_ERR_SUCCESS)
{
    rc = pub_mosq.mosquitto_error(rc, NULL);
}
else
{
    printf("Connect returned OK\r\n");

    rc = mosquitto_loop_start(mosq);

    if (rc != MOSQ_ERR_SUCCESS)
    {
        rc = pub_mosq.mosquitto_error(rc, NULL);
    }
    else
    {
        do
        {
            for (auto itr = Metrics::calculatedMetricList.begin(); itr != Metrics::calculatedMetricList.end(); itr++) { 
                int msgId = 0; // output parameter: mosquitto_publish() stores the assigned message ID here

                std::string test1= itr->first;
                std::string test2 = std::get<0>(itr->second);
                std::string test3= std::get<1>(itr->second); // metric type 
                float value = std::get<2>(itr->second); // value

                DataPayload objectPayload(
                    75162345,
                    496523,
                    test3,
                    value,
                    "test",
                    test1,
                    "test",
                    "test",
                    123,
                    213,
                    23
                );

                objectPayload.setPayload();
                std::string dataPayload = objectPayload.getPayload();

                //DEBUG
                std::cout << "dataPayload: " << dataPayload << std::endl;
                //DEBUG
                std::cout << "dataPayload Size: " << dataPayload.size() << std::endl;

                // once connected, we can publish (send) a Telemetry message
                printf("Publishing to topic: %s\r\n", pub_mosq.get_topic());

                rc = pub_mosq.publishToTopic(mosq, &msgId, dataPayload.size(), (char *)dataPayload.c_str());

                if (rc == MOSQ_ERR_SUCCESS)
                {
                    printf("Publish returned OK\r\n");
                }
                else 
                {               
                    rc = pub_mosq.mosquitto_error(rc, NULL);
                }
            } 

        } while (rc != MOSQ_ERR_SUCCESS);   
    }
}

mosquitto_loop_stop(mosq, true);
mosquitto_destroy(mosq);

mosquitto_lib_cleanup();
}

Publish method:

int MosquittoClient::publishToTopic(struct mosquitto *mosq, int *msgId, int sizeOfData, char *data)
{
    return mosquitto_publish(mosq, msgId, p_topic, sizeOfData, data, 1, true);
}

When running the program, every publish call returns OK according to the console, but only one or two messages appear on the Azure IoT Hub side.

The following image shows the IoT Hub monitoring; at that time only one message got through. [Image: Device Explorer Twin Monitoring]

I have tried many different solutions, but the program was still unable to publish all the messages. It looks like the publish method is waiting to complete the first message while the iteration moves on to the next message, causing it to be dropped. If that is the cause of the dropped messages, what is the best way to sequence the message sending? Otherwise, what else could be causing messages to be dropped?

Update

The problem was that the program wasn't waiting until the messages had been successfully published to the broker (Azure IoT Hub). You know a message has been successfully published to the broker when publish_callback is invoked.

void MosquittoClient::publish_callback(struct mosquitto* mosq, void* userdata, int mid)
{
    printf("Publish OK.\r\n");
}

The solution was to start the Mosquitto loop before the connection is established, and to sleep the thread before the loop-stop, destroy, and cleanup calls.

sleep(30);
mosquitto_loop_stop(mosq, true);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();

Solution

  • mosquitto_publish() is asynchronous: a return value of MOSQ_ERR_SUCCESS only means that the message has been handed over to the Mosquitto thread for sending. So at the moment you are enqueuing lots of messages and then letting your program terminate before it has had a chance to actually send the packets.

    You can use your MosquittoClient::publish_callback callback to check that all the messages have actually been sent before stopping the loop and terminating your program.
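
    One way to do that check, instead of a fixed sleep(30), is to count messages in flight and block until the count reaches zero. The following is only a minimal sketch using the C++ standard library; PublishTracker and its method names are hypothetical helpers invented for this illustration and are not part of libmosquitto.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical helper (not part of libmosquitto): counts messages that have
// been handed to mosquitto_publish() but not yet confirmed by the callback.
class PublishTracker {
public:
    // Call just BEFORE mosquitto_publish(), so the callback (which runs on
    // the Mosquitto network thread) can never get ahead of the count.
    void on_enqueued() {
        std::lock_guard<std::mutex> lock(mutex_);
        ++pending_;
    }

    // Call from the publish callback, or right away if mosquitto_publish()
    // returned an error and the message will therefore never be confirmed.
    void on_completed() {
        std::lock_guard<std::mutex> lock(mutex_);
        assert(pending_ > 0);
        --pending_;
        if (pending_ == 0) {
            drained_.notify_all();
        }
    }

    // Block until every enqueued message has completed or the timeout
    // expires. Returns true if nothing is pending any more.
    bool wait_until_drained(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        return drained_.wait_for(lock, timeout,
                                 [this] { return pending_ == 0; });
    }

    int pending() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return pending_;
    }

private:
    mutable std::mutex mutex_;
    std::condition_variable drained_;
    int pending_ = 0;
};
```

    To wire it up, you could pass a pointer to the tracker as the third argument of mosquitto_new(); libmosquitto hands that pointer back as the userdata argument of publish_callback, where you would cast it and call on_completed(). Then call wait_until_drained() with a generous timeout before mosquitto_loop_stop(), and treat a false return as "some messages were not confirmed".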