Created
June 4, 2015 22:25
-
-
Save mfm24/a9440c0fb4cc49634594 to your computer and use it in GitHub Desktop.
Progressive png writer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
# MFM 2015-06-04 | |
# Trying to make this send data over WSGI with a
# generator to give a time-dependent graph.
# The goal is to accept a generator like:
#     def source():
#         while True:
#             time.sleep(1)
#             yield random()
# and get updates live in a webpage.
# need to use a zlib compress object and see if it works... | |
# We do a single IDAT per line! | |
# | |
# Changes from png_bargraph.py | |
# Uses 8-bits per pixel. This produces files | |
# pi.png: 94B (90B for bit version) | |
# random.png: 2KB (2KB for bit version) | |
# sine.png: 6286B (3642B for bit version) 6298B progressive | |
# 30818B with progressive IDATS! (one per line) | |
# So bits are more efficient, but more like 2x than 8x more | |
# | |
# This works fine in Chrome! Also works with a single IDAT, which can | |
# be slightly more efficient, but does need all data at once (for IDAT | |
# length to be valid...) | |
from __future__ import division | |
import itertools | |
import struct | |
import zlib | |
def yield_block(header, data):
    """Yield the serialized pieces of one PNG chunk.

    The pieces are, in order: the 4-byte big-endian length of ``data``,
    the 4-byte chunk type, ``data`` itself, and the 4-byte big-endian
    CRC-32 computed over the chunk type followed by ``data``.

    Args:
        header: 4-character chunk type such as ``b'IDAT'``. A text string
            is accepted and encoded as ASCII.
        data: chunk payload as a byte string. A text string is accepted
            and encoded as Latin-1 (one byte per character).

    Yields:
        Four byte strings: length, chunk type, payload, CRC.

    Raises:
        ValueError: if the chunk type is not exactly 4 bytes long. As this
            is a generator, the error surfaces on the first iteration.
    """
    text_type = type(u'')
    if isinstance(header, text_type):
        header = header.encode('ascii')
    if isinstance(data, text_type):
        data = data.encode('latin-1')
    if len(header) != 4:
        raise ValueError('header must be 4 bytes!')

    # length of the payload only (type and CRC are not counted)
    yield struct.pack('!L', len(data))
    # chunk type
    yield header
    # payload
    yield data
    # Chain the CRC over type + payload without building a joined copy.
    # The mask keeps the value unsigned on interpreters where crc32 can
    # return a negative number.
    checksum = zlib.crc32(data, zlib.crc32(header)) & 0xffffffff
    yield struct.pack('!L', checksum)
def compress_gen(gen):
    """Compress the items of ``gen`` as one continuous zlib stream.

    For every input item two values are yielded: whatever the compressor
    returns for that item (possibly empty), then the result of a
    ``Z_SYNC_FLUSH``. The per-item flush is what makes output available
    after each item instead of only at the end. A final value closes the
    stream.

    Args:
        gen: iterable of byte strings to compress, in order.

    Yields:
        Byte strings which, concatenated, form a single zlib stream.
    """
    deflater = zlib.compressobj()
    feed = deflater.compress
    flush = deflater.flush
    for line in gen:
        yield feed(line)
        # push out everything buffered so far for this item
        yield flush(zlib.Z_SYNC_FLUSH)
    # terminate the stream
    yield flush()
def int_to_pngline(data, width, filt=0, white=0xFF, black=0):
    """Convert integers into 8-bit grayscale PNG scanlines.

    Each value ``d`` becomes one scanline: the filter byte, then ``d``
    white pixels, then black pixels padding the row out to ``width``.
    Values are clamped to the range ``[0, width]`` so every scanline is
    exactly ``width + 1`` bytes long; without the lower clamp a negative
    value would produce an over-long row.

    Args:
        data: iterable of integer bar lengths, one per scanline.
        width: number of pixels per scanline.
        filt: filter-type byte placed at the start of each scanline.
        white: byte value used for the bar pixels.
        black: byte value used for the padding pixels.

    Yields:
        One byte string of length ``width + 1`` per input value.
    """
    for value in data:
        filled = max(0, min(value, width))
        row = bytearray([filt])
        row.extend([white] * filled)
        row.extend([black] * (width - filled))
        # bytes() gives the raw byte string on every interpreter version,
        # whereas str() of a bytearray is only raw bytes on Python 2.
        yield bytes(row)
def make_bar_png(data_generator, width, height, progressive=True):
    """Yield the bytes of an 8-bit grayscale PNG bar graph, piece by piece.

    Every value from ``data_generator`` becomes one image row: that many
    white pixels followed by black padding up to ``width``. Rows are built
    by ``int_to_pngline`` in both modes, so out-of-range values are
    clamped the same way whichever mode is used.

    Args:
        data_generator: iterable of integer bar lengths, one per row.
        width: image width in pixels.
        height: image height written to the IHDR chunk. It is not checked
            against the number of values actually produced.
        progressive: when true, rows are compressed incrementally and an
            IDAT chunk is emitted for each non-empty piece of compressed
            output, so ``data_generator`` is consumed lazily. When false,
            all rows are collected and sent as a single IDAT chunk.

    Yields:
        Byte strings which, concatenated, form the PNG file.
    """
    bit_depth = 8
    color_type = 0  # grayscale
    compression_method, filter_method, interlace_method = 0, 0, 0

    # the fixed 8-byte PNG signature
    yield b'\x89PNG\r\n\x1a\n'

    # our header block
    ihdr = struct.pack(
        '!LLBBBBB',
        width, height, bit_depth,
        color_type, compression_method,
        filter_method, interlace_method)
    for piece in yield_block(b'IHDR', ihdr):
        yield piece

    # unfiltered rows (each starts with a 0 filter byte)
    rows = int_to_pngline(data_generator, width)
    if progressive:
        # All rows share a single zlib stream, but intermediate IDATs are
        # sent as compressed output becomes available.
        for compressed in compress_gen(rows):
            if compressed:  # skip empty compressor output
                for piece in yield_block(b'IDAT', compressed):
                    yield piece
    else:
        # we do a single IDAT, which needs all data at once
        for piece in yield_block(b'IDAT', zlib.compress(b"".join(rows))):
            yield piece

    for piece in yield_block(b'IEND', b''):
        yield piece
def test():
    """Write three sample bar-graph PNGs into the parent directory.

    Produces ``../pi.png`` (digits of pi), ``../random.png`` (512 random
    bars) and ``../sine.png`` (a sine wave). Files are opened in binary
    mode because PNG data is bytes; text mode can alter or reject them.
    """
    import math
    import random

    def write_png(path, values, width):
        # one image row per value, so the height is len(values)
        with open(path, 'wb') as f:
            for piece in make_bar_png(values, width, len(values)):
                f.write(piece)

    write_png(
        '../pi.png',
        [int(x) for x in str(math.pi) if x != '.'],
        10)
    write_png(
        '../random.png',
        [int(500 ** random.random()) for _ in range(512)],
        500)
    write_png(
        '../sine.png',
        [int(512 * (1 + math.sin(x / 32.0))) for x in range(1024)],
        1024)
def slow_gen(g, delay=0.01):
    """Re-yield the items of ``g``, sleeping before each one.

    Args:
        g: any iterable.
        delay: seconds to sleep before each item is yielded.

    Yields:
        The items of ``g``, unchanged and in order.
    """
    import time
    pause = time.sleep
    for item in g:
        pause(delay)
        yield item
def test_web():
    """Run a bottle app that serves the PNG generators over HTTP.

    Routes:
        /slow_png.png   progressive PNG whose rows arrive with a delay
        /slow_png2.png  single-IDAT PNG trickled out one byte at a time
        /fast_png.png   progressive PNG with no artificial delay
        /counter        plain counter emitting one line per second

    Hopefully browsers will display the images as the data arrives.
    ``math`` and ``time`` are imported here so the handlers work even when
    this function is called from another module rather than as a script.
    """
    import math
    import time

    import bottle

    def sine_data():
        # 1024 bar lengths in the range 0..1024 tracing a sine wave
        return [int(512 * (1 + math.sin(x / 32.0))) for x in range(1024)]

    @bottle.get('/slow_png.png')
    def get_slow_png():
        # lots of IDATs
        bottle.response.content_type = "image/png"
        dat = sine_data()
        return make_bar_png(slow_gen(dat), 1024, len(dat))

    @bottle.get('/slow_png2.png')
    def get_slow_png2():
        # single IDAT, still displays progressively
        bottle.response.content_type = "image/png"
        dat = sine_data()
        png = b"".join(make_bar_png(dat, 1024, len(dat), progressive=False))
        # Slice rather than iterate so every item is a 1-byte byte string
        # regardless of how the interpreter iterates byte strings.
        single_bytes = (png[i:i + 1] for i in range(len(png)))
        return slow_gen(single_bytes, delay=0.001)

    @bottle.get('/fast_png.png')
    def get_fast_png():
        bottle.response.content_type = "image/png"
        dat = sine_data()
        return make_bar_png(dat, 1024, len(dat))

    @bottle.get('/counter')
    def get_counter():
        for x in range(100):
            yield "<br>%s" % x
            time.sleep(1)

    bottle.run(debug=True)
if __name__ == "__main__":
    # Script entry point: make `time` and `math` available as module
    # globals, then start the demo web server.
    import time
    import math
    test_web()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment