@davepeck
Last active May 16, 2025 12:49
Zero-dependency Python script to decode the Bluesky firehose

This is a pure-Python implementation of the code needed to decode the Bluesky Firehose's low-level binary data format.

It does not depend on any third-party libraries for decoding; we implement DAG_CBOR and CARv1 decoding ourselves.
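To illustrate what decoding DAG_CBOR "ourselves" involves, here is a deliberately tiny, standalone sketch. It is not the script's own code, and `decode_subset` and `read_argument` are illustrative names. It handles only unsigned integers, text strings, arrays, and maps. Every CBOR item starts with one byte whose top three bits give the major type and whose low five bits give the "additional info", which is either a small value or a marker saying how many following bytes hold the real value.

```python
from io import BytesIO
from typing import Any


def read_argument(stream: BytesIO, info: int) -> int:
    # Additional info 0-23 is the value itself; 24/25/26/27 mean the value
    # follows as a 1/2/4/8-byte big-endian unsigned integer.
    if info < 24:
        return info
    sizes = {24: 1, 25: 2, 26: 4, 27: 8}
    if info not in sizes:
        raise ValueError(f"unsupported additional info {info}")
    return int.from_bytes(stream.read(sizes[info]), "big")


def decode_subset(stream: BytesIO) -> Any:
    """Decode a small CBOR subset: unsigned ints, text strings, arrays, maps."""
    initial = stream.read(1)
    if not initial:
        raise ValueError("unexpected end of input")
    major, info = initial[0] >> 5, initial[0] & 0x1F
    arg = read_argument(stream, info)
    if major == 0:  # unsigned integer: the argument is the value
        return arg
    if major == 3:  # text string: the argument is the length in bytes
        return stream.read(arg).decode("utf-8")
    if major == 4:  # array: the argument is the number of items
        return [decode_subset(stream) for _ in range(arg)]
    if major == 5:  # map: the argument is the number of key/value pairs
        return {decode_subset(stream): decode_subset(stream) for _ in range(arg)}
    raise ValueError(f"major type {major} is not handled in this sketch")


# A1     -> map (major type 5) with 1 pair
# 61 61  -> text string (major type 3) of length 1: "a"
# 01     -> unsigned int (major type 0): 1
print(decode_subset(BytesIO(bytes.fromhex("a1616101"))))  # {'a': 1}
```

The full script follows the same shape, adding byte strings, negative integers, floats, simple values, and tag 42 (CIDs).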

To run this, install Astral's uv.

Then:

$ uv run https://gist.githubusercontent.com/davepeck/8ada49d42d44a5632b540a4093225719/raw/fe73141037a9ee9e7cfa941ec83c07ca8b89e27a/firehose.py
# ... so much output!

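Or download the script and run it directly. It must be executable (chmod +x firehose.py), and uv must still be installed, because the script's shebang line invokes uv run: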

$ ./firehose.py
# ... so much data!
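The script prints one JSON object per frame, one frame per line, so the output is easy to post-process. As an illustration, here is a hypothetical helper, `count_message_types`, that tallies frame types from those lines. It assumes the frame header stores the message type under a "t" key (e.g. "#commit"), as described in the atproto event-stream specification; the script itself does not name that key.

```python
import json
from collections import Counter
from collections.abc import Iterable


def count_message_types(lines: Iterable[str]) -> Counter:
    """Tally frame types from the script's JSON-lines output.

    Assumption: each frame header carries its type under "t"; frames
    without one are counted under None.
    """
    counts: Counter = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        frame = json.loads(line)
        counts[frame["header"].get("t")] += 1
    return counts


sample = [
    '{"header": {"op": 1, "t": "#commit"}, "body": {}}',
    '{"header": {"op": 1, "t": "#identity"}, "body": {}}',
    '{"header": {"op": 1, "t": "#commit"}, "body": {}}',
]
print(count_message_types(sample))  # Counter({'#commit': 2, '#identity': 1})
```

To use it on live output, pass sys.stdin (which yields lines) instead of the sample list and pipe the script's output into it.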

I have notes about this on my website. They are often "in progress" as I learn more, but hopefully they, plus the comments in the code itself, are useful!

It depends on httpx and httpx_ws, right?

Yes! I wanted to focus on decoding, not on building a WebSockets client. :-)

#!/usr/bin/env -S uv run -q
# /// script
# requires-python = ">=3.13"
# dependencies = [
#     "httpx",
#     "httpx-ws",
# ]
# ///
import json
import struct
import typing as t
from base64 import b32encode
from io import BytesIO

from httpx_ws import connect_ws

BSKY_FIREHOSE = "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"


class JSONEncoderWithBytes(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, bytes):
            return obj.hex()
        return super().default(obj)


def read_uvarint(stream: t.IO[bytes]) -> int:
    """
    Read a multiformats unsigned varint from the given stream.
    See the specification at https://github.com/multiformats/unsigned-varint
    And the reference Go implementation at https://github.com/multiformats/go-varint
    """
    shift = 0
    result = 0
    while True:
        byte = stream.read(1)
        if not byte:
            raise ValueError("Unexpected end of input while parsing varint.")
        byte_val = byte[0]
        result |= (byte_val & 0x7F) << shift
        shift += 7
        if not (byte_val & 0x80):
            break
    return result


def multibase_encode_b(b: bytes) -> str:
    """
    Encode the given byte string using RFC 4648 base32 encoding, case-insensitive,
    without padding. Add a multibase prefix 'b' to indicate the encoding.
    See the raw encoding specification at https://tools.ietf.org/html/rfc4648#section-6
    See the multibase specification at https://github.com/multiformats/multibase
    """
    b32_str = b32encode(b).decode("ascii").replace("=", "").lower()
    return f"b{b32_str}"


def encode_dag_cbor_cid(value: bytes) -> str:
    """
    Convert a CID tag value to a base32 encoded CID string with a multibase prefix.
    This is the default representation for CIDs used elsewhere in atproto,
    and in its documentation examples, so it should be familiar.
    A CID (Content Identifier) is a multiformats self-describing
    content-addressed identifier.
    See the specification for CIDs in general at:
    https://github.com/multiformats/cid
    See the specification for CIDs found in DAG_CBOR (aka tag 42) at:
    https://github.com/ipld/cid-cbor/
    Other useful details about CIDs can be found at:
    https://docs.ipfs.tech/concepts/content-addressing/#cid-versions
    And the reference Go implementation at: https://github.com/ipfs/go-cid
    """
    # Expected layout: a 1-byte multibase identity prefix (0x00), then a
    # 36-byte CIDv1: version, codec, hash type, digest length, 32-byte digest.
    if len(value) != 37:
        raise NotImplementedError(
            "Only CIDv1 values with a 32-byte digest (37 bytes with prefix) are supported."
        )
    multibase_prefix = value[0]
    if multibase_prefix != 0x00:  # Multibase identity prefix
        raise ValueError("DAG_CBOR CIDs must have a multibase identity prefix.")
    cid_data = value[1:]
    return multibase_encode_b(cid_data)


# See the IANA registry for CBOR tags at
# https://www.iana.org/assignments/cbor-tags/cbor-tags.xhtml
CID_CBOR_TAG = 42


def read_dag_cbor(stream: t.IO[bytes]) -> t.Any:
    """
    Decodes a DAG_CBOR encoded byte string from the given stream.
    The base CBOR specification is RFC 8949; details at https://cbor.io
    There's a useful CBOR playground at https://cbor.me
    DAG_CBOR is a more restrictive variant of CBOR defined in IPLD; see:
    https://ipld.io/specs/codecs/dag-cbor/spec/
    """
    initial_byte = stream.read(1)
    if not initial_byte:
        raise ValueError("Unexpected end of input while decoding CBOR.")
    initial_value = initial_byte[0]
    major_type = initial_value >> 5
    additional_info = initial_value & 0x1F

    if major_type == 0:  # Unsigned integer
        return read_cbor_uint(stream, additional_info)
    elif major_type == 1:  # Negative integer
        return -1 - read_cbor_uint(stream, additional_info)
    elif major_type == 2:  # Byte string
        length = read_cbor_uint(stream, additional_info)
        return stream.read(length)
    elif major_type == 3:  # Text string
        length = read_cbor_uint(stream, additional_info)
        return stream.read(length).decode("utf-8")
    elif major_type == 4:  # Array
        length = read_cbor_uint(stream, additional_info)
        return [read_dag_cbor(stream) for _ in range(length)]
    elif major_type == 5:  # Map
        length = read_cbor_uint(stream, additional_info)
        return {read_dag_cbor(stream): read_dag_cbor(stream) for _ in range(length)}
    elif major_type == 6:  # Tagged item
        # DAG_CBOR *requires* all tags to be of type 42 (IPLD CID)
        # We convert these to base32 CID strings by default
        tag = read_cbor_uint(stream, additional_info)
        if tag != CID_CBOR_TAG:
            raise ValueError(f"Unsupported CBOR tag {tag} in DAG_CBOR.")
        value = read_dag_cbor(stream)
        return encode_dag_cbor_cid(value)
    elif major_type == 7:  # Simple values and floats
        if additional_info == 20:  # False
            return False
        elif additional_info == 21:  # True
            return True
        elif additional_info == 22:  # Null
            return None
        elif additional_info == 23:  # Undefined
            # Technically, this is not supported in DAG_CBOR. But we'll allow it.
            return None  # CBOR 'undefined' is translated as None
        elif additional_info == 25:  # Half-precision float (not implemented)
            raise NotImplementedError("Half-precision floats are not supported.")
        elif additional_info == 26:  # Single-precision float
            return struct.unpack(">f", stream.read(4))[0]
        elif additional_info == 27:  # Double-precision float
            return struct.unpack(">d", stream.read(8))[0]
        else:
            raise ValueError(
                f"Unsupported simple value with additional info {additional_info}."
            )
    else:
        raise ValueError(f"Unsupported DAG_CBOR major type {major_type}.")


def read_cbor_uint(stream: t.IO[bytes], additional_info: int) -> int:
    """
    Parses an unsigned integer from the stream based on the additional information.
    See https://cbor.io/spec.html#ints for details.
    """
    if additional_info < 24:
        return additional_info
    elif additional_info == 24:
        return struct.unpack(">B", stream.read(1))[0]
    elif additional_info == 25:
        return struct.unpack(">H", stream.read(2))[0]
    elif additional_info == 26:
        return struct.unpack(">I", stream.read(4))[0]
    elif additional_info == 27:
        return struct.unpack(">Q", stream.read(8))[0]
    else:
        raise ValueError(
            f"Unsupported additional information for integer parsing: {additional_info}."
        )


def read_carv1(stream: t.IO[bytes]) -> t.Any:
    """
    Decodes a CARv1 encoded byte string from the given stream.
    CARv1 is a format used for content-addressed archives in IPLD.
    See the specification at: https://ipld.io/specs/transport/car/carv2/
    (This is the CARv2 specification, but CARv1 is a subset of it.)
    See the reference Go implementation at: https://github.com/ipld/go-car
    """
    # Dict containing the CAR header, with a 'roots' and a 'version' key
    header = read_car_header(stream)
    car_version = header["version"]
    if car_version != 1:
        raise ValueError(f"Unsupported CAR version {car_version}.")
    blocks = []
    while True:
        try:
            node = read_car_node(stream)
            blocks.append(node)
        except ValueError:
            break
    return {"header": header, "blocks": blocks}


def read_car_header(stream: t.IO[bytes]) -> dict:
    """Read the header of any CAR version from the given stream."""
    cbor_bytes = read_car_ld(stream)
    with BytesIO(cbor_bytes) as bio:
        return read_dag_cbor(bio)
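

def read_car_node(stream: t.IO[bytes]) -> dict:
    """Read a single CAR node (a CID followed by its DAG_CBOR data) from the given stream."""
    section = read_car_ld(stream)  # avoid shadowing the built-in `bytes`
    # Assumes a 36-byte CIDv1 (version, codec, hash type, digest length,
    # 32-byte sha2-256 digest), which is the CID form atproto uses.
    cid_bytes = section[:36]
    cid_str = encode_dag_cbor_cid(b"\x00" + cid_bytes)
    data_cbor = section[36:]
    data = read_dag_cbor(BytesIO(data_cbor))
    return {"cid": cid_str, "data": data}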
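

def read_car_ld(stream: t.IO[bytes]) -> bytes:
    """
    Read one length-delimited ("LD") CAR section from the given stream:
    a multiformats uvarint length followed by that many bytes.
    """
    length = read_uvarint(stream)
    data = stream.read(length)
    if len(data) != length:
        raise ValueError("Unexpected end of input while reading CAR section.")
    return data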


def read_firehose_frame(frame: bytes) -> tuple[dict, dict]:
    """
    Read a single frame from the BSKY firehose stream.
    Each frame contains two consecutive DAG_CBOR-encoded objects: the header,
    and the body of the message.
    """
    with BytesIO(frame) as bio:
        # Read the frame header and body.
        #
        # The header is a dict that contains the message type, which is one of:
        # "#commit", "#account", "#identity", "#handle", "#tombstone"
        #
        # The body is a dict that contains the message data, which is specific
        # to the type. If you're interested in posts, for instance, you want
        # "#commit" messages that contain a "create" operation to the
        # "app.bsky.feed.post/*" path.
        header, body = read_dag_cbor(bio), read_dag_cbor(bio)

    # If this frame contains an op that includes blocks
    # (for instance, a repo #commit), we need to decode the
    # blocks from CARv1 format.
    #
    # The CAR header will contain a "roots" list that references (by CID)
    # the top-level data object, which is described in "Commit Objects"
    # in the atproto documentation, here:
    #
    # https://atproto.com/specs/repository
    #
    # Our code returns a sequence of nodes in the order they appear
    # in the archive; from them and their various CID references, you
    # could reconstruct the full Merkle Search Tree (MST) as described in
    # that link. This would be useful if you wanted to fully verify the message.
    #
    # On the other hand, you can also easily just filter through the
    # blocks to find something you're looking for, such as a specific
    # type of object like a post (where "$type" is "app.bsky.feed.post").
    body_blocks = body.get("blocks")
    if isinstance(body_blocks, bytes):
        body["blocks"] = read_carv1(BytesIO(body_blocks))
    return header, body


def firehose():
    with connect_ws(BSKY_FIREHOSE) as ws:
        while True:
            cbor_frame = ws.receive_bytes()
            header, body = read_firehose_frame(cbor_frame)
            frame = {"header": header, "body": body}
            print(json.dumps(frame, cls=JSONEncoderWithBytes))


if __name__ == "__main__":
    firehose()
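The comments in read_firehose_frame suggest finding posts by looking for "#commit" messages with a "create" operation on an "app.bsky.feed.post/*" path. Below is a minimal sketch of that filter, written against the decoded body that read_firehose_frame returns. The helper name `iter_created_posts` is hypothetical, and the "ops", "action", "path", "cid", and "repo" keys are assumptions taken from the atproto subscribeRepos message format, not from this script. It relies on the fact that the script turns every CID (in ops and in CAR nodes) into the same base32 string form, so an op's CID can be looked up directly among the decoded blocks.

```python
from typing import Any, Iterator


def iter_created_posts(body: dict[str, Any]) -> Iterator[dict[str, Any]]:
    """Yield post records created by one decoded #commit body (sketch).

    Assumed shape: body["blocks"] is {"header": ..., "blocks": [{"cid", "data"}]}
    as produced by read_carv1(), and body["ops"] is a list of repo ops.
    """
    car = body.get("blocks")
    if not isinstance(car, dict):
        return  # no decoded CAR blocks in this message
    # Index every block's data by its CID string so ops can be resolved.
    records = {node["cid"]: node["data"] for node in car.get("blocks", [])}
    for op in body.get("ops") or []:
        path = op.get("path") or ""
        if op.get("action") != "create" or not path.startswith("app.bsky.feed.post/"):
            continue
        record = records.get(op.get("cid"))
        if isinstance(record, dict):
            yield {"repo": body.get("repo"), "path": path, "text": record.get("text", "")}


# Hypothetical, hand-made body with placeholder DID, path, and CID values.
example_body = {
    "repo": "did:plc:example",
    "ops": [{"action": "create", "path": "app.bsky.feed.post/3abc", "cid": "bafyexample"}],
    "blocks": {
        "header": {"version": 1, "roots": []},
        "blocks": [{"cid": "bafyexample", "data": {"$type": "app.bsky.feed.post", "text": "hello"}}],
    },
}
print(list(iter_created_posts(example_body)))
# [{'repo': 'did:plc:example', 'path': 'app.bsky.feed.post/3abc', 'text': 'hello'}]
```

Matching on ops rather than on "$type" alone keeps updates and unrelated blocks (commit objects, MST nodes) out of the results. It does not verify the commit signature or the MST.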

MIT License

Copyright (c) 2025 Dave Peck

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

@jhculb

jhculb commented May 16, 2025

Thanks for writing this, it's great code!
I'm using this code in a project of mine, is this under any copyright - and if not - how can I cite you?

@davepeck
Author

davepeck commented May 16, 2025

Glad it was helpful, @jhculb !

And: just added an MIT license. Have at it! :-)
