Last active
October 31, 2024 12:12
-
-
Save ngerakines/0b10f57ce570f397cbe955266bc0573a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[package]
name = "corpfeeds"
version = "0.1.0"
edition = "2021"
[dependencies]
futures-util = { version = "0.3.31", features = ["sink"] }
http = "1.1.0"
serde = { version = "1.0.214", features = ["derive"] }
serde_json = "1.0.132"
tokio = { version = "1.41.0", features = ["bytes", "macros", "net", "rt", "rt-multi-thread", "signal", "sync"] }
tokio-websockets = { version = "0.10.1", features = ["client", "native-tls", "rand", "ring"] }
zstd = "0.13.2"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.67s | |
Running `target/debug/corpfeeds` Event { did: "did:plc:g4tyth5bmfehzhwjnt2ieyif", kind: "commit", time_us: 1730318015275651, commit: Some(Update { rev: "3l7qwbe5xim27", collection: "app.bsky.feed.post", rkey: "3l7qwbe5lrm27", record: Record { type: "app.bsky.feed.post", extra: {"createdAt": String("2024-10-30T19:53:34.256Z"), "langs": Array [String("pt")], "reply": Object {"parent": Object {"cid": String("bafyreicnkjwvfcdyfbjusgyknxh7xodufxwfgxzdks72qmfrqmwuvmomqm"), "uri": String("at://did:plc:vzbmqm4et3mzwoyzj7oyol4t/app.bsky.feed.post/3l7qvzoizro2t")}, "root": Object {"cid": String("bafyreifmidtbbzvtdhz2zcibealxguo7kcylrslfcismy7d7d4l4sakbjm"), "uri": String("at://did:plc:g4tyth5bmfehzhwjnt2ieyif/app.bsky.feed.post/3l7quwvc7zo27")}}, "text": String("qual cidade amg?")} }, cid: "bafyreigyt3qjvjorubyhrtkl6ryapyujttfi3ls6jc2aj2zttyasaja74u" }) } | |
Event { did: "did:plc:xddmxju5mjiia4tialqarnym", kind: "commit", time_us: 1730318015276495, commit: Some(Update { rev: "3l7qwbe7m622k", collection: "app.bsky.feed.post", rkey: "3l7qwbe7cfk2k", record: Record { type: "app.bsky.feed.post", extra: {"reply": Object {"parent": Object {"cid": String("bafyreifphamdvsxqmo4kci5uk7pim6yeilhn6lrmpqdbw3sp6frhl5ph7i"), "uri": String("at://did:plc:o2xg6g7pc5xwsdijtcxdvavv/app.bsky.feed.post/3l7qvlkk45m2p")}, "root": Object {"cid": String("bafyreifphamdvsxqmo4kci5uk7pim6yeilhn6lrmpqdbw3sp6frhl5ph7i"), "uri": String("at://did:plc:o2xg6g7pc5xwsdijtcxdvavv/app.bsky.feed.post/3l7qvlkk45m2p")}}, "createdAt": String("2024-10-30T19:53:35.543Z"), "langs": Array [String("pt")], "text": String("Ah não! Esse aí é pra sobreviver. Pra viver 15k.")} }, cid: "bafyreiejeehufpc66xjafhub25sx5ocdcbemwnqqudjwq7y5pd4eww2fbi" }) } | |
Event { did: "did:plc:zopgxysewdtkzmdyyu7kxogh", kind: "commit", time_us: 1730318015276867, commit: Some(Update { rev: "3l7qwbdkv5t2d", collection: "app.bsky.feed.post", rkey: "3l7qwbdkndt2d", record: Record { type: "app.bsky.feed.post", extra: {"createdAt": String("2024-10-30T19:55:31.830Z"), "langs": Array | |
[String("pt")], "embed": Object {"$type": String("app.bsky.embed.recordWithMedia"), "media": Object {"$type": String("app.bsky.embed.external"), "external": Object {"description": String("ALT: a close up of a person wearing sunglasses and a black shirt"), "thumb": Object {"$type": String("blob"), "mimeType" | |
: String("image/jpeg"), "ref": Object {"$link": String("bafkreieyzdu4m3dxax4oavfhzcdgkikmmgxhy4exnpwduknt4x5vf7dwbi")}, "size": Number(169218)}, "title": String("a close up of a person wearing sunglasses and a black shirt"), "uri": String("https://media.tenor.com/zZOt7alSzAMAAAAC/gojo-gojo-satoru.gif?hh=498& | |
ww=498")}}, "record": Object {"$type": String("app.bsky.embed.record"), "record": Object {"cid": String("bafyreihj4ldkevd33cusbz6fvy5svyjxsqryzmylwbhfatlfslwcquymgq"), "uri": String("at://did:plc:4mei3tgzsrylhqfotfrllfma/app.bsky.feed.post/3l7qvnc6gg42v")}}}, "text": String("proibido passar por esse POST sem | |
citar um personagem que merecia um final feliz")} }, cid: "bafyreicolg4xaqrky3ung4n73ihiz6xc7ncuxxvudzdhmkcyq2ju3xvhgi" }) } | |
... | |
$ RUST_BACKTRACE=1 RUST_LOG=debug RUST_LIB_BACKTRACE=1 cargo run | |
... | |
Event { did: "did:plc:qs6hgnkfdlacfvhimjtx7ob4", kind: "account", time_us: 1730376707771642, commit: None } | |
Event { did: "did:plc:6pid6feqxwhsfttjakjd3ejw", kind: "identity", time_us: 1730376708171679, commit: None } | |
Event { did: "did:plc:6pid6feqxwhsfttjakjd3ejw", kind: "account", time_us: 1730376708274773, commit: None } | |
Event { did: "did:plc:cbkjy5n7bk3ax2wplmtjofq2", kind: "commit", time_us: 1730376710074768, commit: Some(Update { rev: "3l7smwlijpk25", collection: "events.smokesignal.calendar.rsvp", rkey: "3l5moo43xl22w", record: Other { extra: {"$type": String("events.smokesignal.calendar.rsvp"), "status": String("events.smokesignal.calendar.rsvp#interested"), "subject": Object {"cid": String("bafyreieikhopfl57ocwqfscntrwkukytg2e4dsrbf6babqmtly73ka2bmq"), "uri": String("at://did:plc:cbkjy5n7bk3ax2wplmtjofq2/events.smokesignal.calendar.event/3l5monz7mik2w")}} }, cid: "bafyreic4sirbjxhdv2t25yhq4fn7lovbxce7qpxjm6dtfvqfm7tx74qhua" }) } | |
Event { did: "did:plc:2ghofyr5pwamicrnvrpihyun", kind: "identity", time_us: 1730376712270923, commit: None } | |
Event { did: "did:plc:itjgwu2runqzxfysokknye23", kind: "identity", time_us: 1730376712372400, commit: None } | |
Event { did: "did:plc:itjgwu2runqzxfysokknye23", kind: "account", time_us: 1730376712471312, commit: None } | |
Event { did: "did:plc:p6vliozstc3uo46lexwek3fs", kind: "identity", time_us: 1730376713071979, commit: None } | |
Event { did: "did:plc:p6vliozstc3uo46lexwek3fs", kind: "account", time_us: 1730376713175360, commit: None } | |
Event { did: "did:plc:cbkjy5n7bk3ax2wplmtjofq2", kind: "commit", time_us: 1730376716168488, commit: Some(Update { rev: "3l7smwqygmk25", collection: "events.smokesignal.calendar.rsvp", rkey: "3l5moo43xl22w", record: Other { extra: {"$type": String("events.smokesignal.calendar.rsvp"), "status": String("events.smokesignal.calendar.rsvp#going"), "subject": Object {"cid": String("bafyreieikhopfl57ocwqfscntrwkukytg2e4dsrbf6babqmtly73ka2bmq"), "uri": String("at://did:plc:cbkjy5n7bk3ax2wplmtjofq2/events.smokesignal.calendar.event/3l5monz7mik2w")}} }, cid: "bafyreiecr3sd56vv7pzogviq4zrv4devtshc3ceh2ut63gu6kvdqpg2x5q" }) } | |
Event { did: "did:plc:ycer7pqocvyx6rajpivsq2fm", kind: "identity", time_us: 1730376716572894, commit: None } | |
Event { did: "did:plc:ycer7pqocvyx6rajpivsq2fm", kind: "account", time_us: 1730376716673176, commit: None } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use futures_util::SinkExt; | |
use futures_util::StreamExt; | |
use http::Uri; | |
use serde::{Deserialize, Serialize}; | |
use serde_json::json; | |
use std::{collections::HashMap, env}; | |
use tokio_websockets::{ClientBuilder, Error, Message}; | |
#[derive(Debug, Serialize, Deserialize)] | |
#[serde(tag = "type", content = "payload")] | |
enum SubscriberSourcedMessage { | |
#[serde(rename = "options_update")] | |
Update { | |
#[serde(rename = "wantedCollections")] | |
wanted_collections: Vec<String>, | |
#[serde(rename = "wantedDids")] | |
wanted_dids: Vec<String>, | |
}, | |
} | |
#[derive(Debug, Deserialize)] | |
struct Facet { | |
features: Vec<HashMap<String, String>>, | |
} | |
#[derive(Debug, Deserialize)] | |
struct StrongRef { | |
uri: String, | |
} | |
#[derive(Debug, Deserialize)] | |
struct Reply { | |
root: Option<StrongRef>, | |
parent: Option<StrongRef>, | |
} | |
#[derive(Debug, Deserialize)] | |
#[serde(tag = "$type")] | |
enum Record { | |
#[serde(rename = "app.bsky.feed.post")] | |
Post { | |
text: String, | |
facets: Option<Vec<Facet>>, | |
reply: Option<Reply>, | |
#[serde(flatten)] | |
extra: HashMap<String, serde_json::Value>, | |
}, | |
#[serde(untagged)] | |
Other{ | |
#[serde(flatten)] | |
extra: HashMap<String, serde_json::Value>, | |
} | |
} | |
#[derive(Debug, Deserialize)] | |
#[serde(tag = "operation")] | |
enum CommitOp { | |
#[serde(rename = "create")] | |
Create { | |
rev: String, | |
collection: String, | |
rkey: String, | |
record: Record, | |
cid: String, | |
}, | |
#[serde(rename = "update")] | |
Update { | |
rev: String, | |
collection: String, | |
rkey: String, | |
record: Record, | |
cid: String, | |
}, | |
#[serde(rename = "delete")] | |
Delete { | |
rev: String, | |
collection: String, | |
rkey: String, | |
}, | |
} | |
#[derive(Debug, Deserialize)] | |
struct Event { | |
did: String, | |
kind: String, | |
time_us: u64, | |
commit: Option<CommitOp>, | |
} | |
#[tokio::main] | |
async fn main() -> Result<(), Error> { | |
let path = env::current_dir()?; | |
// mkdir -p data/ && curl -o data/zstd_dictionary https://github.com/bluesky-social/jetstream/raw/refs/heads/main/pkg/models/zstd_dictionary | |
let data: Vec<u8> = std::fs::read(path.join("data").join("zstd_dictionary"))?; | |
let uri = Uri::from_static( | |
"wss://jetstream2.us-east.bsky.network/subscribe?compress=true&requireHello=true", | |
); | |
let (mut client, _) = ClientBuilder::from_uri(uri).connect().await?; | |
let update = SubscriberSourcedMessage::Update { | |
wanted_collections: vec!["events.smokesignal.calendar.rsvp".to_string()], | |
wanted_dids: vec![], | |
}; | |
let serialized_update = serde_json::to_string(&update).unwrap(); | |
client.send(Message::text(serialized_update)).await?; | |
let mut decompressor = zstd::bulk::Decompressor::with_dictionary(&data)?; | |
while let Some(item) = client.next().await { | |
if let Err(err) = item { | |
println!("{err:?}"); | |
continue; | |
} | |
let item = item.unwrap(); | |
if !item.is_binary() { | |
println!("item is not binary"); | |
continue; | |
} | |
let payload = item.into_payload(); | |
let decoded = decompressor.decompress(&payload, 10000); | |
if let Err(err) = decoded { | |
println!("{err:?}"); | |
continue; | |
} | |
let decoded = decoded.unwrap(); | |
let event = serde_json::from_slice::<Event>(&decoded); | |
if let Err(err) = event { | |
println!("{err:?}"); | |
println!("{:?}", std::str::from_utf8(&decoded)); | |
continue; | |
} | |
let event = event.unwrap(); | |
println!("{event:?}"); | |
} | |
Ok(()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment