First of all, I would like to say that I'm really new to GStreamer and its capabilities, so pardon my ignorance if my understanding or implementation is wrong; I am still learning.
I would like to build a small streaming application in PYTHON or JAVA (as I am not proficient in C) with QOS integrated in it, especially for the packet drop count statistics, and RTP and RTCP seem the perfect match.
For this purpose I implemented a server:
#! /usr/bin/env python
import gi
import sys
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
#gst-launch -v rtpbin name=rtpbin audiotestsrc ! audioconvert ! alawenc ! rtppcmapay ! rtpbin.send_rtp_sink_0 \
# rtpbin.send_rtp_src_0 ! udpsink port=10000 host=xxx.xxx.xxx.xxx \
# rtpbin.send_rtcp_src_0 ! udpsink port=10001 host=xxx.xxx.xxx.xxx sync=false async=false \
# udpsrc port=10002 ! rtpbin.recv_rtcp_sink_0
# Host the RTP and RTCP packets are sent to.
DEST_HOST = '127.0.0.1'

# UDP ports: RTP and RTCP are sent on the *_SEND_* ports, RTCP is
# received on RTCP_RECV_PORT.
RTP_SEND_PORT = 5002
RTCP_SEND_PORT = 5003
RTCP_RECV_PORT = 5007

# Element factory names for the audio source, the encoder and the RTP payloader.
AUDIO_SRC = 'audiotestsrc'
AUDIO_ENC = 'alawenc'
AUDIO_PAY = 'rtppcmapay'
def _make_element(factory, name):
    """Create element *name* from *factory*; raise RuntimeError if it is unavailable."""
    element = Gst.ElementFactory.make(factory, name)
    if element is None:
        raise RuntimeError('could not create element %s (factory %s)'
                           % (name, factory))
    return element


def _link_elements(upstream, downstream):
    """Link two elements, raising RuntimeError instead of failing silently."""
    if not upstream.link(downstream):
        raise RuntimeError('could not link %s to %s'
                           % (upstream.get_name(), downstream.get_name()))


def _link_pads(srcpad, sinkpad):
    """Link two pads; raise RuntimeError if a pad is missing or the link is refused."""
    if srcpad is None or sinkpad is None:
        raise RuntimeError('cannot link pads: a requested pad is not available')
    lres = srcpad.link(sinkpad)
    if lres != Gst.PadLinkReturn.OK:
        raise RuntimeError('could not link pad %s to %s: %s'
                           % (srcpad.get_name(), sinkpad.get_name(), lres))


GObject.threads_init()
Gst.init(sys.argv)

# the pipeline to hold everything
pipeline = Gst.Pipeline.new('rtp_server')

# the audio capture and format conversion
audiosrc = _make_element(AUDIO_SRC, 'audiosrc')
audioconv = _make_element('audioconvert', 'audioconv')
audiores = _make_element('audioresample', 'audiores')

# the encoding and payloading
audioenc = _make_element(AUDIO_ENC, 'audioenc')
audiopay = _make_element(AUDIO_PAY, 'audiopay')

# add capture and payloading to the pipeline and link
pipeline.add(audiosrc)
pipeline.add(audioconv)
pipeline.add(audiores)
pipeline.add(audioenc)
pipeline.add(audiopay)
_link_elements(audiosrc, audioconv)
_link_elements(audioconv, audiores)
_link_elements(audiores, audioenc)
_link_elements(audioenc, audiopay)

# the rtpbin element
rtpbin = _make_element('rtpbin', 'rtpbin')
pipeline.add(rtpbin)

# the udp sinks and source we will use for RTP and RTCP
rtpsink = _make_element('udpsink', 'rtpsink')
rtpsink.set_property('port', RTP_SEND_PORT)
rtpsink.set_property('host', DEST_HOST)

rtcpsink = _make_element('udpsink', 'rtcpsink')
rtcpsink.set_property('port', RTCP_SEND_PORT)
rtcpsink.set_property('host', DEST_HOST)
# no need for synchronisation or preroll on the RTCP sink
rtcpsink.set_property('async', False)
rtcpsink.set_property('sync', False)

rtcpsrc = _make_element('udpsrc', 'rtcpsrc')
rtcpsrc.set_property('port', RTCP_RECV_PORT)

pipeline.add(rtpsink)
pipeline.add(rtcpsink)
pipeline.add(rtcpsrc)

# now link all to the rtpbin, start by getting an RTP sinkpad for session 0
# NOTE(review): newer GStreamer releases provide request_pad_simple() as the
# replacement for get_request_pad() - confirm against the targeted version.
_link_pads(audiopay.get_static_pad('src'),
           rtpbin.get_request_pad('send_rtp_sink_0'))

# get the RTP srcpad that was created when we requested the sinkpad above and
# link it to the rtpsink sinkpad
_link_pads(rtpbin.get_static_pad('send_rtp_src_0'),
           rtpsink.get_static_pad('sink'))

# get an RTCP srcpad for sending RTCP to the receiver
_link_pads(rtpbin.get_request_pad('send_rtcp_src_0'),
           rtcpsink.get_static_pad('sink'))

# we also want to receive RTCP, request an RTCP sinkpad for session 0 and
# link it to the srcpad of the udpsrc for RTCP
_link_pads(rtcpsrc.get_static_pad('src'),
           rtpbin.get_request_pad('recv_rtcp_sink_0'))

# we need to run a GLib main loop to get the messages
mainloop = GObject.MainLoop()
try:
    # set the pipeline to playing; a failed state change is reported instead
    # of leaving the main loop running with a dead pipeline
    if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
        raise RuntimeError('could not set the pipeline to PLAYING')
    mainloop.run()
finally:
    # always shut the pipeline down, also when the loop is interrupted
    pipeline.set_state(Gst.State.NULL)
and a client:
#! /usr/bin/env python
import gi
import sys
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
#
# A simple RTP receiver
#
# receives alaw encoded RTP audio on port 5002, RTCP is received on port 5003.
# the receiver RTCP reports are sent to port 5007
#
# .-------. .----------. .---------. .-------. .--------.
# RTP |udpsrc | | rtpbin | |pcmadepay| |alawdec| |alsasink|
# port=5002 | src->recv_rtp recv_rtp->sink src->sink src->sink |
# '-------' | | '---------' '-------' '--------'
# | |
# | | .-------.
# | | |udpsink| RTCP
# | send_rtcp->sink | port=5007
# .-------. | | '-------' sync=false
# RTCP |udpsrc | | | async=false
# port=5003 | src->recv_rtcp |
# '-------' '----------'
# Caps set on the RTP udpsrc so the incoming packets are typed as PCMA audio.
AUDIO_CAPS = (
    'application/x-rtp,'
    'media=(string)audio,'
    'clock-rate=(int)8000,'
    'encoding-name=(string)PCMA'
)

# Element factory names for depayloading, decoding and playback.
AUDIO_DEPAY = 'rtppcmadepay'
AUDIO_DEC = 'alawdec'
AUDIO_SINK = 'autoaudiosink'

# Host that the RTCP sink sends to.
DEST = '127.0.0.1'

# UDP ports: RTP and RTCP are received on the *_RECV_* ports, RTCP is sent
# to RTCP_SEND_PORT.
RTP_RECV_PORT = 5002
RTCP_RECV_PORT = 5003
RTCP_SEND_PORT = 5007
# Set up GObject threading support before any GStreamer call is made.
GObject.threads_init()
# Initialise GStreamer, handing it the command-line arguments.
Gst.init(sys.argv)
#gst-launch -v rtpbin name=rtpbin \
# udpsrc caps=$AUDIO_CAPS port=$RTP_RECV_PORT ! rtpbin.recv_rtp_sink_0 \
# rtpbin. ! rtppcmadepay ! alawdec ! audioconvert ! audioresample ! autoaudiosink \
# udpsrc port=$RTCP_RECV_PORT ! rtpbin.recv_rtcp_sink_0 \
# rtpbin.send_rtcp_src_0 ! udpsink port=$RTCP_SEND_PORT host=$DEST sync=false async=false
def pad_added_cb(rtpbin, new_pad, depay):
    """Link a pad that rtpbin has just added to the depayloader's sink pad.

    Connected to rtpbin's 'pad-added' signal.

    Args:
        rtpbin: the element that emitted the signal (unused).
        new_pad: the pad that was added.
        depay: the depayloader element whose 'sink' pad receives the stream.
    """
    sinkpad = depay.get_static_pad('sink')
    lres = new_pad.link(sinkpad)
    if lres != Gst.PadLinkReturn.OK:
        # Report the refused link instead of dropping the result silently;
        # this runs as a signal callback, so it only logs.
        sys.stderr.write('could not link pad %s to the depayloader: %s\n'
                         % (new_pad.get_name(), lres))
def _make_element(factory, name):
    """Create element *name* from *factory*; raise RuntimeError if it is unavailable."""
    element = Gst.ElementFactory.make(factory, name)
    if element is None:
        raise RuntimeError('could not create element %s (factory %s)'
                           % (name, factory))
    return element


def _link_elements(upstream, downstream):
    """Link two elements, raising RuntimeError instead of failing silently."""
    if not upstream.link(downstream):
        raise RuntimeError('could not link %s to %s'
                           % (upstream.get_name(), downstream.get_name()))


def _link_pads(srcpad, sinkpad):
    """Link two pads; raise RuntimeError if a pad is missing or the link is refused."""
    if srcpad is None or sinkpad is None:
        raise RuntimeError('cannot link pads: a requested pad is not available')
    lres = srcpad.link(sinkpad)
    if lres != Gst.PadLinkReturn.OK:
        raise RuntimeError('could not link pad %s to %s: %s'
                           % (srcpad.get_name(), sinkpad.get_name(), lres))


# the pipeline to hold everything
pipeline = Gst.Pipeline.new('rtp_client')

# the udp sources and sink we will use for RTP and RTCP
rtpsrc = _make_element('udpsrc', 'rtpsrc')
rtpsrc.set_property('port', RTP_RECV_PORT)
# we need to set caps on the udpsrc for the RTP data
caps = Gst.caps_from_string(AUDIO_CAPS)
rtpsrc.set_property('caps', caps)

rtcpsrc = _make_element('udpsrc', 'rtcpsrc')
rtcpsrc.set_property('port', RTCP_RECV_PORT)

rtcpsink = _make_element('udpsink', 'rtcpsink')
rtcpsink.set_property('port', RTCP_SEND_PORT)
rtcpsink.set_property('host', DEST)
# no need for synchronisation or preroll on the RTCP sink
rtcpsink.set_property('async', False)
rtcpsink.set_property('sync', False)

pipeline.add(rtpsrc)
pipeline.add(rtcpsrc)
pipeline.add(rtcpsink)

# the depayloading and decoding
audiodepay = _make_element(AUDIO_DEPAY, 'audiodepay')
audiodec = _make_element(AUDIO_DEC, 'audiodec')

# the audio playback and format conversion
audioconv = _make_element('audioconvert', 'audioconv')
audiores = _make_element('audioresample', 'audiores')
audiosink = _make_element(AUDIO_SINK, 'audiosink')

# add depayloading and playback to the pipeline and link
pipeline.add(audiodepay)
pipeline.add(audiodec)
pipeline.add(audioconv)
pipeline.add(audiores)
pipeline.add(audiosink)
_link_elements(audiodepay, audiodec)
_link_elements(audiodec, audioconv)
_link_elements(audioconv, audiores)
_link_elements(audiores, audiosink)

# the rtpbin element
rtpbin = _make_element('rtpbin', 'rtpbin')
pipeline.add(rtpbin)

# now link all to the rtpbin, start by getting an RTP sinkpad for session 0
# NOTE(review): newer GStreamer releases provide request_pad_simple() as the
# replacement for get_request_pad() - confirm against the targeted version.
_link_pads(rtpsrc.get_static_pad('src'),
           rtpbin.get_request_pad('recv_rtp_sink_0'))

# get an RTCP sinkpad in session 0
_link_pads(rtcpsrc.get_static_pad('src'),
           rtpbin.get_request_pad('recv_rtcp_sink_0'))

# get an RTCP srcpad for sending RTCP back to the sender
_link_pads(rtpbin.get_request_pad('send_rtcp_src_0'),
           rtcpsink.get_static_pad('sink'))

# the pad carrying the received RTP stream is only added by rtpbin later on,
# so the depayloader is linked from the 'pad-added' callback
rtpbin.connect('pad-added', pad_added_cb, audiodepay)

# disabled experiment for reading RTCP statistics, kept from the original:
# def newManager():
# rtpbin.connect('on-ssrc-active', onSSRCActive)
# def onSSRCActive(self):
# print (self)
# rtpsrc.connect('new-manager', newManager)

# we need to run a GLib main loop to get the messages
mainloop = GObject.MainLoop()
try:
    # a failed state change is reported instead of leaving the main loop
    # running with a dead pipeline
    if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
        raise RuntimeError('could not set the pipeline to PLAYING')
    mainloop.run()
finally:
    # always shut the pipeline down, also when the loop is interrupted
    pipeline.set_state(Gst.State.NULL)
While the server and client work without problems, I cannot find a way, or any information on how, to retrieve the dropped packet statistics from RTCP for this RTP stream, even from the command line.
And any help in that regard would be appreciated!
Here is a mock-up, on Linux, just to illustrate a method of listening to the bus messages.
I haven't had the time to sort out a real QOS Gst.Message, so I hacked something in there to force a QOS message, so that you at least see it.
Hopefully this will be enough to get you going in the right direction.
#!/usr/bin/python3
from os import path
import time
import gi
gi.require_version('Gst', '1.0')
gi.require_version('Gtk', '3.0')
gi.require_version('GstVideo', '1.0')
from gi.repository import GObject, Gst, Gtk
from gi.repository import GdkX11, GstVideo
GObject.threads_init()
Gst.init(None)

# Media file to play: H.mkv in the directory above this script.
filename = path.join(path.dirname(path.abspath(__file__)), '../H.mkv')
# Let GStreamer build the file URI so that the path is escaped properly;
# plain 'file://' + filename concatenation yields an invalid URI for paths
# containing spaces or other reserved characters.
uri = Gst.filename_to_uri(filename)
class Player(object):
    """GTK window that plays the module-level ``uri`` and prints bus messages.

    A playbin is placed in a pipeline whose bus is watched for end-of-stream,
    error and QoS messages; each one is printed by the matching handler.
    Video is rendered into the window's drawing area.
    """

    def __init__(self):
        """Build the window, the pipeline and the bus message handlers."""
        self.window = Gtk.Window()
        self.window.connect('destroy', self.quit)
        self.window.set_default_size(800, 450)

        self.drawingarea = Gtk.DrawingArea()
        self.window.add(self.drawingarea)

        # Create GStreamer pipeline
        self.pipeline = Gst.Pipeline()

        # Create bus to get events from GStreamer pipeline
        self.bus = self.pipeline.get_bus()
        self.bus.add_signal_watch()
        self.bus.connect('message::eos', self.on_eos)
        self.bus.connect('message::error', self.on_error)
        self.bus.connect('message::qos', self.on_quality_of_service)

        # The window handle request has to be answered synchronously, so
        # sync-message emission is enabled in addition to the signal watch.
        self.bus.enable_sync_message_emission()
        self.bus.connect('sync-message::element', self.on_sync_message)

        # Create GStreamer elements
        self.playbin = Gst.ElementFactory.make('playbin', None)

        # Add playbin to the pipeline
        self.pipeline.add(self.playbin)

        # Set properties
        self.playbin.set_property('uri', uri)

    def run(self):
        """Show the window, start playback and run the GTK main loop."""
        self.window.show_all()
        # Read after show_all() so that the drawing area has a window to
        # take the id from.
        self.xid = self.drawingarea.get_property('window').get_xid()
        self.pipeline.set_state(Gst.State.PLAYING)

        # Demo only: post a synthetic (all-zero) QoS message on the bus so
        # that on_quality_of_service() visibly fires once. A real message
        # built with new_qos() is used so parse_qos() has valid data, and
        # posting it avoids blocking the UI with a sleep.
        demo_qos = Gst.Message.new_qos(self.pipeline, False, 0, 0, 0, 0)
        self.bus.post(demo_qos)

        Gtk.main()

    def quit(self, window):
        """Stop the pipeline and leave the GTK main loop ('destroy' handler)."""
        self.pipeline.set_state(Gst.State.NULL)
        Gtk.main_quit()

    def on_sync_message(self, bus, msg):
        """Hand the drawing area's window id to the sink that asks for one."""
        structure = msg.get_structure()
        if structure is None:
            # Element messages without a structure carry nothing to match on.
            return
        if structure.get_name() == 'prepare-window-handle':
            print('prepare-window-handle')
            msg.src.set_window_handle(self.xid)

    def on_eos(self, bus, msg):
        """Report that the end of the stream was reached."""
        print('End of Stream')

    def on_error(self, bus, msg):
        """Print the parsed content of an error message."""
        print('Error', msg.parse_error())

    def on_quality_of_service(self, bus, msg):
        """Print the values carried by a QoS message."""
        print('Qos Message:', msg.parse_qos())
# Build the player and start it; run() enters the GTK main loop.
p = Player()
p.run()