I have the following two pipelines to transmit opus encoded audio from server to client:
The server:
gst-launch-1.0 -v alsasrc ! audioconvert ! audioresample ! audio/x-raw, rate=16000, channels=1, format=S16LE ! opusenc ! rtpopuspay ! udpsink host=0.0.0.0 port=4000
The client:
gst-launch-1.0 udpsrc port=4000 ! application/x-rtp,payload=96,encoding-name=OPUS ! rtpopusdepay ! opusdec ! autoaudiosink
I try to create a custom GstElement
based plugin to replace rtpopusdepay
in the client side with a hand-crafted one (to be backward compatible with an existing server implementation that doesn't use rtpopuspay
but uses a hand-crafted byte-format to wrap the opus encoded data).
To test the concept I would like to use the pipelines above, but replace the client side with:
GST_PLUGIN_PATH=. gst-launch-1.0 udpsrc port=4000 ! simpacketdepay ! opusdec ! autoaudiosink
Where simpacketdepay
is the plugin I created. The plugin is quite simple, it has fixed caps (ANY for its sink and "audio/x-opus" for its src). In its chain function I simply remove the payload rtpopuspay
adds to the encoded opus stream (first 96 bits) and push the data forward.
The full code:
#include "gstsimpacketdepay.h"
#include <stdio.h>
#include <string.h>
#include <gst/gst.h>
#include <gst/gstcaps.h>
GST_DEBUG_CATEGORY_STATIC (gst_simpacketdepay_debug);
#define GST_CAT_DEFAULT gst_simpacketdepay_debug
/* Enum to identify properties */
enum
{
PROP_0
};
static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE(
"sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS("ANY")
);
static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE (
"src",
GST_PAD_SRC,
GST_PAD_ALWAYS,
GST_STATIC_CAPS("audio/x-opus, rate=16000, channels=1, channel-mapping-family=0, stream-count=1, coupled-count=0")
);
/* Define our element type. Standard GObject/GStreamer boilerplate stuff */
#define gst_simpacketdepay_parent_class parent_class
G_DEFINE_TYPE(GstSimpacketdepay, gst_simpacketdepay, GST_TYPE_ELEMENT);
static GstFlowReturn gst_simpacketdepay_chain (GstPad *pad, GstObject *parent, GstBuffer *buf);
static void gst_simpacketdepay_class_init (GstSimpacketdepayClass * klass)
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
gstelement_class = (GstElementClass *) klass;
/* Set sink and src pad capabilities */
gst_element_class_add_pad_template (gstelement_class, gst_static_pad_template_get(&src_factory));
gst_element_class_add_pad_template (gstelement_class, gst_static_pad_template_get(&sink_factory));
/* Set metadata describing the element */
gst_element_class_set_details_simple (
gstelement_class,
"simpacketdepay plugin",
"simpacketdepay plugin",
"Sim Packet depay",
"Test"
);
}
static void gst_simpacketdepay_init (GstSimpacketdepay * simpacketdepay)
{
simpacketdepay->sinkpad = gst_pad_new_from_static_template (&sink_factory, "sink");
simpacketdepay->srcpad = gst_pad_new_from_static_template (&src_factory, "src");
gst_pad_use_fixed_caps(simpacketdepay->sinkpad);
gst_pad_use_fixed_caps(simpacketdepay->srcpad);
gst_element_add_pad (GST_ELEMENT (simpacketdepay), simpacketdepay->sinkpad);
gst_element_add_pad (GST_ELEMENT (simpacketdepay), simpacketdepay->srcpad);
gst_pad_set_chain_function (simpacketdepay->sinkpad, gst_simpacketdepay_chain);
}
static GstFlowReturn gst_simpacketdepay_chain (GstPad *pad, GstObject *parent, GstBuffer *inBuf)
{
GstSimpacketdepay *filter = GST_SIMPACKETDEPAY(parent);
GstMapInfo info;
gst_buffer_map(inBuf, &info, GST_MAP_READ);
const size_t inSize = info.size;
printf("Incoming size %lu\n", info.size);
gst_buffer_unmap(inBuf, &info);
GstBuffer* outBuf = gst_buffer_copy(inBuf);
GstMemory* const inMemory = gst_buffer_get_memory(inBuf, 0);
GstMemory* const outMemory = gst_memory_share(inMemory, 12, inSize - 12);
gst_buffer_remove_all_memory(outBuf);
gst_buffer_prepend_memory(outBuf, outMemory);
gst_buffer_map(outBuf, &info, GST_MAP_READ);
printf("Outgoing size: %lu\n", info.size);
fflush(stdout);
gst_buffer_unmap(outBuf, &info);
gst_buffer_unref (inBuf);
GstFlowReturn result = gst_pad_push (filter->srcpad, outBuf);
return result;
}
static gboolean simpacketdepay_plugin_init (GstPlugin * plugin)
{
GST_DEBUG_CATEGORY_INIT (gst_simpacketdepay_debug, "simpacketdepay", 0, "simpacketdepay");
return gst_element_register (plugin, "simpacketdepay", GST_RANK_NONE, GST_TYPE_SIMPACKETDEPAY);
}
#ifndef VERSION
#define VERSION "1.0.0"
#endif
#ifndef PACKAGE
#define PACKAGE "FIXME_package"
#endif
#ifndef PACKAGE_NAME
#define PACKAGE_NAME "FIXME_package_name"
#endif
#ifndef GST_PACKAGE_ORIGIN
#define GST_PACKAGE_ORIGIN "http://FIXME.org/"
#endif
GST_PLUGIN_DEFINE (
GST_VERSION_MAJOR,
GST_VERSION_MINOR,
simpacketdepay,
"FIXME plugin description",
simpacketdepay_plugin_init,
VERSION,
"LGPL",
PACKAGE_NAME,
GST_PACKAGE_ORIGIN
)
The negotiations and everything goes well, until I push the first buffer to the source pad from gst_simpacketdepay_chain
with GstFlowReturn result = gst_pad_push (filter->srcpad, outBuf);
Then I get the following error (pasted the detailed debug log here)
0:00:00.510871708 42302 0x55fbd0c44000 LOG audiodecoder gstaudiodecoder.c:2034:gst_audio_decoder_chain:<opusdec0> received buffer of size 160 with ts 0:00:00.006492658, duration 99:99:99.999999999
0:00:00.510877845 42302 0x55fbd0c44000 WARN audiodecoder gstaudiodecoder.c:2084:gst_audio_decoder_chain:<opusdec0> error: decoder not initialized
0:00:00.510882963 42302 0x55fbd0c44000 DEBUG GST_MESSAGE gstelement.c:2110:gst_element_message_full_with_details:<opusdec0> start
0:00:00.510896592 42302 0x55fbd0c44000 INFO GST_ERROR_SYSTEM gstelement.c:2140:gst_element_message_full_with_details:<opusdec0> posting message: GStreamer error: negotiation problem.
0:00:00.510910301 42302 0x55fbd0c44000 LOG GST_MESSAGE gstmessage.c:303:gst_message_new_custom: source opusdec0: creating new message 0x7f519c002910 error
0:00:00.510919198 42302 0x55fbd0c44000 WARN structure gststructure.c:1861:priv_gst_structure_append_to_gstring: No value transform to serialize field 'gerror' of type 'GError'
0:00:00.510929043 42302 0x55fbd0c44000 DEBUG GST_BUS gstbus.c:315:gst_bus_post:<bus1> [msg 0x7f519c002910] posting on bus error message: 0x7f519c002910, time 99:99:99.999999999, seq-num 43, element 'opusdec0', GstMessageError, gerror=(GError)NULL, debug=(string)"gstaudiodecoder.c\(2084\):\ gst_audio_decoder_chain\ \(\):\ /GstPipeline:pipeline0/GstOpusDec:opusdec0:\012decoder\ not\ initialized";
0:00:00.510937098 42302 0x55fbd0c44000 DEBUG bin gstbin.c:3718:gst_bin_handle_message_func:<pipeline0> [msg 0x7f519c002910] handling child opusdec0 message of type error
0:00:00.510942210 42302 0x55fbd0c44000 DEBUG bin gstbin.c:3727:gst_bin_handle_message_func:<pipeline0> got ERROR message, unlocking state change
0:00:00.510947151 42302 0x55fbd0c44000 DEBUG bin gstbin.c:4065:gst_bin_handle_message_func:<pipeline0> posting message upward
0:00:00.510955219 42302 0x55fbd0c44000 WARN structure gststructure.c:1861:priv_gst_structure_append_to_gstring: No value transform to serialize field 'gerror' of type 'GError'
0:00:00.510962328 42302 0x55fbd0c44000 DEBUG GST_BUS gstbus.c:315:gst_bus_post:<bus2> [msg 0x7f519c002910] posting on bus error message: 0x7f519c002910, time 99:99:99.999999999, seq-num 43, element 'opusdec0', GstMessageError, gerror=(GError)NULL, debug=(string)"gstaudiodecoder.c\(2084\):\ gst_audio_decoder_chain\ \(\):\ /GstPipeline:pipeline0/GstOpusDec:opusdec0:\012decoder\ not\ initialized";
<opusdec0> error: decoder not initialized
? Do I need to do something special to initialize the opus decoder? What step do I miss?
I was able to solve the issue. When the plugin element enters playing state we should push a gst_event_new_caps
event to the source pad. Even with fixed caps... I haven't found anything in the documentation that can explain this requirement.
So I added the following state change handler and the pipeline started to work:
static GstStateChangeReturn gst_simpacketdepay_change_state (GstElement *element, GstStateChange transition)
{
const GstStateChangeReturn result = GST_ELEMENT_CLASS(parent_class)->change_state (element, transition);
if (result == GST_STATE_CHANGE_FAILURE) {
return result;
}
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_PLAYING: {
GstSimpacketdepay *filter = GST_SIMPACKETDEPAY(element);
gst_pad_push_event(filter->srcpad, gst_event_new_caps(gst_pad_template_get_caps(gst_static_pad_template_get(&src_factory))));
} break;
default:
break;
}
return result;
}
I'm sad to see how underdocumented this part of GStreamer is.