Skip to content

Instantly share code, notes, and snippets.

@tripulse
Last active May 5, 2020 04:12
Show Gist options
  • Save tripulse/a3857aa3cf51025150cb0f9c4fd7bc24 to your computer and use it in GitHub Desktop.
Save tripulse/a3857aa3cf51025150cb0f9c4fd7bc24 to your computer and use it in GitHub Desktop.
A high-level pythonic wrapper over the libmp3lame C API.
"""
A high-level wrapper over the libmp3lame C API (a high-quality,
production-grade MPEG Layer-III encoder). It leaves out some
useless and undocumented routines which either don't affect
encoding at all or are not used often.
"""
from enum import IntEnum
from array import array
from os import name as _os_name
from itertools import chain
from ctypes import CDLL, c_void_p, c_ubyte, pointer
from io import BytesIO
def _load_libmp3lame():
    """Locate and load the libmp3lame shared library for this platform.

    The platform's conventional file name is tried first; whatever
    ``ctypes.util.find_library`` reports is used as a fallback.

    Returns:
        The loaded ``CDLL`` handle.

    Raises:
        OSError: if none of the candidate names could be loaded.
    """
    # Imported locally so this loader needs nothing beyond the names the
    # module already imports at the top of the file.
    import sys
    from ctypes.util import find_library

    if _os_name == 'nt':
        candidates = ['libmp3lame.dll']
    elif sys.platform == 'darwin':
        # os.name is 'posix' on macOS, so the platform has to be told
        # apart through sys.platform; the .so name is kept as a fallback.
        candidates = ['libmp3lame.dylib', 'libmp3lame.so']
    else:
        candidates = ['libmp3lame.so']

    located = find_library('mp3lame') or find_library('libmp3lame')
    if located and located not in candidates:
        candidates.append(located)

    last_error = None
    for libname in candidates:
        try:
            return CDLL(libname)
        except OSError as err:
            last_error = err
    raise OSError(
        'Unable to load libmp3lame (tried: %s)' % ', '.join(candidates)
    ) from last_error


_C = _load_libmp3lame()
def _iswritablebinfile(f):
    """Return True if *f* has a ``write()`` method that accepts bytes.

    The check is done by writing an empty bytes object, so nothing is
    actually emitted to the stream.

    Ordinary errors raised by ``write()`` (e.g. a text stream rejecting
    bytes) mean "not writable" and yield False. Exceptions that do not
    derive from ``Exception`` (``KeyboardInterrupt``, ``SystemExit``, ...)
    are deliberately allowed to propagate.
    """
    write = getattr(f, 'write', None)
    if write is None:
        return False
    try:
        write(b'')
    except Exception:
        return False
    return True
class ChannelMode(IntEnum):
    """MP3 channel modes accepted by ``Lame`` and ``encode``.

    The integer values are the ones handed to ``lame_set_mode`` and
    read back from ``lame_get_mode``.
    """

    Mono        = 3
    Stereo      = 0
    JointStereo = 1
    NotSet      = 4
class VBRType(IntEnum):
    """Bitrate strategies selectable for encoding.

    The integer values are the ones read back from ``lame_get_VBR``.
    """

    Off     = 0  # constant bitrate
    Default = 4  # variable bitrate
    ABR     = 3  # average bitrate
class AlreadyInit(RuntimeError):
    """Raised when encoder parameters are initialised again after an
    encoding method has already initialised them."""
class NotInit(RuntimeError):
    """Raised when PCM samples are submitted for encoding before the
    encoder parameters have been initialised."""
class SmallBuffer(RuntimeError):
    """Raised by ``Lame.write`` when the LAME encode call returns -1."""
class PsychoError(RuntimeError):
    """Raised by ``Lame.write`` when the LAME encode call returns -4."""
class AllocationFailure(RuntimeError):
    """Raised when the encoder handle cannot be allocated, or when the
    LAME encode call returns -2."""
class LameError(RuntimeError):
    """Raised for negative LAME return codes that have no dedicated
    exception class."""
class Lame:
    """Streaming MP3 encoder bound to one libmp3lame encoder handle.

    Usage: construct the object with a writable binary stream, call
    exactly one of ``init_CBR()`` / ``init_VBR()``, feed PCM data through
    ``write()`` and finish the stream with ``flush()``. Encoded bytes are
    written to the stream as they are produced.
    """

    # array typecode -> name of the libmp3lame interleaved encode routine.
    # 'i' and 'l' share the routine that consumes C ``int`` samples.
    _ENCODERS = {
        'd': 'lame_encode_buffer_interleaved_ieee_double',
        'f': 'lame_encode_buffer_interleaved_ieee_float',
        'l': 'lame_encode_buffer_interleaved_int',
        'i': 'lame_encode_buffer_interleaved_int',
        'h': 'lame_encode_buffer_interleaved',
    }

    @property
    def samplerate(self) -> int:
        """Input sample rate, as reported by ``lame_get_in_samplerate``."""
        return _C.lame_get_in_samplerate(self._gfp)

    @property
    def channels(self) -> int:
        """Number of input channels, as reported by ``lame_get_num_channels``."""
        return _C.lame_get_num_channels(self._gfp)

    @property
    def mode(self) -> "ChannelMode":
        """Channel mode, as reported by ``lame_get_mode``."""
        return ChannelMode(_C.lame_get_mode(self._gfp))

    @property
    def brate(self) -> int:
        """Bitrate, as reported by ``lame_get_brate``."""
        return _C.lame_get_brate(self._gfp)

    @property
    def vbr(self) -> "VBRType":
        """VBR mode, as reported by ``lame_get_VBR``."""
        return VBRType(_C.lame_get_VBR(self._gfp))

    @property
    def quality(self) -> int:
        """VBR quality level, as reported by ``lame_get_VBR_q``."""
        return _C.lame_get_VBR_q(self._gfp)

    def __init__(self,
                 out,
                 channels: int,
                 samplerate: int,
                 mode: "ChannelMode" = None,
                 protect=True,
                 effort=2,
                 ):
        """Initialise the encoder with some immutable parameters.

        - out         a writable filestream to write out encoded data,
                      it must have a write() method that accepts bytes.
        - channels    number of input channels in PCM data.
        - samplerate  samplerate of the input PCM data.
        - mode        the MP3 channel mode to use (default: None, which
                      leaves the library's own choice untouched).
        - protect     enable CRC16 protection for frames (default: True).
        - effort      quality-to-speed trade-off for the encoder on a
                      0..9 scale (9=worst&fast, 7=ok, 5=good, 2=high,
                      0=best&slow).
                      note: actual quality depends on the bitrate.

        Raises ValueError if `out` is not a writable binary stream,
        TypeError if `mode` is neither None nor a ChannelMode, and
        AllocationFailure if the encoder handle cannot be allocated.
        """
        # Defined before anything can raise so that __del__ is always safe.
        self._gfp = None
        self._isinit = False

        # Validate the arguments before allocating the native handle.
        if not _iswritablebinfile(out):
            raise ValueError('Unwritable (non-)binary file')
        # `is not None` rather than truthiness: ChannelMode.Stereo is 0.
        if mode is not None and not isinstance(mode, ChannelMode):
            raise TypeError("Invalid channel mode enumeration,"
                            " must be a variant of ChannelMode")
        self._out = out

        # lame_init() returns a pointer; with ctypes' default `int` return
        # type the handle would be truncated on 64-bit platforms.
        _C.lame_init.restype = c_void_p
        handle = _C.lame_init()
        if not handle:  # a NULL pointer comes back as None
            raise AllocationFailure
        # Kept as c_void_p so it is passed back as a full-width pointer.
        self._gfp = c_void_p(handle)

        _C.lame_set_num_channels(self._gfp, channels)
        _C.lame_set_in_samplerate(self._gfp, samplerate)
        _C.lame_set_error_protection(self._gfp, int(bool(protect)))
        if mode is not None:
            _C.lame_set_mode(self._gfp, mode.value)
        _C.lame_set_write_id3tag_automatic(self._gfp, 0)
        _C.lame_set_quality(self._gfp, effort)

    def _apply_params(self):
        """Commit the configured parameters and mark the encoder initialised.

        The call is made outside of any `assert` so that it still happens
        when Python runs with -O.
        """
        if _C.lame_init_params(self._gfp) != 0:
            raise ValueError('Invalid encoder parameters specified')
        self._isinit = True

    @staticmethod
    def _check_status(code):
        """Return `code` unchanged, or raise for a negative LAME status."""
        if code >= 0:
            return code
        if code == -1:
            raise SmallBuffer
        if code == -2:
            raise AllocationFailure
        if code == -3:
            raise NotInit
        if code == -4:
            raise PsychoError
        raise LameError('LAME encoder failed with code: %d' % code)

    def init_VBR(self, abr=False, q=3, brate=128):
        """Initialise parameters for doing VBR/ABR encoding.

        - abr    use the ABR mode of VBR.
        - q      quality of VBR in range 0..9 (default: 3),
                 (0=highest, 9=lowest).
        - brate  kilobitrate (default: 128); in ABR mode it is also
                 set as the mean bitrate target.

        Raises AlreadyInit if the encoder was initialised before and
        ValueError if the library rejects the parameters.
        """
        if self._isinit:
            raise AlreadyInit
        if abr:
            _C.lame_set_VBR(self._gfp, 3)  # vbr_abr
            _C.lame_set_VBR_mean_bitrate_kbps(self._gfp, int(brate))
        else:
            _C.lame_set_VBR(self._gfp, 4)  # vbr_mtrh (library default VBR)
        # lame_set_VBR_q is the integer setter; lame_set_VBR_quality takes
        # a C float and must not be handed a plain Python int.
        _C.lame_set_VBR_q(self._gfp, int(q))
        _C.lame_set_brate(self._gfp, int(brate))
        self._apply_params()

    def init_CBR(self, brate=128):
        """Initialise parameters for doing CBR encoding with
        a given kilobitrate (default: 128).

        Raises AlreadyInit if the encoder was initialised before and
        ValueError if the library rejects the parameters.
        """
        if self._isinit:
            raise AlreadyInit
        _C.lame_set_VBR(self._gfp, 0)  # vbr_off
        _C.lame_set_brate(self._gfp, int(brate))
        self._apply_params()

    def write(self, samples: array):
        """Encode an interleaved PCM buffer through the LAME encoder and
        write the resulting MP3 bytes to the output stream.

        Typecode of samples can be: `d` (double), `f` (float),
        `l` (long), `i` (int), `h` (short).

        Raises TypeError for a non-array or an unsupported typecode,
        ValueError if the sample count is not a multiple of the channel
        count, OverflowError if `l` samples do not fit a C int, and
        SmallBuffer / AllocationFailure / NotInit / PsychoError /
        LameError for negative return codes of the encoder.
        """
        if not isinstance(samples, array):
            raise TypeError("Invalid source of audio samples,"
                            " only array.array is supported.")
        try:
            encoder_name = self._ENCODERS[samples.typecode]
        except KeyError:
            raise TypeError('Unsupported sample typecode: %r'
                            % samples.typecode) from None

        nch = _C.lame_get_num_channels(self._gfp)
        if len(samples) % nch != 0:
            raise ValueError('Uneven number of samples.')
        nsamples = len(samples) // nch
        out_bufcap = int(1.25 * len(samples) + 7200)

        # The integer routine reads C `int` items; where C `long` is wider
        # (e.g. 64-bit Linux/macOS) the data has to be repacked first.
        if samples.typecode == 'l' and samples.itemsize != array('i').itemsize:
            samples = array('i', samples)

        # if mono channellayout was set then replicate that
        # as two channels (L&R) interleaved.
        if nch == 1:
            samples = array(samples.typecode,
                            chain.from_iterable(zip(samples, samples)))

        buf = (c_ubyte * out_bufcap)()
        outsiz = getattr(_C, encoder_name)(
            self._gfp, samples.tobytes(), nsamples, buf, out_bufcap)
        self._check_status(outsiz)
        # One C-level copy of just the bytes the encoder produced.
        self._out.write(bytes(memoryview(buf)[:outsiz]))

    def flush(self):
        """Flush the encoder's pending data to the output stream.

        Raises the same encoder exceptions as write() when the flush
        call returns a negative code.
        """
        bufcap = 8192
        buf = (c_ubyte * bufcap)()
        outsiz = _C.lame_encode_flush(self._gfp, buf, bufcap)
        self._check_status(outsiz)
        self._out.write(bytes(memoryview(buf)[:outsiz]))

    def __del__(self):
        # getattr: __init__ may have failed before the handle was set.
        gfp = getattr(self, '_gfp', None)
        if gfp:
            self._gfp = None  # never close the same handle twice
            _C.lame_close(gfp)
def encode(
    samples: array,
    channels: int,
    samplerate: int,
    mode: ChannelMode=None,
    protect=True,
    effort=2,
    brate=128,
    vbr_type=VBRType.Off,
    vbr_quality=3,
):
    """Perform one-shot encoding of MP3 data and output bytes.

    - samples      interleaved PCM samples to encode to MP3; must be an
                   array.array (Lame.write raises TypeError otherwise),
                   typecode of samples can be: d,f,l,h.
    - channels     number of input channels in PCM data.
    - samplerate   samplerate of the input PCM data.
    - mode         the MP3 channel mode to use (default: None).
    - protect      enable CRC16 protection for frames (default: True).
    - effort       quality-to-speed trade-off for the encoder on a 0..9
                   scale (9=worst&fast, 7=ok, 5=good, 2=high, 0=best&slow).
    - brate        kilobitrate of VBR/CBR (default: 128).
    - vbr_type     type of VBR to use (options: ABR, Default, Off).
    - vbr_quality  quality of VBR in range 0..9 (default: 3),
                   (9=worst, 0=best).

    Returns the encoded MP3 stream as bytes.
    Raises TypeError if `vbr_type` is not a VBRType.
    """
    if not isinstance(vbr_type, VBRType):
        raise TypeError("Invalid VBR type enumeration,"
                        " must be a variant of VBRType")

    out = BytesIO()
    encoder = Lame(out, channels, samplerate, mode, protect, effort)

    if vbr_type == VBRType.Off:
        encoder.init_CBR(brate)
    else:
        # ABR is selected by the VBR *type*. Comparing the quality here
        # would switch ABR on whenever vbr_quality happens to equal
        # VBRType.ABR (3), because VBRType is an IntEnum.
        encoder.init_VBR(vbr_type == VBRType.ABR, vbr_quality, brate)

    encoder.write(samples)
    encoder.flush()
    return out.getvalue()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment