Skip to content

Instantly share code, notes, and snippets.

@seansummers
Last active May 19, 2025 16:52
Show Gist options
  • Save seansummers/d0126b1d490ceace6d504187d01407da to your computer and use it in GitHub Desktop.
Save seansummers/d0126b1d490ceace6d504187d01407da to your computer and use it in GitHub Desktop.
AWS Python Snippets

AWS Python Snippets

Collection of Python scripts and functions that might be useful.

# SPDX-License-Identifier: MIT
# /// script
# requires-python = ">=3.7"
# dependencies = [
# "jmespath",
# ]
# ///
__all__ = ["from_awslogs_data"]
from base64 import b64decode
from functools import partial
from json import loads
from zlib import MAX_WBITS, decompress
from jmespath.functions import Functions, signature
from jmespath import Options, compile
class Functions(Functions):
@signature({"types": ["string"]})
def _func_awslogs_data(self_, data: str) -> str:
"""Parse CloudWatch Logs subscription events `awslogs.data` base64-gzipped-json values."""
decoded_data = b64decode(data)
event = decompress(decoded_data, 16 + MAX_WBITS)
return loads(event.decode("utf8"))
options = Options(custom_functions=Functions())
from_awslogs_data = partial(
compile("awslogs_data(awslogs.data)").search,
options=options,
)
if __name__ == "__main__":
sample_events = [
{
"awslogs": {
"data": "H4sIAAAAAAAAAIVRy27bMBD8FYPoMYSWFJ+6qa1tBIlbwFKbQ2oUlEQpBGRJpaS6RZB/z7oPoLdiL8vhYnZ25pmc/Ty7zpc/J08y8j4v86+HbVHk+y25IeNl8BFhxbgwqeVCaUC4H7t9HNcJfxJ3mZPenavGJRXiFN/U925eQj17F+sn2obon8bZUzfRMoau83G3DvUSxoGyWzgc3vLy0/HD3fE3c7FE785IzYGZhLGE2eTxzX1ebovyJHiTShCqraARjXOWs1ZVDUArGu10ihTzWs11DNOVfxf6xceZZI/k/+IC7S7U/3Dnqfe07se1ubgFR95d24dre4/q/mGnD9zsgB0+3u13lpx+qd9+98NyXfhMQoNHpEIA56A5CCxpjbESpJESLbUcuGFKM6YYgAapU2VNiqU5HrIEzGZBOSRjUnCFCfDUGnbzNzOkL8r8WG6O/tuKo7dNtmGmNRWTmvqaA2XMG1q1WlJoK6vrSijg9eYzeoL6s80fW78M5OX08grvTTW4DAIAAA=="
}
},
{
"awslogs": {
"data": "H4sICGKEJGgAA3gAzZJda8IwFIbv/RXlXMvIR5Mm3hXXidBNmR8w1iFOgwSqKW3cJuJ/X1Jddd4L5irhefOe855kH7QCt2Ctqmq+UuNdoaATwGM8jmfPyWgU9xJoHyXme6NKDzGhIeORkAiTP5ibVa8028Lz5Ge+LnKFL9jIlmq+9rA5NLjaflaLUhdWm82Tzq0qKyd8PzZWK06GQ5PrxQ6O4ONsnnypjb26sz9va5le+uIUS0Yx4hxxybkUlEZCIIkIQ5EUKCQY45ASITlmIQmJFIRgxsWp1cbNajcv65pypvUFwgVjCKEr3WmqvvQ+A+X7nLp4LmgGnQzwA6IZtDPYVqrsLx3VdueI01r3ELXm1RibwQEa40P7NiHlHYQMLlLe5i0jdF8x/3/mov7hqTPPfZ242x1MXsazNJkm6Ww4SPvdN2gdWr/2N4V/tQMAAA=="
}
},
]
for event in sample_events:
log_entry = from_awslogs_data(event)
print(log_entry)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment