Last active
January 2, 2016 15:29
-
-
Save TimSC/8324156 to your computer and use it in GitHub Desktop.
Demonstration of multipart/x-mixed-replace decoding to get MJPEG frames from web server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #Demonstration of multipart/x-mixed-replace decoding to get MJPEG frames from web server | |
| #by Tim Sheerman-Chase 2014 | |
| #This code may be used under the terms of the CC0 license | |
| #https://creativecommons.org/publicdomain/zero/1.0/ | |
| import pycurl | |
| class HandleJpegData(object): | |
| def __init__(self): | |
| self.count = 0 | |
| def write(self, contentType, dat): | |
| fi = open("test"+str(self.count)+".jpeg","wb") | |
| fi.write(dat) | |
| fi.close() | |
| self.count += 1 | |
| def HandleBinData(contentType, dat): | |
| print contentType, len(dat) | |
| class DecodeXMixedReplace(object): | |
| def __init__(self): | |
| self.boundary = None | |
| self.rxBuff = "" | |
| self.inBinarySection = False | |
| self.contentLength = None | |
| self.contentType = None | |
| self.callback = HandleBinData | |
| self.foundBoundary = 0 | |
| def writeHeader(self, dat): | |
| print dat | |
| if dat[:13].lower()=="content-type:": | |
| valsplit = dat[13:].split(";") | |
| dataType = valsplit[0].strip() | |
| if dataType.lower() != "multipart/x-mixed-replace": | |
| raise Exception("Unexpected mime type: "+dataType) | |
| boundaryVal = valsplit[1].strip() | |
| boundarySplit = boundaryVal.split("=") | |
| self.boundary = boundarySplit[1] | |
| def writeBody(self, dat): | |
| self.rxBuff += dat | |
| eol = 0 | |
| while eol >= 0 and not self.inBinarySection: | |
| eol = self.rxBuff.find("\r\n") | |
| if eol < 0: continue | |
| li = self.rxBuff[:eol] | |
| #print eol, "'"+str(li[:100])+"'" | |
| self.rxBuff = self.rxBuff[eol+2:] | |
| if eol == 0 and self.foundBoundary: | |
| self.inBinarySection = True | |
| return | |
| if li == self.boundary: #Boundary mark | |
| self.foundBoundary = 1 | |
| if li == "--"+self.boundary: #Two different styles of boundary marks | |
| self.foundBoundary = 1 | |
| if li[:15].lower()=="content-length:": | |
| self.contentLength = int(li[15:].strip()) | |
| if li[:13].lower()=="content-type:": | |
| self.contentType = li[13:].strip() | |
| if self.inBinarySection and self.contentLength is None: | |
| #If content length is not specified, look for the end of jpeg | |
| jpegEnd = '\xff\xd9' | |
| twoLineBreaks = jpegEnd+'\r\n\r\n'+self.boundary | |
| eol = self.rxBuff.find(twoLineBreaks) | |
| if eol >= 0: | |
| #print ":".join("{0:x}".format(ord(c)) for c in self.rxBuff[eol-10:eol+10]) | |
| binData = self.rxBuff[:eol+2] | |
| self.callback(self.contentType, binData) | |
| self.rxBuff = self.rxBuff[eol+2:] | |
| self.contentLength = None | |
| self.inBinarySection = False | |
| self.contentType = None | |
| self.foundBoundary = 0 | |
| if self.inBinarySection and self.contentLength is not None and len(self.rxBuff) >= self.contentLength: | |
| binData = self.rxBuff[:self.contentLength] | |
| self.callback(self.contentType, binData) | |
| self.rxBuff = self.rxBuff[self.contentLength:] | |
| self.contentLength = None | |
| self.inBinarySection = False | |
| self.contentType = None | |
| self.foundBoundary = 0 | |
| def Get(url, rx, userpass = None): | |
| c = pycurl.Curl() | |
| c.setopt(pycurl.URL, url) | |
| if userpass is not None: c.setopt(pycurl.USERPWD, userpass) | |
| c.setopt(pycurl.WRITEFUNCTION, rx.writeBody) | |
| c.setopt(pycurl.HEADERFUNCTION, rx.writeHeader) | |
| c.perform() | |
| if __name__=="__main__": | |
| rx = DecodeXMixedReplace() | |
| jpegOut = HandleJpegData() | |
| rx.callback = jpegOut.write | |
| url = "http://umevakameran.net.umea.se/mjpg/video.mjpg" | |
| Get(url, rx) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment