gstreamerqtgstreamer

QtGstreamer Appsink: hangs and slow/unusable samples


my goal is to create a simple custom sink able to receive data from a pipeline which should than be used for different applications (recording, broadcasting, internal buffering, etc.).

In my first attempt the idea is to retransmit a Http(s)/Udp/etc. streaming over Http again so i'm using souphttpsrc, a queue and QHttp to serve the data to one or more clients.

The applications seems to work since i can start the pipeline with my custom sink (which by default just ignores any sample until at least one client is connected), so i simply copy the samples in the client response.

What's really strange is that the speed of the download is really really really slower (5-10 KB/s instead of 2-300KB/s, as expected) than the incoming streaming speed and moreover the data seems to be completely unusable. I've tried to insert a decodebin in the pipeline before the sink and the speed reached 33 MBit/s, so i would exclude performance issues caused by the Queued invocation of the method in the main thread, responsible for sending the actual sample over the network; even in this case the data seem just garbage.

In second place if i add a tee (or multiqueue) to run in parallel an autovideosink and/or an autoaudiosink, the pipeline gets stucked on the startup (this doesn't happen just with two autosink) and again the defect is the same if i try to link just two custom appsink to the tee.

Debugging the code seems that newSample() is never called when the pipeline hangs even if the last message received on the bus is the playing state change of the custom sink.

Thank you!

Here the code:

QGst::FlowReturn MultiHttpSink::newSample() {

        QGst::SamplePtr sample = pullSample();
        char data[sample->buffer()->size()];
        qDebug() << "New Sample: size " << sample->buffer()->size();
        qDebug() << sample->buffer()->duration().toTime();
        qDebug() << sample->buffer()->presentationTimeStamp().toTime();

        sample->buffer()->extract(0, &data, sample->buffer()->size());
        for (auto it = resources.begin(); it != resources.end(); it++)
            QMetaObject::invokeMethod(&*server, "writeToHttpResource", Qt::QueuedConnection, Q_ARG(QPointer<qhttp::server::QHttpResponse>, *it), Q_ARG(QByteArray, data));

        return QGst::FlowOk;
    }


    void Server::setupServer() {
        server = new QHttpServer(app);
        server->listen(QHostAddress::Any, 8080, [&](QHttpRequest* req, QHttpResponse * res) {

            res->setStatusCode(qhttp::ESTATUS_OK); // http status 200
            res->addHeader("Content-Type", "video/mp2t");

            m_sink.addAudience(res);
        });

        if (!server->isListening()) {
            fprintf(stderr, "failed. can not listen at port 8080!\n");
            throw std::exception();
        }

    void Server::setupPipeline() {

        /* source pipeline */
        QString pipe1Descr = QString(
                            "souphttpsrc location=\"%1\" ! "
                "queue ! tee name=splitter "
                "splitter.! decodebin ! autoaudiosink "
                "splitter.! decodebin ! autovideosink "
                "splitter.! appsink name=\"mysink\" "
                            ).arg("URL");
        pipeline1 = QGst::Parse::launch(pipe1Descr).dynamicCast<QGst::Pipeline>();
        m_sink.setElement(pipeline1->getElementByName("mysink"));

        QGlib::connect(pipeline1->bus(), "message", this, &Server::onBusMessage);
        pipeline1->bus()->addSignalWatch();
        /* start playing */
        pipeline1->setState(QGst::StatePlaying);

    }



    QString stateString(QGst::State state) {
        switch (state) {
            case 0:
                return "Void Pending";
            case 1:
                return "Null";
            case 2:
                return "Ready";
            case 3:
                return "Playing";
            case 4:
                return "Paused";
        }
        return "";
    }

    QString streamStateString(QGst::StreamStatusType type) {
        switch (type) {
            case 0:
                return "Create";
            case 1:
                return "Enter";
            case 2:
                return "Leave";
            case 3:
                return "Destroy";
            case 4:
                return "Start";
            case 5:
                return "Pause";
            case 6:
                return "Stop";
        }
    }

    void Server::onBusMessage(const QGst::MessagePtr& message) {

        switch (message->type()) {
            case QGst::MessageEos:
                app->quit();
                break;
            case QGst::MessageError:
                qCritical() << message.staticCast<QGst::ErrorMessage>()->error();
                qCritical() << message.staticCast<QGst::ErrorMessage>()->debugMessage();
                break;
            case QGst::MessageStateChanged:
            {
                QGlib::RefPointer<QGst::StateChangedMessage> msg = message.staticCast<QGst::StateChangedMessage>();
                qDebug() << msg->source()->name() <<": State changed from " << stateString(msg->oldState())
                        << " -> " << stateString(msg->newState()) << " Transitioning to " << stateString(msg->pendingState());
                if(msg->source()->name().compare("pipeline0")==0 ){
                    if( msg->newState() == QGst::StatePaused ){
                        need_reset = true;
                        playing = false;
                    } else { 
                        need_reset = false;
                        if( msg->newState() == QGst::StatePlaying ){
                            playing = true;
                        }
                    }
                }
                break;
            }
            case QGst::MessageStreamStatus:
            {
                QGlib::RefPointer<QGst::StreamStatusMessage> msg = message.staticCast<QGst::StreamStatusMessage>();
                qDebug() << "Stream Status: " << streamStateString(msg->statusType());
                break;
            }
            default:
                qDebug() << "Unhandles Bus Message Type: " << message->typeName();
                break;
        }
    }

    void Server::writeToHttpResource(QPointer<qhttp::server::QHttpResponse> res, QByteArray data) {
        qDebug() << "Writing data...";
        res->write(data);
    }

UPDATE

The debug log of gstreamer is reporting something useful...

    0:00:00.733339246 [335m 8248[00m      0x2747450 [37mDEBUG  [00m [00m         souphttpsrc gstsouphttpsrc.c:1430:gst_soup_http_src_chunk_allocator:<souphttpsrc0>[00m alloc 4096 bytes <= 18446744073709551615
    0:00:00.733350762 [335m 8248[00m      0x274a8f0 [37mDEBUG  [00m [00m                 tee gsttee.c:774:gst_tee_chain:<splitter>[00m received buffer 0x7f69d000fb80
    0:00:00.733353629 [335m 8248[00m      0x2747450 [37mDEBUG  [00m [00;01;34m          GST_MEMORY gstmemory.c:138:gst_memory_init:[00m new memory 0x7f69d0015120, maxsize:4103 offset:0 size:4096
    0:00:00.733361430 [335m 8248[00m      0x274a8f0 [37mDEBUG  [00m [00;01;35m      GST_SCHEDULING gstpad.c:4174:gst_pad_chain_data_unchecked:<mysink2:sink>[00m calling chainfunction &gst_base_sink_chain with buffer buffer: 0x7f69d000fb80, pts 99:99:99.999999999, dts 99:99:99.999999999, dur 99:99:99.999999999, size 1430, offset 15582, offset_end none, flags 0x0

The size of the incoming network packets is 4096 but the size of the buffer is 1430 most of the time... investigating.... i think i should access the buffer in a different way to obtain access to raw data.

The size of the network packets is 1430, 4096 seems to be the buffer area...

    0:00:29.316301105 [335m 8248[00m      0x2747450 [37mDEBUG  [00m [00m         souphttpsrc gstsouphttpsrc.c:1482:gst_soup_http_src_got_chunk_cb:<souphttpsrc0>[00m got chunk of 1430 bytes

Why the transfer is slow and pointless remains a mistery for now. Why it hangs too!

UPDATE 2

I've also tried this way, same results (and same sizes reported):

    QGst::FlowReturn MultiHttpSink::newSample() {

        QGst::SamplePtr sample = pullSample();
        QGst::BufferPtr buffer = sample->buffer();

        qDebug() << "New Sample: size " << buffer->size();

        QGst::MapInfo map;

        if (!buffer->map(map, QGst::MapRead))
            return QGst::FlowError;

        qDebug() << "New Buffer Map: size " << map.size();


        for (auto it = resources.begin(); it != resources.end(); it++)
            QMetaObject::invokeMethod(&*server, "writeToHttpResource", Qt::QueuedConnection, Q_ARG(QPointer<qhttp::server::QHttpResponse>, *it), Q_ARG(QByteArray, (char *)map.data()));

        buffer->unmap(map);

        return QGst::FlowOk;
    }

Here the output of the last version, same as before:

... ...
New Sample: size  1430
New Buffer Map: size  1430
QTime("23:34:33.709")
QTime("23:34:33.709")
Writing data...
New Sample: size  4096
New Buffer Map: size  4096
QTime("23:34:33.709")
QTime("23:34:33.709")
Writing data...
... ...

UPDATE 3

This is the log between each chunk of data:

    New Sample: size  1430
    New Buffer Map: size  1430
    QTime("23:34:33.709")
    QTime("23:34:33.709")
    0:04:28.221638796 [335m28768[00m       0xd8a8f0 [37mDEBUG  [00m [00m            basesink gstbasesink.c:3566:gst_base_sink_chain_unlocked:<mysink2>[00m object unref after render 0x7f894400f850
    0:04:28.221653137 [335m28768[00m       0xd8a8f0 [37mDEBUG  [00m [00;01;35m      GST_SCHEDULING gstpad.c:4180:gst_pad_chain_data_unchecked:<mysink2:sink>[00m called chainfunction &gst_base_sink_chain with buffer 0x7f894400f850, returned ok
    0:04:28.221667131 [335m28768[00m       0xd8a8f0 [37mDEBUG  [00m [00m                 tee gsttee.c:778:gst_tee_chain:<splitter>[00m handled buffer ok
    0:04:28.221676164 [335m28768[00m       0xd8a8f0 [37mDEBUG  [00m [00;01;35m      GST_SCHEDULING gstpad.c:4180:gst_pad_chain_data_unchecked:<splitter:sink>[00m called chainfunction &gst_tee_chain with buffer 0x7f894400f850, returned ok
    0:04:28.221687738 [335m28768[00m       0xd8a8f0 [37mDEBUG  [00m [00m      queue_dataflow gstqueue.c:1482:gst_queue_loop:<queue0>[00m queue is empty
    0:04:28.222861204 [335m28768[00m       0xd87450 [37mDEBUG  [00m [00m         souphttpsrc gstsouphttpsrc.c:1430:gst_soup_http_src_chunk_allocator:<souphttpsrc0>[00m alloc 4096 bytes <= 18446744073709551615
    0:04:28.222896720 [335m28768[00m       0xd87450 [37mDEBUG  [00m [00;01;34m          GST_MEMORY gstmemory.c:138:gst_memory_init:[00m new memory 0x7f8944012fe0, maxsize:4103 offset:0 size:4096
    0:04:28.222926663 [335m28768[00m       0xd87450 [37mDEBUG  [00m [00m         souphttpsrc gstsouphttpsrc.c:1482:gst_soup_http_src_got_chunk_cb:<souphttpsrc0>[00m got chunk of 1430 bytes
    0:04:28.222956032 [335m28768[00m       0xd87450 [37mDEBUG  [00m [00m             basesrc gstbasesrc.c:2316:gst_base_src_do_sync:<souphttpsrc0>[00m no sync needed
    0:04:28.222968263 [335m28768[00m       0xd87450 [37mDEBUG  [00m [00m             basesrc gstbasesrc.c:2520:gst_base_src_get_range:<souphttpsrc0>[00m buffer ok
    0:04:28.222978663 [335m28768[00m       0xd87450 [37mDEBUG  [00m [00;01;35m      GST_SCHEDULING gstpad.c:4174:gst_pad_chain_data_unchecked:<queue0:sink>[00m calling chainfunction &gst_queue_chain with buffer buffer: 0x7f894400f630, pts 99:99:99.999999999, dts 99:99:99.999999999, dur 99:99:99.999999999, size 1430, offset 20789192, offset_end none, flags 0x0
    0:04:28.223004500 [335m28768[00m       0xd87450 [37mDEBUG  [00m [00;01;35m      GST_SCHEDULING gstpad.c:4180:gst_pad_chain_data_unchecked:<queue0:sink>[00m called chainfunction &gst_queue_chain with buffer 0x7f894400f630, returned ok
    0:04:28.223013700 [335m28768[00m       0xd8a8f0 [37mDEBUG  [00m [00m      queue_dataflow gstqueue.c:1494:gst_queue_loop:<queue0>[00m queue is not empty
    0:04:28.223031507 [335m28768[00m       0xd87450 [37mDEBUG  [00m [00m             basesrc gstbasesrc.c:2355:gst_base_src_update_length:<souphttpsrc0>[00m reading offset 20790622, length 4096, size -1, segment.stop -1, maxsize -1
    0:04:28.223066379 [335m28768[00m       0xd87450 [37mDEBUG  [00m [00m             basesrc gstbasesrc.c:2456:gst_base_src_get_range:<souphttpsrc0>[00m calling create offset 20790622 length 4096, time 0
    0:04:28.223056071 [335m28768[00m       0xd8a8f0 [37mDEBUG  [00m [00;01;35m      GST_SCHEDULING gstpad.c:4174:gst_pad_chain_data_unchecked:<splitter:sink>[00m calling chainfunction &gst_tee_chain with buffer buffer: 0x7f894400f630, pts 99:99:99.999999999, dts 99:99:99.999999999, dur 99:99:99.999999999, size 1430, offset 20789192, offset_end none, flags 0x0
    0:04:28.223111831 [335m28768[00m       0xd8a8f0 [37mDEBUG  [00m [00m                 tee gsttee.c:774:gst_tee_chain:<splitter>[00m received buffer 0x7f894400f630
    0:04:28.223121952 [335m28768[00m       0xd87450 [37mDEBUG  [00m [00m         souphttpsrc gstsouphttpsrc.c:1430:gst_soup_http_src_chunk_allocator:<souphttpsrc0>[00m alloc 4096 bytes <= 18446744073709551615
    0:04:28.223139071 [335m28768[00m       0xd8a8f0 [37mDEBUG  [00m [00;01;35m      GST_SCHEDULING gstpad.c:4174:gst_pad_chain_data_unchecked:<mysink2:sink>[00m calling chainfunction &gst_base_sink_chain with buffer buffer: 0x7f894400f630, pts 99:99:99.999999999, dts 99:99:99.999999999, dur 99:99:99.999999999, size 1430, offset 20789192, offset_end none, flags 0x0
    0:04:28.223152343 [335m28768[00m       0xd87450 [37mDEBUG  [00m [00;01;34m          GST_MEMORY gstmemory.c:138:gst_memory_init:[00m new memory 0x7f8944015120, maxsize:4103 offset:0 size:4096
    0:04:28.223179317 [335m28768[00m       0xd8a8f0 [37mDEBUG  [00m [00m            basesink gstbasesink.c:3409:gst_base_sink_chain_unlocked:<mysink2>[00m got times start: 99:99:99.999999999, end: 99:99:99.999999999
    0:04:28.223208511 [335m28768[00m       0xd8a8f0 [37mDEBUG  [00m [00m            basesink gstbasesink.c:1958:gst_base_sink_get_sync_times:<mysink2>[00m got times start: 99:99:99.999999999, stop: 99:99:99.999999999, do_sync 0
    0:04:28.223222869 [335m28768[00m       0xd87450 [37mDEBUG  [00m [00;01;34m          GST_MEMORY gstmemory.c:87:_gst_memory_free:[00m free memory 0x7f8944015120
    0:04:28.223238437 [335m28768[00m       0xd8a8f0 [37mDEBUG  [00m [00;04m             default gstsegment.c:731:gst_segment_to_running_time_full:[00m invalid position (-1)
    0:04:28.223281513 [335m28768[00m       0xd8a8f0 [37mDEBUG  [00m [00;04m             default gstsegment.c:731:gst_segment_to_running_time_full:[00m invalid position (-1)
    0:04:28.223298205 [335m28768[00m       0xd8a8f0 [37mDEBUG  [00m [00m            basesink gstbasesink.c:3520:gst_base_sink_chain_unlocked:<mysink2>[00m rendering object 0x7f894400f630
    0:04:28.223312428 [335m28768[00m       0xd8a8f0 [37mDEBUG  [00m [00m            basesink gstbasesink.c:946:gst_base_sink_set_last_buffer_unlocked:<mysink2>[00m setting last buffer to 0x7f894400f630
    0:04:28.223321131 [335m28768[00m       0xd8a8f0 [37mDEBUG  [00m [00;01;34m          GST_MEMORY gstmemory.c:87:_gst_memory_free:[00m free memory 0x7f8944014080
    0:04:28.223344396 [335m28768[00m       0xd8a8f0 [37mDEBUG  [00m [00;01;34m            GST_CAPS gstpad.c:2641:gst_pad_has_current_caps:<mysink2:sink>[00m check current pad caps (NULL)
    0:04:28.223355939 [335m28768[00m       0xd8a8f0 [37mDEBUG  [00m [00m             appsink gstappsink.c:760:gst_app_sink_render:<mysink2>[00m pushing render buffer 0x7f894400f630 on queue (0)
    0:04:28.223369213 [335m28768[00m       0xd8a8f0 [37mDEBUG  [00m [00m             appsink gstappsink.c:1286:gst_app_sink_pull_sample:<mysink2>[00m trying to grab a buffer
    0:04:28.223377762 [335m28768[00m       0xd8a8f0 [37mDEBUG  [00m [00m             appsink gstappsink.c:706:dequeue_buffer:<mysink2>[00m dequeued buffer 0x7f894400f630
    0:04:28.223386131 [335m28768[00m       0xd8a8f0 [37mDEBUG  [00m [00m             appsink gstappsink.c:1301:gst_app_sink_pull_sample:<mysink2>[00m we have a buffer 0x7f894400f630
    New Sample: size  1430
    New Buffer Map: size  1430
    QTime("23:34:33.709")
    QTime("23:34:33.709")

Solution

  • I've finally found the misterious bug!

    QMetaObject::invokeMethod(&*server, "writeToHttpResource", Qt::QueuedConnection, Q_ARG(QPointer<qhttp::server::QHttpResponse>, *it), Q_ARG(QByteArray, (char *)map.data()));
    

    Even if this code compiles it runs badly since this:

    Q_ARG(QByteArray, (char *)map.data())
    

    Using QByteArray(char *) is wrong in this case since we are not initializing it from a string but rather from a chunk of raw data. Written as it was the chunk was parsed all the time looking for the new line combination, cutting the chunk of data randomly.

    To avoid this the constructor to be used is QByteArray(char * ptr, int size).

    This is the right version of the method:

    QMetaObject::invokeMethod(&*server, "writeToHttpResource", Qt::QueuedConnection, Q_ARG(QPointer<GssClientRequest>, *it), Q_ARG(QByteArray, QByteArray((char *) map.data(), map.size())));
    

    Wasn't a so hard bug to defeat in the end ;)

    Thanks for the support!