Last active
May 5, 2020 04:12
-
-
Save tripulse/a3857aa3cf51025150cb0f9c4fd7bc24 to your computer and use it in GitHub Desktop.
A high-level pythonic wrapper over the libmp3lame C API.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
A high-level wrapper over libmp3lame C API (a high-quality | |
production grade MPEG Layer-III encoder). This excludes some | |
useless and undocumented routines which don't affect encoding | |
at all or not used often. | |
""" | |
from enum import IntEnum | |
from array import array | |
from os import name as _os_name | |
from itertools import chain | |
from ctypes import CDLL, c_void_p, c_ubyte, pointer | |
from io import BytesIO | |
if _os_name == 'nt': | |
_C = CDLL('libmp3lame.dll') | |
if _os_name == 'posix': | |
_C = CDLL('libmp3lame.so') | |
if _os_name == 'darwin': | |
_C = CDLL('libmp3lame.dylib') | |
def _iswritablebinfile(f): | |
if hasattr(f, 'write'): | |
try: | |
f.write(b'') | |
return True | |
except: | |
return False | |
else: | |
return False | |
class ChannelMode(IntEnum): | |
Mono = 3 | |
Stereo = 0 | |
JointStereo = 1 | |
NotSet = 4 | |
class VBRType(IntEnum): | |
Off = 0 | |
Default = 4 | |
ABR = 3 | |
class AlreadyInit(RuntimeError): | |
"""Thrown if attempting to re-initialse encoder parameters | |
after initialisation of them with a encoding method.""" | |
pass | |
class NotInit(RuntimeError): | |
"""Thrown if attempting to encode PCM samples without | |
initialising the encoder without parameters.""" | |
pass | |
class SmallBuffer(RuntimeError): pass | |
class PsychoError(RuntimeError): pass | |
class AllocationFailure(RuntimeError): pass | |
class LameError(RuntimeError): pass | |
class Lame: | |
@property | |
def samplerate(self) -> int: | |
return _C.lame_get_in_samplerate(self._gfp) | |
@property | |
def channels(self) -> int: | |
return _C.lame_get_num_channels(self._gfp) | |
@property | |
def mode(self) -> ChannelMode: | |
return ChannelMode(_C.lame_get_mode(self._gfp)) | |
@property | |
def brate(self) -> int: | |
return _C.lame_get_brate(self._gfp) | |
@property | |
def vbr(self) -> VBRType: | |
return VBRType(_C.lame_get_VBR(self._gfp)) | |
@property | |
def quality(self) -> int: | |
return _C.lame_get_VBR_q(self._gfp) | |
def __init__(self, | |
out, | |
channels: int, | |
samplerate: int, | |
mode: ChannelMode=None, | |
protect=True, | |
effort=2, | |
): | |
"""Initialse the encoder with some immutable parameters. | |
- out a writable filestream to write out encoded data, | |
it must accept bytes which must have a write() method. | |
- channels number of input channels in PCM data. | |
- samplerate samplerate of the input PCM data. | |
- mode the MP3 channel mode to use (default: None). | |
- protect enable CRC16 protection for frames (default: True). | |
- effort quality-to-speed trade-off for the encoder in (0..9] | |
scale. (9=worst&fast, 7=ok, 5=good, 2=high, 0=best&slow). | |
note: actual quality depends on the bitrate. | |
""" | |
self._gfp = _C.lame_init() | |
self._isinit = False | |
if self._gfp == c_void_p(): | |
raise AllocationFailure | |
if not _iswritablebinfile(out): | |
raise ValueError('Unwritable (non-)binary file') | |
self._out = out | |
_C.lame_set_num_channels(self._gfp, channels) | |
_C.lame_set_in_samplerate(self._gfp, samplerate) | |
_C.lame_set_error_protection(self._gfp, protect) | |
if mode: | |
if not isinstance(mode, ChannelMode): | |
raise TypeError("Invalid channel mode enumeration," | |
" must be a variant of ChannelMode") | |
_C.lame_set_mode(self._gfp, mode.value) | |
_C.lame_set_write_id3tag_automatic(self._gfp, 0) | |
_C.lame_set_quality(self._gfp, effort) | |
def init_VBR(self, abr=False, q=3, brate=128): | |
"""Initialise parameters for doing VBR/ABR encoding. | |
- abr use the ABR mode of VBR. | |
- q quality of VBR in range of (0..+9] | |
(default: 2), (0=highest, 9=lowest). | |
- rate kilobitrate of VBR (default: 128). | |
""" | |
if self._isinit: | |
raise AlreadyInit | |
if abr: | |
_C.lame_set_VBR(self._gfp, 3) | |
else: | |
_C.lame_set_VBR(self._gfp, 4) | |
_C.lame_set_VBR_quality(self._gfp, int(q)) | |
_C.lame_set_brate(self._gfp, int(brate)) | |
assert _C.lame_init_params(self._gfp) == 0, \ | |
'Invalid encoder parameters specified' | |
def init_CBR(self, brate=128): | |
"""Initalise parameters for doing CBR encoding with | |
a given kilobitrate (default: 128).""" | |
if self._isinit: | |
raise AlreadyInit | |
_C.lame_set_VBR(self._gfp, 0) | |
_C.lame_set_brate(self._gfp, int(brate)) | |
assert _C.lame_init_params(self._gfp) == 0, \ | |
'Invalid encoder parameters specified' | |
def write(self, samples: array): | |
"""Encode a interleaved floating-point buffer through the | |
LAME encoder to MP3 byte-data. Typecode of samples can be: | |
`d` (double), `f` (float), `l` (long), `h` (short).""" | |
if not isinstance(samples, array): | |
raise TypeError("Invalid source of audio samples," | |
" only array.array is supported.") | |
nch = _C.lame_get_num_channels(self._gfp) | |
nsamples = len(samples) // nch | |
out_bufcap = int(1.25 * len(samples) + 7200) | |
assert len(samples) % nch == 0, 'Uneven number of samples.' | |
# if mono channellayout was set then replicate that | |
# as two channels (L&R) interleaved. | |
if nch == 1: | |
samples = array(samples.typecode, | |
chain(*zip(samples, samples))) | |
out = pointer((c_ubyte * out_bufcap)()) | |
outsiz = getattr(_C, { | |
'd': 'lame_encode_buffer_interleaved_ieee_double', | |
'f': 'lame_encode_buffer_interleaved_ieee_float', | |
'l': 'lame_encode_buffer_interleaved_int', | |
'h': 'lame_encode_buffer_interleaved' | |
}[samples.typecode])(self._gfp, | |
samples.tobytes(), nsamples, out, out_bufcap) | |
if outsiz == -1: raise SmallBuffer | |
elif outsiz == -2: raise AllocationFailure | |
elif outsiz == -3: raise NotInit | |
elif outsiz == -4: raise PsychoError | |
elif outsiz < 0: | |
raise LameError('LAME encoder failed with code: %d' % outsiz) | |
self._out.write(bytes( | |
out.contents[c] for c in range(outsiz))) | |
def flush(self): | |
out = pointer((c_ubyte * 8192)()) | |
outsiz = _C.lame_encode_flush(self._gfp, out, 8192) | |
self._out.write(bytes( | |
out.contents[c] for c in range(outsiz))) | |
def __del__(self): | |
if self._gfp != c_void_p(): | |
_C.lame_close(self._gfp) | |
def encode( | |
samples: array, | |
channels: int, | |
samplerate: int, | |
mode: ChannelMode=None, | |
protect=True, | |
effort=2, | |
brate=128, | |
vbr_type=VBRType.Off, | |
vbr_quality=3, | |
): | |
"""Perform one-shot encoding of MP3 data and output bytes. | |
- samples interleaved PCM samples to encode to MP3, typecode of | |
samples can be: d,f,l,h (else TypeError is thrown). | |
- channels number of input channels in PCM data. | |
- samplerate samplerate of the input PCM data. | |
- mode the MP3 channel mode to use (defualt: None). | |
- protection enable CRC16 protection for frames (default: True). | |
- effort quality-to-speed trade-off for the encoder in (0..9] | |
scale. (9=worst&fast, 7=ok, 5=good, 2=high, 0=best&slow). | |
- brate kilobitrate of VBR/CBR (default: 128). | |
- vbr_type type of VBR to use (options: ABR, Default, Off). | |
- vbr_quality quality of VBR in range of (0..9] (default: 2), | |
(9=worst, 0=best). | |
""" | |
if not isinstance(vbr_type, VBRType): | |
raise TypeError("Invalid VBR type enumeration," | |
" must be a variant of VBRType") | |
out = BytesIO() | |
l = Lame(out, channels, samplerate, mode, protect, effort) | |
if vbr_type == VBRType.Off: | |
l.init_CBR(brate) | |
else: | |
l.init_VBR(vbr_quality == VBRType.ABR, vbr_quality, brate) | |
l.write(samples) | |
l.flush() | |
out.seek(0) | |
return out.read() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment