Skip to content

Instantly share code, notes, and snippets.

@ngerakines
Last active October 31, 2024 12:12
Show Gist options
  • Save ngerakines/0b10f57ce570f397cbe955266bc0573a to your computer and use it in GitHub Desktop.
Save ngerakines/0b10f57ce570f397cbe955266bc0573a to your computer and use it in GitHub Desktop.
[package]
name = "corpfeeds"
version = "0.1.0"
edition = "2021"
[dependencies]
futures-util = { version = "0.3.31", features = ["sink"] }
http = "1.1.0"
serde = { version = "1.0.214", features = ["derive"] }
serde_json = "1.0.132"
tokio = { version = "1.41.0", features = ["bytes", "macros", "net", "rt", "rt-multi-thread", "signal", "sync"] }
tokio-websockets = { version = "0.10.1", features = ["client", "native-tls", "rand", "ring"] }
zstd = "0.13.2"
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.67s
Running `target/debug/corpfeeds` Event { did: "did:plc:g4tyth5bmfehzhwjnt2ieyif", kind: "commit", time_us: 1730318015275651, commit: Some(Update { rev: "3l7qwbe5xim27", collection: "app.bsky.feed.post", rkey: "3l7qwbe5lrm27", record: Record { type: "app.bsky.feed.post", extra: {"createdAt": String("2024-10-30T19:53:34.256Z"), "langs": Array [String("pt")], "reply": Object {"parent": Object {"cid": String("bafyreicnkjwvfcdyfbjusgyknxh7xodufxwfgxzdks72qmfrqmwuvmomqm"), "uri": String("at://did:plc:vzbmqm4et3mzwoyzj7oyol4t/app.bsky.feed.post/3l7qvzoizro2t")}, "root": Object {"cid": String("bafyreifmidtbbzvtdhz2zcibealxguo7kcylrslfcismy7d7d4l4sakbjm"), "uri": String("at://did:plc:g4tyth5bmfehzhwjnt2ieyif/app.bsky.feed.post/3l7quwvc7zo27")}}, "text": String("qual cidade amg?")} }, cid: "bafyreigyt3qjvjorubyhrtkl6ryapyujttfi3ls6jc2aj2zttyasaja74u" }) }
Event { did: "did:plc:xddmxju5mjiia4tialqarnym", kind: "commit", time_us: 1730318015276495, commit: Some(Update { rev: "3l7qwbe7m622k", collection: "app.bsky.feed.post", rkey: "3l7qwbe7cfk2k", record: Record { type: "app.bsky.feed.post", extra: {"reply": Object {"parent": Object {"cid": String("bafyreifphamdvsxqmo4kci5uk7pim6yeilhn6lrmpqdbw3sp6frhl5ph7i"), "uri": String("at://did:plc:o2xg6g7pc5xwsdijtcxdvavv/app.bsky.feed.post/3l7qvlkk45m2p")}, "root": Object {"cid": String("bafyreifphamdvsxqmo4kci5uk7pim6yeilhn6lrmpqdbw3sp6frhl5ph7i"), "uri": String("at://did:plc:o2xg6g7pc5xwsdijtcxdvavv/app.bsky.feed.post/3l7qvlkk45m2p")}}, "createdAt": String("2024-10-30T19:53:35.543Z"), "langs": Array [String("pt")], "text": String("Ah não! Esse aí é pra sobreviver. Pra viver 15k.")} }, cid: "bafyreiejeehufpc66xjafhub25sx5ocdcbemwnqqudjwq7y5pd4eww2fbi" }) }
Event { did: "did:plc:zopgxysewdtkzmdyyu7kxogh", kind: "commit", time_us: 1730318015276867, commit: Some(Update { rev: "3l7qwbdkv5t2d", collection: "app.bsky.feed.post", rkey: "3l7qwbdkndt2d", record: Record { type: "app.bsky.feed.post", extra: {"createdAt": String("2024-10-30T19:55:31.830Z"), "langs": Array
[String("pt")], "embed": Object {"$type": String("app.bsky.embed.recordWithMedia"), "media": Object {"$type": String("app.bsky.embed.external"), "external": Object {"description": String("ALT: a close up of a person wearing sunglasses and a black shirt"), "thumb": Object {"$type": String("blob"), "mimeType"
: String("image/jpeg"), "ref": Object {"$link": String("bafkreieyzdu4m3dxax4oavfhzcdgkikmmgxhy4exnpwduknt4x5vf7dwbi")}, "size": Number(169218)}, "title": String("a close up of a person wearing sunglasses and a black shirt"), "uri": String("https://media.tenor.com/zZOt7alSzAMAAAAC/gojo-gojo-satoru.gif?hh=498&
ww=498")}}, "record": Object {"$type": String("app.bsky.embed.record"), "record": Object {"cid": String("bafyreihj4ldkevd33cusbz6fvy5svyjxsqryzmylwbhfatlfslwcquymgq"), "uri": String("at://did:plc:4mei3tgzsrylhqfotfrllfma/app.bsky.feed.post/3l7qvnc6gg42v")}}}, "text": String("proibido passar por esse POST sem
citar um personagem que merecia um final feliz")} }, cid: "bafyreicolg4xaqrky3ung4n73ihiz6xc7ncuxxvudzdhmkcyq2ju3xvhgi" }) }
...
$ RUST_BACKTRACE=1 RUST_LOG=debug RUST_LIB_BACKTRACE=1 cargo run
...
Event { did: "did:plc:qs6hgnkfdlacfvhimjtx7ob4", kind: "account", time_us: 1730376707771642, commit: None }
Event { did: "did:plc:6pid6feqxwhsfttjakjd3ejw", kind: "identity", time_us: 1730376708171679, commit: None }
Event { did: "did:plc:6pid6feqxwhsfttjakjd3ejw", kind: "account", time_us: 1730376708274773, commit: None }
Event { did: "did:plc:cbkjy5n7bk3ax2wplmtjofq2", kind: "commit", time_us: 1730376710074768, commit: Some(Update { rev: "3l7smwlijpk25", collection: "events.smokesignal.calendar.rsvp", rkey: "3l5moo43xl22w", record: Other { extra: {"$type": String("events.smokesignal.calendar.rsvp"), "status": String("events.smokesignal.calendar.rsvp#interested"), "subject": Object {"cid": String("bafyreieikhopfl57ocwqfscntrwkukytg2e4dsrbf6babqmtly73ka2bmq"), "uri": String("at://did:plc:cbkjy5n7bk3ax2wplmtjofq2/events.smokesignal.calendar.event/3l5monz7mik2w")}} }, cid: "bafyreic4sirbjxhdv2t25yhq4fn7lovbxce7qpxjm6dtfvqfm7tx74qhua" }) }
Event { did: "did:plc:2ghofyr5pwamicrnvrpihyun", kind: "identity", time_us: 1730376712270923, commit: None }
Event { did: "did:plc:itjgwu2runqzxfysokknye23", kind: "identity", time_us: 1730376712372400, commit: None }
Event { did: "did:plc:itjgwu2runqzxfysokknye23", kind: "account", time_us: 1730376712471312, commit: None }
Event { did: "did:plc:p6vliozstc3uo46lexwek3fs", kind: "identity", time_us: 1730376713071979, commit: None }
Event { did: "did:plc:p6vliozstc3uo46lexwek3fs", kind: "account", time_us: 1730376713175360, commit: None }
Event { did: "did:plc:cbkjy5n7bk3ax2wplmtjofq2", kind: "commit", time_us: 1730376716168488, commit: Some(Update { rev: "3l7smwqygmk25", collection: "events.smokesignal.calendar.rsvp", rkey: "3l5moo43xl22w", record: Other { extra: {"$type": String("events.smokesignal.calendar.rsvp"), "status": String("events.smokesignal.calendar.rsvp#going"), "subject": Object {"cid": String("bafyreieikhopfl57ocwqfscntrwkukytg2e4dsrbf6babqmtly73ka2bmq"), "uri": String("at://did:plc:cbkjy5n7bk3ax2wplmtjofq2/events.smokesignal.calendar.event/3l5monz7mik2w")}} }, cid: "bafyreiecr3sd56vv7pzogviq4zrv4devtshc3ceh2ut63gu6kvdqpg2x5q" }) }
Event { did: "did:plc:ycer7pqocvyx6rajpivsq2fm", kind: "identity", time_us: 1730376716572894, commit: None }
Event { did: "did:plc:ycer7pqocvyx6rajpivsq2fm", kind: "account", time_us: 1730376716673176, commit: None }
use futures_util::SinkExt;
use futures_util::StreamExt;
use http::Uri;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::{collections::HashMap, env};
use tokio_websockets::{ClientBuilder, Error, Message};
/// Messages this client sends to the Jetstream server over the websocket.
///
/// Adjacently tagged, so a value serializes as
/// `{"type":"options_update","payload":{"wantedCollections":[...],"wantedDids":[...]}}`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type", content = "payload")]
enum SubscriberSourcedMessage {
    /// Subscription options; `main` sends one right after connecting to filter the stream.
    #[serde(rename = "options_update", rename_all = "camelCase")]
    Update {
        /// Collection NSIDs to receive (serialized as `wantedCollections`).
        wanted_collections: Vec<String>,
        /// DIDs to receive (serialized as `wantedDids`); presumably empty means
        /// "no DID filter" — confirm against the Jetstream docs.
        wanted_dids: Vec<String>,
    },
}
/// A rich-text facet on a post; only its `features` list is captured (other keys are ignored).
#[derive(Deserialize, Debug)]
struct Facet {
    /// Each feature as a flat map. Every value must be a JSON string, otherwise
    /// deserializing this facet fails.
    features: Vec<HashMap<String, String>>,
}
/// A reference to another record; only its URI is kept (any other keys, such as `cid`,
/// are ignored during deserialization).
#[derive(Deserialize, Debug)]
struct StrongRef {
    /// `at://` URI of the referenced record.
    uri: String,
}
/// The `reply` object of a post, pointing at the thread it belongs to.
#[derive(Deserialize, Debug)]
struct Reply {
    /// Root post of the thread, when supplied.
    root: Option<StrongRef>,
    /// Post being replied to directly, when supplied.
    parent: Option<StrongRef>,
}
/// The record body carried by a create/update commit, discriminated by its `$type` field.
///
/// `app.bsky.feed.post` records get typed fields for the parts this tool inspects; records of
/// other types fall through to `Other`, which keeps the whole object.
#[derive(Debug, Deserialize)]
#[serde(tag = "$type")]
enum Record {
    /// A Bluesky post.
    #[serde(rename = "app.bsky.feed.post")]
    Post {
        /// Post text.
        text: String,
        /// Rich-text facets, when present.
        facets: Option<Vec<Facet>>,
        /// Thread pointers, when the post is a reply.
        reply: Option<Reply>,
        /// Every remaining post field (e.g. `createdAt`, `langs`, `embed`).
        #[serde(flatten)]
        extra: HashMap<String, serde_json::Value>,
    },
    /// Fallback for any other record type; the flattened map also retains `$type`.
    #[serde(untagged)]
    Other {
        #[serde(flatten)]
        extra: HashMap<String, serde_json::Value>,
    },
}
/// One repository operation inside a `commit` event.
///
/// The `operation` field selects the variant; its values are the lowercase variant names
/// (`create`, `update`, `delete`).
#[derive(Debug, Deserialize)]
#[serde(tag = "operation", rename_all = "lowercase")]
enum CommitOp {
    /// A record was written for the first time.
    Create {
        rev: String,
        collection: String,
        rkey: String,
        record: Record,
        cid: String,
    },
    /// An existing record was rewritten.
    Update {
        rev: String,
        collection: String,
        rkey: String,
        record: Record,
        cid: String,
    },
    /// A record was removed; only the identifying fields are captured here.
    Delete {
        rev: String,
        collection: String,
        rkey: String,
    },
}
/// One Jetstream event, decoded from the JSON inside a decompressed websocket frame.
#[derive(Debug, Deserialize)]
struct Event {
    /// DID of the account the event is about.
    did: String,
    /// Event category; values seen in practice include `commit`, `identity`, and `account`.
    kind: String,
    /// Event timestamp in microseconds (presumably since the Unix epoch — confirm).
    time_us: u64,
    /// The operation payload; populated for `commit` events and `None` for the others seen.
    commit: Option<CommitOp>,
}
/// Output-buffer size tried first when decompressing a Jetstream frame.
const INITIAL_DECOMPRESS_CAPACITY: usize = 64 * 1024;

/// Largest output buffer `decompress_frame` will try before giving up on a frame.
const MAX_DECOMPRESS_CAPACITY: usize = 16 * 1024 * 1024;

/// Decompresses one zstd frame using the dictionary already loaded into `decompressor`.
///
/// `zstd::bulk::Decompressor::decompress` fails when the decoded frame is larger than the
/// capacity it is given, so a single small fixed capacity drops every oversized event. This
/// starts at `INITIAL_DECOMPRESS_CAPACITY` and doubles the capacity after each failure until
/// `MAX_DECOMPRESS_CAPACITY` is reached.
///
/// # Errors
///
/// Returns the last decompression error once the cap is reached — e.g. for a corrupt frame
/// or one that decodes to more than `MAX_DECOMPRESS_CAPACITY` bytes.
fn decompress_frame(
    decompressor: &mut zstd::bulk::Decompressor<'_>,
    frame: &[u8],
) -> std::io::Result<Vec<u8>> {
    let mut capacity = INITIAL_DECOMPRESS_CAPACITY;
    loop {
        match decompressor.decompress(frame, capacity) {
            Ok(decoded) => return Ok(decoded),
            // Any error is retried with a bigger buffer: "destination too small" is the
            // expected case, and other failures just repeat until the cap ends the loop.
            Err(_) if capacity < MAX_DECOMPRESS_CAPACITY => capacity *= 2,
            Err(err) => return Err(err),
        }
    }
}

/// Connects to Jetstream, subscribes to `events.smokesignal.calendar.rsvp` records, and prints
/// each decoded [`Event`] to stdout.
///
/// Per-message failures (websocket errors, non-binary frames, decompression errors, JSON that
/// does not match `Event`) are reported on stderr and skipped, so one bad frame does not end
/// the stream.
///
/// # Errors
///
/// Returns an error if the zstd dictionary cannot be read or loaded, the websocket connection
/// cannot be established, or the initial `options_update` message cannot be sent.
#[tokio::main]
async fn main() -> Result<(), Error> {
    // The dictionary must exist before running:
    // mkdir -p data/ && curl -o data/zstd_dictionary https://github.com/bluesky-social/jetstream/raw/refs/heads/main/pkg/models/zstd_dictionary
    let dictionary: Vec<u8> =
        std::fs::read(env::current_dir()?.join("data").join("zstd_dictionary"))?;

    // `compress=true` yields zstd-compressed binary frames (decoded below); `requireHello=true`
    // presumably holds the stream until the options_update is sent — confirm in Jetstream docs.
    let uri = Uri::from_static(
        "wss://jetstream2.us-east.bsky.network/subscribe?compress=true&requireHello=true",
    );
    let (mut client, _) = ClientBuilder::from_uri(uri).connect().await?;

    let update = SubscriberSourcedMessage::Update {
        wanted_collections: vec!["events.smokesignal.calendar.rsvp".to_string()],
        wanted_dids: vec![],
    };
    let serialized_update = serde_json::to_string(&update)
        .expect("options_update holds only strings and string lists, so it always serializes");
    client.send(Message::text(serialized_update)).await?;

    let mut decompressor = zstd::bulk::Decompressor::with_dictionary(&dictionary)?;

    while let Some(item) = client.next().await {
        let message = match item {
            Ok(message) => message,
            Err(err) => {
                eprintln!("{err:?}");
                continue;
            }
        };
        if !message.is_binary() {
            eprintln!("item is not binary");
            continue;
        }

        let payload = message.into_payload();
        let decoded = match decompress_frame(&mut decompressor, &payload) {
            Ok(decoded) => decoded,
            Err(err) => {
                eprintln!("{err:?}");
                continue;
            }
        };

        match serde_json::from_slice::<Event>(&decoded) {
            Ok(event) => println!("{event:?}"),
            Err(err) => {
                eprintln!("{err:?}");
                // Dump the raw JSON so the schema mismatch can be diagnosed.
                eprintln!("{:?}", std::str::from_utf8(&decoded));
            }
        }
    }
    Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment