Skip to content

Instantly share code, notes, and snippets.

@TimSC
Last active January 2, 2016 15:29
Show Gist options
  • Select an option

  • Save TimSC/8324156 to your computer and use it in GitHub Desktop.

Select an option

Save TimSC/8324156 to your computer and use it in GitHub Desktop.
Demonstration of multipart/x-mixed-replace decoding to get MJPEG frames from web server
#Demonstration of multipart/x-mixed-replace decoding to get MJPEG frames from web server
#by Tim Sheerman-Chase 2014
#This code may be used under the terms of the CC0 license
#https://creativecommons.org/publicdomain/zero/1.0/
import pycurl
class HandleJpegData(object):
def __init__(self):
self.count = 0
def write(self, contentType, dat):
fi = open("test"+str(self.count)+".jpeg","wb")
fi.write(dat)
fi.close()
self.count += 1
def HandleBinData(contentType, dat):
print contentType, len(dat)
class DecodeXMixedReplace(object):
def __init__(self):
self.boundary = None
self.rxBuff = ""
self.inBinarySection = False
self.contentLength = None
self.contentType = None
self.callback = HandleBinData
self.foundBoundary = 0
def writeHeader(self, dat):
print dat
if dat[:13].lower()=="content-type:":
valsplit = dat[13:].split(";")
dataType = valsplit[0].strip()
if dataType.lower() != "multipart/x-mixed-replace":
raise Exception("Unexpected mime type: "+dataType)
boundaryVal = valsplit[1].strip()
boundarySplit = boundaryVal.split("=")
self.boundary = boundarySplit[1]
def writeBody(self, dat):
self.rxBuff += dat
eol = 0
while eol >= 0 and not self.inBinarySection:
eol = self.rxBuff.find("\r\n")
if eol < 0: continue
li = self.rxBuff[:eol]
#print eol, "'"+str(li[:100])+"'"
self.rxBuff = self.rxBuff[eol+2:]
if eol == 0 and self.foundBoundary:
self.inBinarySection = True
return
if li == self.boundary: #Boundary mark
self.foundBoundary = 1
if li == "--"+self.boundary: #Two different styles of boundary marks
self.foundBoundary = 1
if li[:15].lower()=="content-length:":
self.contentLength = int(li[15:].strip())
if li[:13].lower()=="content-type:":
self.contentType = li[13:].strip()
if self.inBinarySection and self.contentLength is None:
#If content length is not specified, look for the end of jpeg
jpegEnd = '\xff\xd9'
twoLineBreaks = jpegEnd+'\r\n\r\n'+self.boundary
eol = self.rxBuff.find(twoLineBreaks)
if eol >= 0:
#print ":".join("{0:x}".format(ord(c)) for c in self.rxBuff[eol-10:eol+10])
binData = self.rxBuff[:eol+2]
self.callback(self.contentType, binData)
self.rxBuff = self.rxBuff[eol+2:]
self.contentLength = None
self.inBinarySection = False
self.contentType = None
self.foundBoundary = 0
if self.inBinarySection and self.contentLength is not None and len(self.rxBuff) >= self.contentLength:
binData = self.rxBuff[:self.contentLength]
self.callback(self.contentType, binData)
self.rxBuff = self.rxBuff[self.contentLength:]
self.contentLength = None
self.inBinarySection = False
self.contentType = None
self.foundBoundary = 0
def Get(url, rx, userpass = None):
c = pycurl.Curl()
c.setopt(pycurl.URL, url)
if userpass is not None: c.setopt(pycurl.USERPWD, userpass)
c.setopt(pycurl.WRITEFUNCTION, rx.writeBody)
c.setopt(pycurl.HEADERFUNCTION, rx.writeHeader)
c.perform()
if __name__=="__main__":
rx = DecodeXMixedReplace()
jpegOut = HandleJpegData()
rx.callback = jpegOut.write
url = "http://umevakameran.net.umea.se/mjpg/video.mjpg"
Get(url, rx)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment