Skip to content

Instantly share code, notes, and snippets.

@dketov
Created February 19, 2018 06:52
Show Gist options
  • Select an option

  • Save dketov/5e6cc5f8a73ff4325023b4b9365cf70c to your computer and use it in GitHub Desktop.

Select an option

Save dketov/5e6cc5f8a73ff4325023b4b9365cf70c to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import sys, os
import gi
gi.require_version('Gst', '1.0')
gi.require_version('Gtk', '3.0')
gi.require_version('GdkX11', '3.0')
gi.require_version('GstVideo', '1.0')
from gi.repository import Gst, Gtk, GLib, GdkX11, GstVideo
# http://docs.gstreamer.com/display/GstSDK/Basic+tutorial+5%3A+GUI+toolkit+integration
class Player(object):
def __init__(self):
# initialize GTK
Gtk.init(sys.argv)
# initialize GStreamer
Gst.init(sys.argv)
self.state = Gst.State.NULL
self.duration = Gst.CLOCK_TIME_NONE
self.pipeline = Gst.parse_launch(
"filesrc name=filesrc ! decodebin name=decodebin ! xvimagesink name=xvimagesink"
)
if not self.pipeline:
print("ERROR: Could not create pipeline.")
sys.exit(1)
self.filesrc = self.pipeline.get_child_by_name("filesrc")
self.decodebin = self.pipeline.get_child_by_name("decodebin")
self.xvimagesink = self.pipeline.get_child_by_name("xvimagesink")
# set up URI
self.filesrc.set_property(
"location", os.path.abspath(sys.argv[1]))
# connect to interesting signals in decodebin
self.decodebin.connect("autoplug-continue", self.on_tags_changed)
#self.playbin.connect("audio-tags-changed", self.on_tags_changed)
#self.playbin.connect("text-tags-changed", self.on_tags_changed)
# create the GUI
self.build_ui()
# instruct the bus to emit signals for each received message
# and connect to the interesting signals
bus = self.pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message::error", self.on_error)
bus.connect("message::eos", self.on_eos)
bus.connect("message::state-changed", self.on_state_changed)
bus.connect("message::application", self.on_application_message)
# set the playbin to PLAYING (start playback), register refresh callback
# and start the GTK main loop
def start(self):
# start playing
ret = self.pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
print("ERROR: Unable to set the pipeline to the playing state")
sys.exit(1)
# register a function that GLib will call every second
GLib.timeout_add_seconds(1, self.refresh_ui)
# start the GTK main loop. we will not regain control until
# Gtk.main_quit() is called
Gtk.main()
# free resources
self.cleanup()
# set the playbin state to NULL and remove the reference to it
def cleanup(self):
if self.pipeline:
self.pipeline.set_state(Gst.State.NULL)
self.pipeline = None
def build_ui(self):
main_window = Gtk.Window.new(Gtk.WindowType.TOPLEVEL)
main_window.connect("delete-event", self.on_delete_event)
video_window = Gtk.DrawingArea.new()
video_window.set_double_buffered(False)
video_window.connect("realize", self.on_realize)
video_window.connect("draw", self.on_draw)
play_button = Gtk.Button.new_from_stock(Gtk.STOCK_MEDIA_PLAY)
play_button.connect("clicked", self.on_play)
pause_button = Gtk.Button.new_from_stock(Gtk.STOCK_MEDIA_PAUSE)
pause_button.connect("clicked", self.on_pause)
stop_button = Gtk.Button.new_from_stock(Gtk.STOCK_MEDIA_STOP)
stop_button.connect("clicked", self.on_stop)
self.slider = Gtk.HScale.new_with_range(0, 100, 1)
self.slider.set_draw_value(False)
self.slider_update_signal_id = self.slider.connect(
"value-changed", self.on_slider_changed)
self.streams_list = Gtk.TextView.new()
self.streams_list.set_editable(False)
controls = Gtk.HBox.new(False, 0)
controls.pack_start(play_button, False, False, 2)
controls.pack_start(pause_button, False, False, 2)
controls.pack_start(stop_button, False, False, 2)
controls.pack_start(self.slider, True, True, 0)
main_hbox = Gtk.HBox.new(False, 0)
main_hbox.pack_start(video_window, True, True, 0)
main_hbox.pack_start(self.streams_list, False, False, 2)
main_box = Gtk.VBox.new(False, 0)
main_box.pack_start(main_hbox, True, True, 0)
main_box.pack_start(controls, False, False, 0)
main_window.add(main_box)
main_window.set_default_size(640, 480)
main_window.show_all()
# this function is called when the GUI toolkit creates the physical window
# that will hold the video
# at this point we can retrieve its handler and pass it to GStreamer
# through the XOverlay interface
def on_realize(self, widget):
window = widget.get_window()
window_handle = window.get_xid()
# pass it to playbin, which implements XOverlay and will forward
# it to the video sink
self.xvimagesink.set_window_handle(window_handle)
# self.playbin.set_xwindow_id(window_handle)
# this function is called when the PLAY button is clicked
def on_play(self, button):
self.pipeline.set_state(Gst.State.PLAYING)
pass
# this function is called when the PAUSE button is clicked
def on_pause(self, button):
self.pipeline.set_state(Gst.State.PAUSED)
pass
# this function is called when the STOP button is clicked
def on_stop(self, button):
self.pipeline.set_state(Gst.State.READY)
pass
# this function is called when the main window is closed
def on_delete_event(self, widget, event):
self.on_stop(None)
Gtk.main_quit()
# this function is called every time the video window needs to be
# redrawn. GStreamer takes care of this in the PAUSED and PLAYING states.
# in the other states we simply draw a black rectangle to avoid
# any garbage showing up
def on_draw(self, widget, cr):
if self.state < Gst.State.PAUSED:
allocation = widget.get_allocation()
cr.set_source_rgb(0, 0, 0)
cr.rectangle(0, 0, allocation.width, allocation.height)
cr.fill()
return False
# this function is called when the slider changes its position.
# we perform a seek to the new position here
def on_slider_changed(self, range):
value = self.slider.get_value()
self.pipeline.seek_simple(Gst.Format.TIME,
Gst.SeekFlags.FLUSH | Gst.SeekFlags.KEY_UNIT,
value * Gst.SECOND)
# this function is called periodically to refresh the GUI
def refresh_ui(self):
current = -1
# we do not want to update anything unless we are in the PAUSED
# or PLAYING states
if self.state < Gst.State.PAUSED:
return True
# if we don't know it yet, query the stream duration
if self.duration == Gst.CLOCK_TIME_NONE:
ret, self.duration = self.pipeline.query_duration(Gst.Format.TIME)
if not ret:
print("ERROR: Could not query current duration")
else:
# set the range of the slider to the clip duration (in seconds)
self.slider.set_range(0, self.duration / Gst.SECOND)
ret, current = self.pipeline.query_position(Gst.Format.TIME)
if ret:
# block the "value-changed" signal, so the on_slider_changed
# callback is not called (which would trigger a seek the user
# has not requested)
self.slider.handler_block(self.slider_update_signal_id)
# set the position of the slider to the current pipeline position
# (in seconds)
self.slider.set_value(current / Gst.SECOND)
# enable the signal again
self.slider.handler_unblock(self.slider_update_signal_id)
return True
# this function is called when new metadata is discovered in the stream
def on_tags_changed(self, *args, **kwargs):
print(args)
print(kwargs)
def _on_tags_changed(self, pipeline, stream):
# we are possibly in a GStreamer working thread, so we notify
# the main thread of this event through a message in the bus
self.pipeline.post_message(
Gst.Message.new_application(
self.pipeline,
Gst.Structure.new_empty("tags-changed")))
# this function is called when an error message is posted on the bus
def on_error(self, bus, msg):
err, dbg = msg.parse_error()
print("ERROR:", msg.src.get_name(), ":", err.message)
if dbg:
print("Debug info:", dbg)
# this function is called when an End-Of-Stream message is posted on the bus
# we just set the pipeline to READY (which stops playback)
def on_eos(self, bus, msg):
print("End-Of-Stream reached")
self.playbin.set_state(Gst.State.READY)
# this function is called when the pipeline changes states.
# we use it to keep track of the current state
def on_state_changed(self, bus, msg):
old, new, pending = msg.parse_state_changed()
if not msg.src == self.pipeline:
# not from the playbin, ignore
return
self.state = new
print("State changed from {0} to {1}".format(
Gst.Element.state_get_name(old), Gst.Element.state_get_name(new)))
if old == Gst.State.READY and new == Gst.State.PAUSED:
# for extra responsiveness we refresh the GUI as soons as
# we reach the PAUSED state
self.refresh_ui()
# extract metadata from all the streams and write it to the text widget
# in the GUI
def analyze_streams(self):
# clear current contents of the widget
buffer = self.streams_list.get_buffer()
buffer.set_text("")
# read some properties
#nr_video = self.pipeline.get_property("n-video")
#nr_audio = self.playbin.get_property("n-audio")
#nr_text = self.playbin.get_property("n-text")
for i in range(nr_video):
tags = None
# retrieve the stream's video tags
tags = self.playbin.emit("get-video-tags", i)
if tags:
buffer.insert_at_cursor("video stream {0}\n".format(i))
_, str = tags.get_string(Gst.TAG_VIDEO_CODEC)
buffer.insert_at_cursor(
" codec: {0}\n".format(
str or "unknown"))
for i in range(nr_audio):
tags = None
# retrieve the stream's audio tags
tags = self.playbin.emit("get-audio-tags", i)
if tags:
buffer.insert_at_cursor("\naudio stream {0}\n".format(i))
ret, str = tags.get_string(Gst.TAG_AUDIO_CODEC)
if ret:
buffer.insert_at_cursor(
" codec: {0}\n".format(
str or "unknown"))
ret, str = tags.get_string(Gst.TAG_LANGUAGE_CODE)
if ret:
buffer.insert_at_cursor(
" language: {0}\n".format(
str or "unknown"))
ret, str = tags.get_uint(Gst.TAG_BITRATE)
if ret:
buffer.insert_at_cursor(
" bitrate: {0}\n".format(
str or "unknown"))
for i in range(nr_text):
tags = None
# retrieve the stream's subtitle tags
tags = self.playbin.emit("get-text-tags", i)
if tags:
buffer.insert_at_cursor("\nsubtitle stream {0}\n".format(i))
ret, str = tags.get_string(Gst.TAG_LANGUAGE_CODE)
if ret:
buffer.insert_at_cursor(
" language: {0}\n".format(
str or "unknown"))
# this function is called when an "application" message is posted on the bus
# here we retrieve the message posted by the on_tags_changed callback
def on_application_message(self, bus, msg):
if msg.get_structure().get_name() == "tags-changed":
# if the message is the "tags-changed", update the stream info in
# the GUI
self.analyze_streams()
if __name__ == '__main__':
p = Player()
p.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment