Skip to content

Instantly share code, notes, and snippets.

@mguaypaq
Last active March 25, 2024 18:34
Show Gist options
  • Save mguaypaq/e0b70d874c9f81423ffd582bd6787438 to your computer and use it in GitHub Desktop.
Save mguaypaq/e0b70d874c9f81423ffd582bd6787438 to your computer and use it in GitHub Desktop.
Terminal color code filtering, as a python codec
"""
CSI escape sequence filter, implemented as a codec.
Should be usable for writing to a file using:
>>> import csi_filter
>>> csi_filter.register()
>>> some_file = open('some_file', 'w', encoding='csi_filter')
"""
import codecs
import re
# A complete CSI escape sequence (cursor movements, color codes, etc.):
# ESC [ followed by optional parameter bytes, optional intermediate bytes and
# one required final byte.
# The pattern is compiled with re.VERBOSE, so whitespace and "#" comments in
# the pattern are ignored, except inside character classes such as "[ -/]",
# where the space is a literal range endpoint.
csi_escape = re.compile(
r"""
\x1b \[ # the control sequence introducer, ESC [
[0-?]* # optional parameter bytes, between 0x30 and 0x3f
[ -/]* # optional intermediate bytes, between 0x20 and 0x2f
[@-~] # required final byte, between 0x40 and 0x7e
""",
re.VERBOSE,
)
# The last part of the input which *might* be the start of a CSI escape
# sequence whose remaining bytes have not arrived yet.  The first alternative
# is an empty match at the end of the input, so searching always succeeds and
# the match start is the number of characters that are safe to process now.
#
# The anchors are \Z (absolute end of string) rather than "$": without
# re.MULTILINE, "$" also matches just before a trailing newline, which would
# make the match start one character early and treat that newline as part of
# a pending escape sequence.
csi_suffix = re.compile(
    r"""
      \Z                          # nothing pending
    | \x1b \Z                     # a lone ESC, the introducer may still follow
    | \x1b \[ [0-?]* [ -/]* \Z    # ESC [ and parameters, final byte missing
    """,
    re.VERBOSE,
)
class CsiFilterCodec(codecs.Codec):
    """UTF-8 codec which removes CSI escape sequences from the text.

    Both directions return ``(output, consumed)``.  ``consumed`` may be less
    than the length of the input: a trailing fragment that could still turn
    into a CSI escape sequence is neither emitted nor counted as consumed, so
    that a buffering caller can present it again together with later input.
    """

    @staticmethod
    def encode(input: str, errors="strict") -> tuple[bytes, int]:
        """Discard csi escape sequences and encode the resulting string in utf-8.

        Args:
            input: the text to filter and encode.
            errors: error handler name passed to the utf-8 encoder.

        Returns:
            The encoded bytes and the number of characters of ``input`` that
            were processed (everything before a possible partial escape
            sequence at the end).
        """
        # The suffix pattern always matches (possibly with an empty match),
        # so the match start is the length of the safely processable prefix.
        consumed = csi_suffix.search(input).start()
        filtered = csi_escape.sub("", input[:consumed])
        output, _ = codecs.utf_8_encode(filtered, errors)
        return output, consumed

    @staticmethod
    def decode(input: bytes, errors="strict") -> tuple[str, int]:
        """Decode a utf-8 string and discard csi escape sequences from the result.

        Args:
            input: the utf-8 bytes to decode and filter.
            errors: error handler name passed to the utf-8 decoder.

        Returns:
            The filtered text and the number of bytes of ``input`` that were
            processed.  An incomplete utf-8 sequence or a possible partial
            escape sequence at the end is left unconsumed.
        """
        # final=False: an incomplete multi-byte sequence at the end is left
        # unconsumed instead of being reported as an error.
        unfiltered, consumed = codecs.utf_8_decode(input, errors, False)
        start = csi_suffix.search(unfiltered).start()
        # Only unfiltered[:start] is emitted, so every character from `start`
        # to the end of the string must be given back to the caller; measuring
        # to the end of the string (not the end of the match) guarantees that
        # nothing is counted as consumed without being returned.
        # The held-back characters are ASCII, so the number of utf-8 bytes is
        # the same as the number of characters.
        consumed -= len(unfiltered) - start
        output = csi_escape.sub("", unfiltered[:start])
        return output, consumed
class CsiFilterIncrementalEncoder(codecs.BufferedIncrementalEncoder):
    """Incremental encoder which delegates to ``CsiFilterCodec.encode``."""

    def _buffer_encode(self, input: str, errors: str, final: bool) -> tuple[bytes, int]:
        """Encode as much of ``input`` as possible.

        Returns the encoded bytes and the number of characters used.
        ``final`` is not consulted: a possible partial escape sequence at the
        end is reported as unconsumed even on the last call.
        """
        encoded, used = CsiFilterCodec.encode(input, errors)
        return encoded, used
class CsiFilterIncrementalDecoder(codecs.BufferedIncrementalDecoder):
    """Incremental decoder which delegates to ``CsiFilterCodec.decode``."""

    def _buffer_decode(self, input: bytes, errors: str, final: bool) -> tuple[str, int]:
        """Decode as much of ``input`` as possible.

        Returns the decoded text and the number of bytes used.  ``final`` is
        not consulted: trailing bytes that cannot be processed yet are
        reported as unconsumed even on the last call.
        """
        decoded, used = CsiFilterCodec.decode(input, errors)
        return decoded, used
class CsiFilterStreamReader(CsiFilterCodec, codecs.StreamReader):
    """Stream reader whose ``decode`` is the CSI-filtering ``CsiFilterCodec.decode``."""
class CsiFilterStreamWriter(CsiFilterCodec, codecs.StreamWriter):
    """Stream writer whose ``encode`` is the CSI-filtering ``CsiFilterCodec.encode``.

    NOTE(review): ``encode`` may consume less than it is given; confirm that
    ``codecs.StreamWriter.write`` re-submits the unconsumed tail, otherwise
    an escape sequence split across two writes is not filtered.
    """
def register():
    """Make the codec available under the encoding name "csi_filter".

    A codec search function is handed to ``codecs.register``.  It answers
    only for the exact name "csi_filter" and returns nothing for any other
    name.  Every call registers one more search function.
    """

    def lookup(encoding):
        # Not our encoding: fall through with None.
        if encoding != "csi_filter":
            return None
        return codecs.CodecInfo(
            name="csi-filter",
            encode=CsiFilterCodec.encode,
            decode=CsiFilterCodec.decode,
            incrementalencoder=CsiFilterIncrementalEncoder,
            incrementaldecoder=CsiFilterIncrementalDecoder,
            streamreader=CsiFilterStreamReader,
            streamwriter=CsiFilterStreamWriter,
        )

    codecs.register(lookup)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment