Skip to content

Instantly share code, notes, and snippets.

@RussellLuo
Last active April 25, 2022 12:34
Show Gist options
  • Save RussellLuo/7ec8364f4ed784bdffb0fb974e211406 to your computer and use it in GitHub Desktop.
Save RussellLuo/7ec8364f4ed784bdffb0fb974e211406 to your computer and use it in GitHub Desktop.
JPEG encoder based on gstreamer.
# -*- coding: utf-8 -*-
"""
JPEG encoder based on gstreamer.
Many thanks to the following code and docs:
- The usage of `appsrc` in Python: https://github.com/tamaggo/gstreamer-examples/blob/master/test_gst_appsrc_testvideo_mp4mux.py
- The usage of `appsink` in Python: https://gist.github.com/willpatera/7984486
- The usage of both `appsrc` and `appsink` in C++: https://github.com/dkorobkov/gstreamer-appsrc-appsink-example/blob/master/JpegGstEncoder.cpp
- What's the string representation of a JPEG caps: See the "Element Pads" section of https://gstreamer.freedesktop.org/data/doc/gstreamer/head/gst-plugins-good/html/gst-plugins-good-plugins-jpegenc.html
"""
import argparse
import time
from io import BytesIO
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
GObject.threads_init()
Gst.init(None)
class MemSrc(object):
def __init__(self, name, **properties):
self._src = Gst.ElementFactory.make('appsrc', name)
for p, value in properties.items():
self._src.set_property(p, value)
@property
def src(self):
return self._src
def push(self, data):
buf = Gst.Buffer.new_allocate(None, len(data), None)
assert buf is not None
buf.fill(0, data)
self._src.emit('push-buffer', buf)
self._src.emit('end-of-stream')
class JPEGMemSrc(MemSrc):
def __init__(self, name, width, height, frame_rate='0/1', sof_marker=0):
caps = Gst.Caps.from_string(
'image/jpeg, width=%s, height=%s, framerate=(fraction)%s, sof-marker=%d' % (
width, height, frame_rate, sof_marker))
super(JPEGMemSrc, self).__init__(name, caps=caps, format=Gst.Format.TIME,
blocksize=width * height)
class MemSink(object):
def __init__(self, name, **properties):
self._sink = Gst.ElementFactory.make('appsink', name)
for p, value in properties.items():
self._sink.set_property(p, value)
self._sink.set_property('emit-signals', True)
self._data = None
self._sink.connect('new-sample', self.pull, self._sink)
@property
def sink(self):
return self._sink
@property
def data(self):
return self._data
def pull(self, sink, data):
sample = sink.emit('pull-sample')
buf = sample.get_buffer()
caps = sample.get_caps()
# print caps.get_structure(0).get_value('format')
# print caps.get_structure(0).get_value('width')
# print caps.get_structure(0).get_value('height')
# print buf.get_size()
self._data = buf.extract_dup(0, buf.get_size())
return Gst.FlowReturn.OK
class JPEGMemSink(MemSink):
def __init__(self, name):
caps = Gst.Caps.from_string(
'image/jpeg, width=(int)[ 16, 65535 ], height=(int)[ 16, 65535 ], '
'framerate=(fraction)[ 0/1, 2147483647/1 ], sof-marker=(int){ 0, 1, 2, 9 }')
super(JPEGMemSink, self).__init__(name, caps=caps)
class Pipeline(Gst.Pipeline):
def add_and_link_many(self, *elements):
length = len(elements)
assert length > 1
for element in elements:
self.add(element)
for i in range(length - 1):
elements[i].link(elements[i+1])
def play_in_loop(self):
bus = self.get_bus()
while True:
msg = bus.poll(Gst.MessageType.ANY, Gst.CLOCK_TIME_NONE)
t = msg.type
if t == Gst.MessageType.EOS:
# print('EOS')
self.set_state(Gst.State.NULL)
break
elif t == Gst.MessageType.ERROR:
err, debug = msg.parse_error()
# print('Error: %s' % err, debug)
break
elif t == Gst.MessageType.WARNING:
err, debug = msg.parse_warning()
# print('Warning: %s' % err, debug)
elif t == Gst.MessageType.STATE_CHANGED:
pass
elif t == Gst.MessageType.STREAM_STATUS:
pass
# else:
# print(t)
# print('Unknown message: %s' % msg)
# print('Bye')
class JPEGEncoder(object):
def __init__(self, data, width, height):
self._data = data
self._width = width
self._height = height
def encode(self, gamma, quality):
jpegsrc = JPEGMemSrc('jpegsrc', self._width, self._height)
jpegdec = Gst.ElementFactory.make('jpegdec', 'jpegdec')
gamma_e = Gst.ElementFactory.make('gamma', 'gamma')
gamma_e.set_property('gamma', gamma)
jpegenc = Gst.ElementFactory.make('jpegenc', 'jpegenc')
jpegenc.set_property('quality', quality)
jpegsink = JPEGMemSink('jpegsink')
pipeline = Pipeline() # 'jpeg-pipeline')
pipeline.add_and_link_many(jpegsrc.src, jpegdec, gamma_e, jpegenc, jpegsink.sink)
pipeline.set_state(Gst.State.PLAYING)
jpegsrc.push(self._data)
pipeline.play_in_loop()
return jpegsink.data
if __name__ == '__main__':
# Runable example
parser = argparse.ArgumentParser()
parser.add_argument('in_jpeg_path', help='The path of the input JPEG.')
parser.add_argument('out_jpeg_path', help='The path of the output JPEG.')
parser.add_argument('--gamma', type=float, default=1.0,
help='The gamma property of the output JPEG. Defaults to 1.0')
parser.add_argument('--quality', type=int, default=85,
help='The quality property of the output JPEG. Defaults to 85')
args = parser.parse_args()
with open(args.in_jpeg_path, 'rb') as in_f:
in_data = in_f.read()
# from PIL import Image
# in_jpeg = Image.open(BytesIO(in_data))
width = 986 # in_jpeg.width
height = 766 # in_jpeg.height
encoded_data = JPEGEncoder(in_data, width, height).encode(args.gamma, args.quality)
if encoded_data is not None:
with open(args.out_jpeg_path, 'wb') as out_f:
out_f.write(encoded_data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment