Last active
April 25, 2022 12:34
-
-
Save RussellLuo/7ec8364f4ed784bdffb0fb974e211406 to your computer and use it in GitHub Desktop.
JPEG encoder based on gstreamer.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
JPEG encoder based on gstreamer. | |
Many thanks to the following code and docs: | |
- The usage of `appsrc` in Python: https://github.com/tamaggo/gstreamer-examples/blob/master/test_gst_appsrc_testvideo_mp4mux.py | |
- The usage of `appsink` in Python: https://gist.github.com/willpatera/7984486 | |
- The usage of both `appsrc` and `appsink` in C++: https://github.com/dkorobkov/gstreamer-appsrc-appsink-example/blob/master/JpegGstEncoder.cpp | |
- What's the string representation of a JPEG caps: See the "Element Pads" section of https://gstreamer.freedesktop.org/data/doc/gstreamer/head/gst-plugins-good/html/gst-plugins-good-plugins-jpegenc.html | |
""" | |
import argparse | |
import time | |
from io import BytesIO | |
import gi | |
gi.require_version('Gst', '1.0') | |
from gi.repository import GObject, Gst | |
GObject.threads_init() | |
Gst.init(None) | |
class MemSrc(object): | |
def __init__(self, name, **properties): | |
self._src = Gst.ElementFactory.make('appsrc', name) | |
for p, value in properties.items(): | |
self._src.set_property(p, value) | |
@property | |
def src(self): | |
return self._src | |
def push(self, data): | |
buf = Gst.Buffer.new_allocate(None, len(data), None) | |
assert buf is not None | |
buf.fill(0, data) | |
self._src.emit('push-buffer', buf) | |
self._src.emit('end-of-stream') | |
class JPEGMemSrc(MemSrc): | |
def __init__(self, name, width, height, frame_rate='0/1', sof_marker=0): | |
caps = Gst.Caps.from_string( | |
'image/jpeg, width=%s, height=%s, framerate=(fraction)%s, sof-marker=%d' % ( | |
width, height, frame_rate, sof_marker)) | |
super(JPEGMemSrc, self).__init__(name, caps=caps, format=Gst.Format.TIME, | |
blocksize=width * height) | |
class MemSink(object): | |
def __init__(self, name, **properties): | |
self._sink = Gst.ElementFactory.make('appsink', name) | |
for p, value in properties.items(): | |
self._sink.set_property(p, value) | |
self._sink.set_property('emit-signals', True) | |
self._data = None | |
self._sink.connect('new-sample', self.pull, self._sink) | |
@property | |
def sink(self): | |
return self._sink | |
@property | |
def data(self): | |
return self._data | |
def pull(self, sink, data): | |
sample = sink.emit('pull-sample') | |
buf = sample.get_buffer() | |
caps = sample.get_caps() | |
# print caps.get_structure(0).get_value('format') | |
# print caps.get_structure(0).get_value('width') | |
# print caps.get_structure(0).get_value('height') | |
# print buf.get_size() | |
self._data = buf.extract_dup(0, buf.get_size()) | |
return Gst.FlowReturn.OK | |
class JPEGMemSink(MemSink): | |
def __init__(self, name): | |
caps = Gst.Caps.from_string( | |
'image/jpeg, width=(int)[ 16, 65535 ], height=(int)[ 16, 65535 ], ' | |
'framerate=(fraction)[ 0/1, 2147483647/1 ], sof-marker=(int){ 0, 1, 2, 9 }') | |
super(JPEGMemSink, self).__init__(name, caps=caps) | |
class Pipeline(Gst.Pipeline): | |
def add_and_link_many(self, *elements): | |
length = len(elements) | |
assert length > 1 | |
for element in elements: | |
self.add(element) | |
for i in range(length - 1): | |
elements[i].link(elements[i+1]) | |
def play_in_loop(self): | |
bus = self.get_bus() | |
while True: | |
msg = bus.poll(Gst.MessageType.ANY, Gst.CLOCK_TIME_NONE) | |
t = msg.type | |
if t == Gst.MessageType.EOS: | |
# print('EOS') | |
self.set_state(Gst.State.NULL) | |
break | |
elif t == Gst.MessageType.ERROR: | |
err, debug = msg.parse_error() | |
# print('Error: %s' % err, debug) | |
break | |
elif t == Gst.MessageType.WARNING: | |
err, debug = msg.parse_warning() | |
# print('Warning: %s' % err, debug) | |
elif t == Gst.MessageType.STATE_CHANGED: | |
pass | |
elif t == Gst.MessageType.STREAM_STATUS: | |
pass | |
# else: | |
# print(t) | |
# print('Unknown message: %s' % msg) | |
# print('Bye') | |
class JPEGEncoder(object): | |
def __init__(self, data, width, height): | |
self._data = data | |
self._width = width | |
self._height = height | |
def encode(self, gamma, quality): | |
jpegsrc = JPEGMemSrc('jpegsrc', self._width, self._height) | |
jpegdec = Gst.ElementFactory.make('jpegdec', 'jpegdec') | |
gamma_e = Gst.ElementFactory.make('gamma', 'gamma') | |
gamma_e.set_property('gamma', gamma) | |
jpegenc = Gst.ElementFactory.make('jpegenc', 'jpegenc') | |
jpegenc.set_property('quality', quality) | |
jpegsink = JPEGMemSink('jpegsink') | |
pipeline = Pipeline() # 'jpeg-pipeline') | |
pipeline.add_and_link_many(jpegsrc.src, jpegdec, gamma_e, jpegenc, jpegsink.sink) | |
pipeline.set_state(Gst.State.PLAYING) | |
jpegsrc.push(self._data) | |
pipeline.play_in_loop() | |
return jpegsink.data | |
if __name__ == '__main__': | |
# Runable example | |
parser = argparse.ArgumentParser() | |
parser.add_argument('in_jpeg_path', help='The path of the input JPEG.') | |
parser.add_argument('out_jpeg_path', help='The path of the output JPEG.') | |
parser.add_argument('--gamma', type=float, default=1.0, | |
help='The gamma property of the output JPEG. Defaults to 1.0') | |
parser.add_argument('--quality', type=int, default=85, | |
help='The quality property of the output JPEG. Defaults to 85') | |
args = parser.parse_args() | |
with open(args.in_jpeg_path, 'rb') as in_f: | |
in_data = in_f.read() | |
# from PIL import Image | |
# in_jpeg = Image.open(BytesIO(in_data)) | |
width = 986 # in_jpeg.width | |
height = 766 # in_jpeg.height | |
encoded_data = JPEGEncoder(in_data, width, height).encode(args.gamma, args.quality) | |
if encoded_data is not None: | |
with open(args.out_jpeg_path, 'wb') as out_f: | |
out_f.write(encoded_data) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment