Last active
December 28, 2015 09:19
-
-
Save papaver/7478629 to your computer and use it in GitHub Desktop.
MPEG-1 / MPEG-2 Audio Layer III (.mp3) parser
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#------------------------------------------------------------------------------ | |
# mp3.py - MPEG-1 / MPEG-2 Audio Layer III (.mp3) parser | |
#------------------------------------------------------------------------------ | |
import collections | |
import itertools | |
import os.path | |
import struct | |
#------------------------------------------------------------------------------ | |
# defines | |
#------------------------------------------------------------------------------ | |
# mpeg audio versions | |
kMpeg1 = 0b11 # ISO/IEC 11172-3 | |
kMpeg2 = 0b10 # ISO/IEC 13818-3 | |
kMpeg25 = 0b00 # unofficial extension of mpeg2 | |
kMpegReserved = 0b01 | |
# audio layers | |
kLayerI = 0b11 | |
kLayerII = 0b10 | |
kLayerIII = 0b01 | |
kLayerReserved = 0b00 | |
# bitrate definitions | |
kBitrateFree = 0b0000 | |
kBitrateReserved = 0b1111 | |
# channel modes | |
kChannelModeStereo = 0b00 | |
kChannelModeJointStereo = 0b01 # stereo | |
kChannelModeDual = 0b10 # two mono channels | |
kChannelModeMono = 0b11 | |
# emphasis | |
kEmphasisNone = 0b00 | |
kEmphasis5015Ms = 0b01 | |
kEmphasisCCITJ17 = 0b11 | |
kEmphasisReserved = 0b10 | |
#------------------------------------------------------------------------------ | |
# data tables | |
#------------------------------------------------------------------------------ | |
# bitrate table, values in kilobits | |
sBitrateLookupTable = { | |
kMpeg1 : { | |
kLayerI : [32, 64, 96, 128, 160, 192, 224, 256, 288, 320, 352, 384, 416, 448], | |
kLayerII : [32, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320, 384], | |
kLayerIII : [32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320] | |
}, | |
kMpeg2 : { | |
kLayerI : [32, 48, 56, 64, 80, 96, 112, 128, 144, 160, 176, 192, 224, 256], | |
kLayerII : [ 8, 16, 24, 32, 40, 48, 56, 64, 80, 96, 112, 128, 144, 160] | |
} | |
} | |
# sampling rate table, values in hertz | |
sSamplingRateLookupTable = { | |
kMpeg1 : [44100, 48000, 32000], | |
kMpeg2 : [22050, 24000, 16000], | |
kMpeg25 : [11025, 12000, 8000] | |
} | |
# samples per frame | |
sSamplesPerFrameLookupTable = { | |
kMpeg1 : { kLayerI : 384, kLayerII : 1152, kLayerIII : 1152 }, | |
kMpeg2 : { kLayerI : 384, kLayerII : 1152, kLayerIII : 576 }, | |
kMpeg25 : { kLayerI : 384, kLayerII : 1152, kLayerIII : 576 } | |
} | |
# side information size, values in bytes | |
sSideInfoSizeLookupTable = { | |
kMpeg1 : { kChannelModeStereo : 32, kChannelModeMono : 17 }, | |
kMpeg2 : { kChannelModeStereo : 17, kChannelModeMono : 9 }, | |
} | |
#------------------------------------------------------------------------------ | |
# structs | |
#------------------------------------------------------------------------------ | |
ID3v2Version = collections.namedtuple('ID3v2Version', ['major', 'revision']) | |
ID3v2Frame = collections.namedtuple('ID3v2Frame', ['tag', 'size', 'flags', 'content']) | |
#------------------------------------------------------------------------------ | |
# helper methods | |
#------------------------------------------------------------------------------ | |
def _versionToStr(version): | |
if version == kMpeg1: return 'Mpeg1' | |
elif version == kMpeg2: return 'Mpeg2' | |
elif version == kMpeg25: return 'Mpeg2.5' | |
else: raise ValueError("Invalid audio version.") | |
#------------------------------------------------------------------------------ | |
def _layerToStr(layer): | |
if layer == kLayerI: return 'LayerI' | |
elif layer == kLayerII: return 'LayerII' | |
elif layer == kLayerIII: return 'LayerIII' | |
else: raise ValueError("Invalid audio layer.") | |
#------------------------------------------------------------------------------ | |
def _bytesToStr(bytes_, wide=16): | |
chunk = lambda input_, size: map(None, *([iter(input_)] * size)) | |
bytes_ += '\x00' * (wide - (len(bytes_) % wide)) | |
return '\n'.join( \ | |
map(lambda l: ' '.join( \ | |
map(lambda s: ''.join( \ | |
[x.encode('hex') for x in s]), chunk(l, 2))), chunk(bytes_, wide))) | |
#------------------------------------------------------------------------------ | |
# Mp3FrameHeader | |
#------------------------------------------------------------------------------ | |
class Mp3FrameHeader(object): | |
"""Frame header consists of information about frame (bitrate, stereo, | |
mode, etc) and because of that frames are independent items. Each of them | |
can have its own characteristic. It is used eg. in Variable Bitate files, | |
where each frame can have different bitrate. | |
""" | |
#-------------------------------------------------------------------------- | |
# defines | |
#-------------------------------------------------------------------------- | |
kSyncWord = 0xffe00000 | |
# header bit masks | |
kSyncWordMask = 0xffe00000 | |
kAudioVersionMask = 0x00180000 | |
kLayerMask = 0x00060000 | |
kProtectionMask = 0x00010000 | |
kBitrateMask = 0x0000f000 | |
kSamplingRateMask = 0x00000c00 | |
kPaddingMask = 0x00000200 | |
kPrivateMask = 0x00000100 | |
kChannelModeMask = 0x000000c0 | |
kModeExtensionMask = 0x00000030 | |
kCopyrightMask = 0x00000008 | |
kOriginalMask = 0x00000004 | |
kEmphasisMask = 0x00000003 | |
kHeaderLength = 4 | |
#-------------------------------------------------------------------------- | |
# statics | |
#-------------------------------------------------------------------------- | |
@classmethod | |
def hasSyncWord(cls, bytes_): | |
"""Check if the bytes contains the sync word identifier. | |
""" | |
# convert into an integer if string buffer passed in | |
if isinstance(bytes_, str): | |
return cls.hasSyncWord(struct.unpack('>I', bytes_)[0]) | |
elif isinstance(bytes_, int): | |
return (bytes_ & cls.kSyncWordMask) == cls.kSyncWord | |
else: | |
raise ValueError("Invalid bytes type '%s', should be str[4] or int.") | |
#-------------------------------------------------------------------------- | |
# object | |
#-------------------------------------------------------------------------- | |
def __init__(self, bytes_): | |
"""Bytes can either be a 4 char string or an integer. | |
""" | |
super(Mp3FrameHeader, self).__init__() | |
# validate the header | |
if not Mp3FrameHeader.hasSyncWord(bytes_): | |
raise ValueError("Frame header '%s' is invalid." % bytes_) | |
# store as int, reference bits when header info requested | |
self._header = \ | |
bytes_ if isinstance(bytes_, int) else struct.unpack('>I', bytes_)[0] | |
#-------------------------------------------------------------------------- | |
def __repr__(self): | |
"""Identify frame by audio version, layer, bitrate, sampling rate. | |
""" | |
bitrate = self.bitrate | |
return "MpegFrameHeader(%(version)s, %(layer)s, %(bitrate)dkbps @ %(samplingrate)dhz)" % { | |
'version' : _versionToStr(self.version), | |
'layer' : _layerToStr(self.layer), | |
'bitrate' : '?' if bitrate == None else (bitrate / 1000), | |
'samplingrate' : self.samplingRate | |
} | |
#-------------------------------------------------------------------------- | |
# properties | |
#-------------------------------------------------------------------------- | |
@property | |
def version(self): | |
"""MPEG audio version, either MPEG-1, MPEG-2, or MPEG-2.5. An exception | |
will be raised if audio version is set to reserved. | |
""" | |
# validate value is not set to reserved | |
value = self._getValue(Mp3FrameHeader.kAudioVersionMask) | |
if value == kMpegReserved: | |
raise ValueError("Mpeg audio version is set to reserved value.") | |
return value | |
#-------------------------------------------------------------------------- | |
@property | |
def layer(self): | |
"""MPEG audio layer, either Layer I, Layer II, or Layer III. An | |
exception will be raised if layer is set to reserved. | |
""" | |
# validate value is not set to reserved | |
value = self._getValue(Mp3FrameHeader.kLayerMask) | |
if value == kLayerReserved: | |
raise ValueError("Mpeg layer is set to reserved value.") | |
return value | |
#-------------------------------------------------------------------------- | |
@property | |
def protection(self): | |
"""Protection bit signifing if a 16 bit CRC is present right after the | |
header. Returns true if CRC is present. | |
""" | |
return not self._bitSet(Mp3FrameHeader.kProtectionMask) | |
#-------------------------------------------------------------------------- | |
@property | |
def bitrate(self): | |
"""The bitrates are always displayed in kilobits per second. The | |
bitrate index 1111 is reserved and should never be used. In the MPEG | |
audio standard there is a free format described. This free format means | |
that the file is encoded with a constant bitrate, which is not one of | |
the predefined bitrates. None is returned if bitrate is free, an | |
exception will be thrown if the bitrate is set to reserved, else the | |
rate will be returned in bits per second. | |
""" | |
# validate value is not set to reserved | |
value = self._getValue(Mp3FrameHeader.kBitrateMask) | |
if value == kBitrateReserved: | |
raise ValueError("Mpeg bitrate is set to reserved value.") | |
# check if bitrate is set to free | |
if value == kBitrateFree: | |
return None | |
# grab audio version, mpeg2.5 shares mpeg2 bitrates | |
version = self.version | |
if version == kMpeg25: | |
version = kMpeg2 | |
# grab audio layer, layerII and layerIII share bitrates in mpeg2|2.5 | |
layer = self.layer | |
if (version == kMpeg2) and (layer == kLayerIII): | |
layer = kLayerII | |
# lookup bitrate in table, index off by one because of free setting | |
return sBitrateLookupTable[version][layer][value - 1] * 1000 | |
#-------------------------------------------------------------------------- | |
@property | |
def samplingRate(self): | |
"""The sampling rate specifies how many samples per second are | |
recorded. Each MPEG version can handle different sampling rates. | |
""" | |
# validate value is not set to reserved | |
value = self._getValue(Mp3FrameHeader.kSamplingRateMask) | |
if value == kBitrateReserved: | |
raise ValueError("Mpeg sampling rate is set to reserved value.") | |
# grab audio version | |
version = self.version | |
# lookup sampling rate in table | |
return sSamplingRateLookupTable[version][value] | |
#-------------------------------------------------------------------------- | |
@property | |
def padding(self): | |
"""If it is set, data is padded with with one slot. | |
""" | |
return self._bitSet(Mp3FrameHeader.kPaddingMask) | |
#-------------------------------------------------------------------------- | |
@property | |
def private(self): | |
"""Private bit (only informative). | |
""" | |
return self._bitSet(Mp3FrameHeader.kPrivateMask) | |
#-------------------------------------------------------------------------- | |
@property | |
def channelMode(self): | |
"""Channel mode. Dual channel files are made of two independent mono | |
channels. Each one uses exactly half the bitrate of the file. Most | |
decoders output them as stereo, but it might not always be the case. | |
""" | |
return self._getValue(Mp3FrameHeader.kChannelModeMask) | |
#-------------------------------------------------------------------------- | |
@property | |
def modeExtension(self): | |
"""Mode extension (only used in joint stereo). | |
""" | |
raise NotImplementedError("Property 'modeExtension' not implemented.") | |
#-------------------------------------------------------------------------- | |
@property | |
def copyright(self): | |
"""Copyright on media (only informative). Returns true if media is | |
copyrighted. | |
""" | |
return self._bitSet(Mp3FrameHeader.kCopyrightMask) | |
#-------------------------------------------------------------------------- | |
@property | |
def original(self): | |
"""Original or copy of media (only informative). Returns true if | |
original media. | |
""" | |
return self._bitSet(Mp3FrameHeader.kOriginalMask) | |
#-------------------------------------------------------------------------- | |
@property | |
def emphasis(self): | |
"""The emphasis indication is here to tell the decoder that the file | |
must be de-emphasized, that means the decoder must 're-equalize' the | |
sound after a Dolby-like noise suppression. It is rarely used. An | |
exception will be raised if emphasis set to reserved. | |
""" | |
# validate value is not set to reserved | |
value = self._getValue(Mp3FrameHeader.kEmphasisMask) | |
if value == kEmphasisReserved: | |
raise ValueError("Mpeg emphasis is set to reserved value.") | |
return value | |
#-------------------------------------------------------------------------- | |
# methods | |
#-------------------------------------------------------------------------- | |
def slotSize(self): | |
"""Calculate the slot size based on the audio layer, value in bytes. | |
""" | |
return 4 if self.layer == kLayerI else 1 | |
#-------------------------------------------------------------------------- | |
def samplesPerFrame(self): | |
"""Sample count in a frame is dependent on audio version and layer. | |
""" | |
version = self.version | |
layer = self.layer | |
samples = sSamplesPerFrameLookupTable[version][layer] | |
return samples | |
#-------------------------------------------------------------------------- | |
def frameSize(self): | |
"""Calculate the frame size (bytes) based on bit and sampling rate. | |
""" | |
bitrate = self.bitrate | |
samplingRate = self.samplingRate | |
samples = self.samplesPerFrame() | |
slotSize = self.slotSize() | |
padding = 1 if self.padding else 0 | |
# calculate the size based on samples per frame in terms of slots | |
slots = ((samples * bitrate) / (slotSize * 8.0) / samplingRate) + padding | |
# convert into bytes | |
frameSize = int(slots) * slotSize | |
return frameSize | |
#-------------------------------------------------------------------------- | |
def duration(self): | |
"""Calculate duration of the frame in milliseconds. | |
""" | |
samples = self.samplesPerFrame() | |
samplingRate = self.samplingRate | |
return samples / (samplingRate / 1000.0) | |
#-------------------------------------------------------------------------- | |
def sideInformationSize(self): | |
"""Size of the side information, general decoding instructions for the | |
frame, present in LayerIII frames. | |
""" | |
# mpeg2 and mpeg2.5 are the same | |
version = self.version | |
if version == kMpeg25: | |
version = kMpeg2 | |
# only really care if there are single or dual channels | |
channelMode = self.channelMode | |
if channelMode != kChannelModeMono: | |
channelMode = kChannelModeStereo | |
return sSideInfoSizeLookupTable[version][channelMode] | |
#-------------------------------------------------------------------------- | |
# internal methods | |
#-------------------------------------------------------------------------- | |
def _getValue(self, mask): | |
"""Extract the value from the header selected by the mask. | |
""" | |
# grab the masked value from the header | |
value = self._header & mask | |
# shift the value to correct its power | |
while not (mask & 0b1): | |
mask >>= 1 | |
value >>= 1 | |
return value | |
#-------------------------------------------------------------------------- | |
def _bitSet(self, mask): | |
"""Returns true if bit in mask is set, else false. | |
""" | |
return (self._header & mask) == mask | |
#------------------------------------------------------------------------------ | |
# ID3v1Tag | |
#------------------------------------------------------------------------------ | |
class ID3v1Tag(object): | |
"""The tag is used to describe the MPEG Audio file. It contains information | |
about artist, title, album, publishing year and genre. There is some extra | |
space for comments. It is exactly 128 bytes long and is located at very end | |
of the audio data. | |
""" | |
#-------------------------------------------------------------------------- | |
# defines | |
#-------------------------------------------------------------------------- | |
kTagId = 'TAG' | |
kTagLength = 128 | |
#-------------------------------------------------------------------------- | |
# statics | |
#-------------------------------------------------------------------------- | |
@classmethod | |
def hasTagId(cls, buffer_): | |
"""Check if the buffer starts with the proper tag identifier. | |
""" | |
return buffer_[0:3] == cls.kTagId | |
#-------------------------------------------------------------------------- | |
# object | |
#-------------------------------------------------------------------------- | |
def __init__(self, buffer_): | |
super(ID3v1Tag, self).__init__() | |
# verify the buffer has the correct tag | |
if not ID3v1Tag.hasTagId(buffer_): | |
raise ValueError("Invalid tag id '%s' in buffer, expecting '%s'." % \ | |
(buffer_[0:3], ID3v1Tag.kTagId)) | |
# verify the length is correct | |
length = len(buffer_) | |
if length != ID3v1Tag.kTagLength: | |
raise ValueError("Invalid tag buffer size %d, expecting %d." % \ | |
(length, ID3v1Tag.kTagLength)) | |
# looks like valid tag | |
self._buffer = buffer_ | |
#-------------------------------------------------------------------------- | |
# properties | |
#-------------------------------------------------------------------------- | |
@property | |
def identifier(self): | |
return self._buffer[0:3] | |
#-------------------------------------------------------------------------- | |
@property | |
def title(self): | |
return self._buffer[3:33] | |
#-------------------------------------------------------------------------- | |
@property | |
def artist(self): | |
return self._buffer[33:63] | |
#-------------------------------------------------------------------------- | |
@property | |
def album(self): | |
return self._buffer[63:93] | |
#-------------------------------------------------------------------------- | |
@property | |
def year(self): | |
return self._buffer[93:97] | |
#-------------------------------------------------------------------------- | |
@property | |
def comment(self): | |
return self._buffer[97:127] | |
#-------------------------------------------------------------------------- | |
@property | |
def genre(self): | |
return struct.unpack('>B', self._buffer[127])[0] | |
#------------------------------------------------------------------------------ | |
# ID3v2Tag | |
#------------------------------------------------------------------------------ | |
class ID3v2Tag(object): | |
"""The ID3v2 offers a flexible way of storing information about an audio | |
file within itself to determine its origin and contents. The information | |
may be technical information, such as equalisation curves, as well as | |
related meta information, such as title, performer, copyright etc. | |
""" | |
#-------------------------------------------------------------------------- | |
# defines | |
#-------------------------------------------------------------------------- | |
kTagId = 'ID3' | |
kTagHeaderLength = 10 | |
# options present in flags field | |
kFlagUnsynchronisation = 0x70 | |
kFlagExtendedHeader = 0x40 | |
kFlagExperimentalIndicator = 0x20 | |
#-------------------------------------------------------------------------- | |
# statics | |
#-------------------------------------------------------------------------- | |
@classmethod | |
def hasTagId(cls, buffer_): | |
"""Check if the buffer starts with the proper tag identifier. | |
""" | |
return buffer_[0:3] == cls.kTagId | |
#-------------------------------------------------------------------------- | |
@classmethod | |
def parseSize(cls, buffer_): | |
"""Retrieve the size of the entire tag from the buffer. The size is | |
encoded with four bytes where the most significant bit (bit 7) is set | |
to zero in every byte, making a total of 28 bits. Size is the size of | |
the complete tag after unsychronisation, including padding, excluding | |
the header but not excluding the extended header. | |
""" | |
bytes_ = zip(struct.unpack('>BBBB', buffer_[6:10]), range(21, -1, -7)) | |
size = reduce(lambda x, (v,s): ((v & 0x7f) << s) | x, bytes_, 0) | |
return cls.kTagHeaderLength + size | |
#-------------------------------------------------------------------------- | |
# object | |
#-------------------------------------------------------------------------- | |
def __init__(self, buffer_): | |
super(ID3v2Tag, self).__init__() | |
# verify the buffer has the correct tag | |
if not ID3v2Tag.hasTagId(buffer_): | |
raise ValueError("Invalid tag id '%s' in buffer, expecting '%s'." % \ | |
(buffer_[0:3], ID3v2Tag.kTagId)) | |
# verify the length is correct | |
tagLength = len(buffer_) | |
bufferLength = ID3v2Tag.parseSize(buffer_) | |
if bufferLength != tagLength: | |
raise ValueError("Invalid tag buffer size %d, expecting %d." % \ | |
(bufferLength, tagLength)) | |
# parse tag frames | |
self._frames = self._parseTagFrames(buffer_[ID3v2Tag.kTagHeaderLength:]) | |
# looks like valid tag | |
self._buffer = buffer_ | |
#-------------------------------------------------------------------------- | |
def _parseTagFrames(self, buffer_): | |
"""Parse all of the tag frames from the buffer. | |
""" | |
frames = [] | |
# grouper function to help slice up the header, pretty much returns | |
# the same value x times in a row as defined by the list z | |
def sliceby(z): | |
for i in itertools.chain(*map(lambda (x,y): [y] * x, zip(z, range(len(z))))): | |
yield i | |
# quit parsing if there isn't enough buffer to read in a tag header | |
index = 0 | |
while index < len(buffer_) - ID3v2Tag.kTagHeaderLength: | |
# grab the header info | |
start = index | |
end = index + ID3v2Tag.kTagHeaderLength | |
header = buffer_[start:end] | |
# split into respective segments | |
slicer = lambda _, i=sliceby([4,4,2]): next(i) | |
tag, size, flags = [''.join(g) for _, g in itertools.groupby(header, slicer)] | |
# bail out if tag is invalid | |
if tag == '\x00' * 4: | |
break | |
# convert into integer data | |
size, = struct.unpack('>I', size) | |
flags, = struct.unpack('>H', flags) | |
# save frame | |
start = end | |
end = end + size | |
frame = ID3v2Frame(tag, size, flags, buffer_[start:end]) | |
frames.append(frame) | |
# update index | |
index = end | |
return frames | |
#-------------------------------------------------------------------------- | |
# properties | |
#-------------------------------------------------------------------------- | |
@property | |
def identifier(self): | |
return self._buffer[0:3] | |
#-------------------------------------------------------------------------- | |
@property | |
def version(self): | |
return ID3v2Version(*struct.unpack('>BB', self._buffer[3:5])) | |
#-------------------------------------------------------------------------- | |
@property | |
def flags(self): | |
return struct.unpack('>B', self._buffer[5])[0] | |
#-------------------------------------------------------------------------- | |
@property | |
def size(self): | |
return ID3v2Tag.parseSize(self._buffer) | |
#-------------------------------------------------------------------------- | |
@property | |
def frames(self): | |
return self._frames | |
#------------------------------------------------------------------------------ | |
# XingHeader | |
#------------------------------------------------------------------------------ | |
class XingHeader(object): | |
"""This header is often (but unfortunately not always) added to files which | |
are encoded with variable bitrate mode. | |
""" | |
#-------------------------------------------------------------------------- | |
# defines | |
#-------------------------------------------------------------------------- | |
kHeaderIdXing = 'Xing' | |
kHeaderIdInfo = 'Info' | |
# options present in flags field | |
kFlagFrames = 0x0001 | |
kFlagBytes = 0x0002 | |
kFlagTableOfContents = 0x0004 | |
kFlagQualityIndicator = 0x0008 | |
# size of fields if present | |
sFieldSizes = { | |
kFlagFrames : 4, | |
kFlagBytes : 4, | |
kFlagTableOfContents : 100, | |
kFlagQualityIndicator : 4 | |
} | |
# size of headers with mandatory fields | |
kHeaderMinLength = 8 | |
#-------------------------------------------------------------------------- | |
# statics | |
#-------------------------------------------------------------------------- | |
@classmethod | |
def hasHeaderId(cls, buffer_): | |
"""Check if the buffer starts with the proper identifier. | |
""" | |
return buffer_[0:4] in [cls.kHeaderIdXing, cls.kHeaderIdInfo] | |
#-------------------------------------------------------------------------- | |
@classmethod | |
def calculateOffset(cls, buffer_, flag=None): | |
"""Calculate the offset of the flags field by checking which fields | |
are present in the header. If flag is not provided size of header | |
will be returned. | |
""" | |
# set default flag if not provided | |
if flag == None: | |
flag = 0xffff | |
# grab the flags field from the buffer | |
flags, = struct.unpack('>I', buffer_[4:8]) | |
# add up the offsets of the present fields | |
hasField = lambda f: (flags & f) != 0 | |
fieldFlags = filter(lambda f: hasField(f) and (f < flag), cls.sFieldSizes) | |
offset = 8 + sum(map(lambda f: cls.sFieldSizes[f], fieldFlags)) | |
return offset | |
#-------------------------------------------------------------------------- | |
# object | |
#-------------------------------------------------------------------------- | |
def __init__(self, buffer_): | |
super(XingHeader, self).__init__() | |
# verify the buffer has the correct header id | |
if not XingHeader.hasHeaderId(buffer_): | |
ids = [XingHeader.kHeaderIdXing, XingHeader.kHeaderIdInfo] | |
raise ValueError("Invalid id '%s' in buffer, expecting '%s'." % \ | |
(buffer_[0:4], ' or '.join(ids))) | |
# verify the length is correct | |
headerLength = XingHeader.calculateOffset(buffer_) | |
bufferLength = len(buffer_) | |
if bufferLength != headerLength: | |
raise ValueError("Invalid header buffer size %d, expecting %d." % \ | |
(bufferLength, headerLength)) | |
# looks like valid header | |
self._buffer = buffer_ | |
#-------------------------------------------------------------------------- | |
# properties | |
#-------------------------------------------------------------------------- | |
@property | |
def identifier(self): | |
return self._buffer[0:4] | |
#-------------------------------------------------------------------------- | |
@property | |
def flags(self): | |
return struct.unpack('>I', self._buffer[4:8])[0] | |
#-------------------------------------------------------------------------- | |
@property | |
def frames(self): | |
if self.hasFramesField(): | |
return struct.unpack('>I', self._buffer[8:12])[0] | |
return None | |
#-------------------------------------------------------------------------- | |
@property | |
def bytes(self): | |
if self.hasBytesField(): | |
field = self._getField(XingHeader.kFlagBytes) | |
return struct.unpack('>I', field)[0] | |
return None | |
#-------------------------------------------------------------------------- | |
@property | |
def tableOfContents(self): | |
if self.hasTableOfContentsField(): | |
field = self._getField(XingHeader.kFlagTableOfContents) | |
return struct.unpack('>' + 'B' * len(field), field) | |
return None | |
#-------------------------------------------------------------------------- | |
@property | |
def qualityIndicator(self): | |
if self.hasQualityIndicatorField(): | |
field = self._getField(XingHeader.kFlagQualityIndicator) | |
return struct.unpack('>I', field)[0] | |
return None | |
#-------------------------------------------------------------------------- | |
# methods | |
#-------------------------------------------------------------------------- | |
def hasFramesField(self): | |
return self._isFlagSet(XingHeader.kFlagFrames) | |
#-------------------------------------------------------------------------- | |
def hasBytesField(self): | |
return self._isFlagSet(XingHeader.kFlagBytes) | |
#-------------------------------------------------------------------------- | |
def hasTableOfContentsField(self): | |
return self._isFlagSet(XingHeader.kFlagTableOfContents) | |
#-------------------------------------------------------------------------- | |
def hasQualityIndicatorField(self): | |
return self._isFlagSet(XingHeader.kFlagQualityIndicator) | |
#-------------------------------------------------------------------------- | |
# internal methods | |
#-------------------------------------------------------------------------- | |
def _isFlagSet(self, flag): | |
return (self.flags & flag) != 0 | |
#-------------------------------------------------------------------------- | |
def _getField(self, flag): | |
size = XingHeader.sFieldSizes[flag] | |
offset = XingHeader.calculateOffset(self._buffer, flag) | |
return self._buffer[offset:offset+size] | |
#------------------------------------------------------------------------------ | |
# Mp3Frame | |
#------------------------------------------------------------------------------ | |
class Mp3Frame(object): | |
"""A lightweight object containing a header, reference to a buffer and a | |
start position. | |
""" | |
#-------------------------------------------------------------------------- | |
# object | |
#-------------------------------------------------------------------------- | |
def __init__(self, header, buffer_, start): | |
super(Mp3Frame, self).__init__() | |
self._header = header | |
self._buffer = buffer_ | |
self._start = start | |
self._xing = None | |
#-------------------------------------------------------------------------- | |
# properties | |
#-------------------------------------------------------------------------- | |
@property | |
def header(self): | |
return self._header | |
#-------------------------------------------------------------------------- | |
@property | |
def content(self): | |
start = self._start | |
end = start + self._header.frameSize() | |
return self._buffer[start+Mp3FrameHeader.kHeaderLength:end] | |
#-------------------------------------------------------------------------- | |
# methods | |
#-------------------------------------------------------------------------- | |
def getXingHeader(self): | |
"""Checks if the frame contains a xing header instead of actual audio | |
data. The header is returned if found else None. | |
""" | |
# check if already retrieved | |
if self._xing == None: | |
# check for the xing header identifier | |
start = self._start + 4 + self._header.sideInformationSize() | |
end = start + XingHeader.kHeaderMinLength | |
header = self._buffer[start:end] | |
if XingHeader.hasHeaderId(header): | |
# calculate the xing header size and extract from the buffer | |
size = XingHeader.calculateOffset(header) | |
end = start + size | |
self._xing = XingHeader(self._buffer[start:end]) | |
return self._xing | |
#------------------------------------------------------------------------------ | |
# Mp3Buffer | |
#------------------------------------------------------------------------------ | |
class Mp3Buffer(object): | |
"""Abstracts parsing data out of a buffer. Methods available to examine | |
the buffer for mp3 specific properties such as sycn words, tags, and | |
headers. | |
""" | |
#-------------------------------------------------------------------------- | |
# object | |
#-------------------------------------------------------------------------- | |
def __init__(self, buffer_): | |
super(Mp3Buffer, self).__init__() | |
# set fields to default values | |
self._buffer = buffer_ | |
self._start = 0 | |
self._end = len(self._buffer) | |
self._index = 0 | |
#-------------------------------------------------------------------------- | |
# methods | |
#-------------------------------------------------------------------------- | |
def isEOF(self): | |
"""Returns true if the current index is past the end of the buffer. | |
""" | |
return self._index >= self._end | |
#-------------------------------------------------------------------------- | |
def reset(self): | |
"""Reset the index to the start of the buffer. | |
""" | |
self._index = self._start | |
#-------------------------------------------------------------------------- | |
def availableBytes(self): | |
"""Returns the number of bytes from the current index to the end of the | |
buffer. | |
""" | |
return 0 if self.isEOF() else (self._end - self._index) | |
#-------------------------------------------------------------------------- | |
def hasID3v1Tag(self): | |
"""Checks the end of the buffer for the presence of a ID3v1 tag. | |
""" | |
# calculate the start index of the id3v1 tag | |
start = self._end - ID3v1Tag.kTagLength | |
# check for a valid tag id | |
return ID3v1Tag.hasTagId(self._buffer[start:]) | |
#-------------------------------------------------------------------------- | |
def hasID3v2Tag(self): | |
"""Checks the start of the buffer for the presence of a ID3v2 tag. | |
""" | |
# check for a valid tag id | |
return ID3v2Tag.hasTagId(self._buffer) | |
#-------------------------------------------------------------------------- | |
def popID3v1Tag(self): | |
"""Slices the ID3v1 tag out of the buffer and returns the tag object, | |
the size of the buffer will be updated. | |
""" | |
# calculate the start index of the id3v1 tag | |
start = self._end - ID3v1Tag.kTagLength | |
# create the tag object from the buffer | |
id3v1 = ID3v1Tag(self._buffer[start:]) | |
# update the buffer to reflect the splicing | |
self._end = start | |
return id3v1 | |
#-------------------------------------------------------------------------- | |
def popID3v2Tag(self): | |
"""Slices the ID3v2 tag out of the buffer and returns the tag object, | |
the size of the buffer will be updated, index will be repositioned to | |
the start of the buffer if invalidated. | |
""" | |
# calculate the size of the id3v2 tag buffer | |
size = ID3v2Tag.parseSize(self._buffer) | |
# create the tag object from the buffer | |
id3v2 = ID3v2Tag(self._buffer[:size]) | |
# update the buffer/index to reflect the splicing | |
self._start = size | |
self._index = max(self._index, self._start) | |
return id3v2 | |
#-------------------------------------------------------------------------- | |
def syncToMp3Frame(self): | |
"""Walk the index forward until a frame sync word marker is found. If | |
the index lies on a sync marker, index will not be updated. True will | |
will be returned if a sync word is encountered, otherwise false will | |
be returned when the available buffer space runs to low. | |
""" | |
# make sure there enough room for the frame header | |
while self.availableBytes() >= Mp3FrameHeader.kHeaderLength: | |
# check the current index for the sycn word | |
start = self._index | |
end = start + Mp3FrameHeader.kHeaderLength | |
if Mp3FrameHeader.hasSyncWord(self._buffer[start:end]): | |
return True | |
# step to the next byte | |
print "-> Skipping index %d: %s" % (start, self._buffer[start]) | |
self._index += 1 | |
# ran out of buffer space to accomodate a frame header | |
return False | |
#-------------------------------------------------------------------------- | |
def extractMp3Frame(self): | |
"""Create an mp3 frame object using the header information available at | |
the current index. The index will be forwarded to the end of the frame | |
and the frame object will be returned. | |
""" | |
# create the mp3 frame header | |
start = self._index | |
end = start + Mp3FrameHeader.kHeaderLength | |
frameHeader = Mp3FrameHeader(self._buffer[start:end]) | |
# create the mp3 frame | |
frame = Mp3Frame(frameHeader, self._buffer, self._index) | |
# forward index to end of frame | |
self._index = start + frameHeader.frameSize() | |
return frame | |
#-------------------------------------------------------------------------- | |
def getLeftoverBuffer(self): | |
"""Returns the content from the current index position to the end of | |
the buffer. | |
""" | |
# make sure there is content to return | |
if self.isEOF(): | |
return None | |
return self._buffer[self._index:] | |
#------------------------------------------------------------------------------ | |
# Mp3File | |
#------------------------------------------------------------------------------ | |
class Mp3File(object): | |
"""MPEG-1 / MPEG-2 Audio Layer III (.mp3) file parser. Currently only | |
reading and writing frames is supported. | |
""" | |
#-------------------------------------------------------------------------- | |
# object | |
#-------------------------------------------------------------------------- | |
def __init__(self, mp3Path): | |
"""Attempt to parse the frames and headers/tags found in the mp3 file. | |
""" | |
super(Mp3File, self).__init__() | |
# make sure the path is valid | |
if not os.path.exists(mp3Path): | |
raise ValueError("MPEG .mp3 file not found at '%s'." % mp3Path) | |
# open up the file and load the entire file into memory, optimize to | |
# support streaming once funcionality is added... | |
with open(mp3Path, 'rb') as mp3File: | |
self._buffer = Mp3Buffer(mp3File.read()) | |
# parse out the id3v1 tag if provided | |
self._id3v1tag = None | |
if self._buffer.hasID3v1Tag(): | |
self._id3v1tag = self._buffer.popID3v1Tag() | |
# parse out the id3v2 tag if provided | |
self._id3v2tag = None | |
if self._buffer.hasID3v2Tag(): | |
self._id3v2tag = self._buffer.popID3v2Tag() | |
# forward to the first frame | |
if not self._buffer.syncToMp3Frame(): | |
raise ValueError("First mp3 frame could not be located.") | |
# fields to track mp3 frames encountered | |
self._headerFrame = None | |
self._frames = [] | |
# first frame could be special 'header' frame | |
firstFrame = self._buffer.extractMp3Frame() | |
xingHeader = firstFrame.getXingHeader() | |
if xingHeader != None: | |
self._headerFrame = firstFrame | |
else: | |
self._frames.append(firstFrame) | |
# take advantage of frame count if possible | |
framesParsed = lambda: False | |
if xingHeader != None: | |
framesParsed = lambda: len(self._frames) == xingHeader.frames | |
# parse the rest of the frames from the buffer | |
while not self._buffer.isEOF() and not framesParsed(): | |
# bail if another sync marker not found | |
if not self._buffer.syncToMp3Frame(): | |
break | |
# extract the frame | |
frame = self._buffer.extractMp3Frame() | |
self._frames.append(frame) | |
#-------------------------------------------------------------------------- | |
# methods | |
#-------------------------------------------------------------------------- | |
def totalDuration(self): | |
return sum(map(lambda f: f.header.duration(), self._frames)) | |
#-------------------------------------------------------------------------- | |
def printMp3Info(self): | |
"""Print out all available info found in tags and headers. | |
""" | |
# frame header | |
frame = self._headerFrame | |
if frame == None: | |
frame = self._frames[0] | |
print 'Mp3FrameHeader:' | |
print ' - version : %s' % _versionToStr(frame.header.version) | |
print ' - layer : %s' % _layerToStr(frame.header.layer) | |
print ' - bitrate : %s' % frame.header.bitrate | |
print ' - sampling rate : %s' % frame.header.samplingRate | |
print ' - frame size : %s' % frame.header.frameSize() | |
print ' - duration : %s' % frame.header.duration() | |
print '' | |
# id3v1 tag | |
if self._id3v1tag != None: | |
print 'ID3v1 Tag:' | |
print ' - identifier : %s' % self._id3v1tag.identifier | |
print ' - title : %s' % self._id3v1tag.title | |
print ' - artist : %s' % self._id3v1tag.artist | |
print ' - album : %s' % self._id3v1tag.album | |
print ' - year : %s' % self._id3v1tag.year | |
print ' - comment : %s' % self._id3v1tag.comment | |
print ' - genre : %s' % self._id3v1tag.genre | |
print '' | |
# id3v2 tag | |
if self._id3v2tag != None: | |
print 'ID3v2 Tag:' | |
print ' - identifier : %s' % self._id3v2tag.identifier | |
print ' - version : %s.%s' % self._id3v2tag.version | |
print ' - flags : %s' % self._id3v2tag.flags | |
print ' - size : %s' % self._id3v2tag.size | |
print ' - frames :' | |
for tagFrame in self._id3v2tag.frames: | |
print ' - %s' % tagFrame.tag | |
print '' | |
# xing header | |
xingHeader = frame.getXingHeader() | |
if xingHeader != None: | |
print 'Xing Header:' | |
print ' - identifier : %s' % xingHeader.identifier | |
print ' - frames : %s' % xingHeader.frames | |
print ' - bytes : %s' % xingHeader.bytes | |
print ' - toc : %s' % str(xingHeader.tableOfContents) | |
print ' - quiality : %s' % xingHeader.qualityIndicator | |
print '' |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment