Last active
July 9, 2024 23:29
-
-
Save BlinkyStitt/619706df5aac39e601ff0b5e6a85e88b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { | |
HubEventType, | |
getInsecureHubRpcClient, | |
getSSLHubRpcClient, | |
} from "@farcaster/hub-nodejs"; | |
// Address of the Hub (URL + port), read from the HUB_URL environment variable.
// Fail fast with a clear message instead of handing `undefined` to the client factory.
const hubUrlFromEnv = process.env["HUB_URL"];
if (!hubUrlFromEnv) {
  throw new Error("HUB_URL environment variable must be set to the URL + port of the Hub");
}
const HUB_URL: string = hubUrlFromEnv;

// TODO: a real app would use encryption, but the rust client doesn't support it yet
const USE_SSL = false; // set to true if talking to a hub that uses SSL (3rd party hosted hubs or hubs that require auth)

console.log("hello, world");

// Subscribes to the hub event stream and checks that event ids arrive in
// strictly increasing order. Any failure (subscribe error, out-of-order id,
// or the stream ending) is logged, the client is closed, and the process
// exit code is set to 1.
(async () => {
  // 1. Connect to the Hub
  const client = USE_SSL ? getSSLHubRpcClient(HUB_URL) : getInsecureHubRpcClient(HUB_URL);
  const fromId = 0;

  // 2. Fetch the data
  const subscribeParams: { eventTypes: HubEventType[]; fromId?: number } = {
    eventTypes: [
      HubEventType.MERGE_MESSAGE,
      HubEventType.REVOKE_MESSAGE,
      HubEventType.PRUNE_MESSAGE,
      HubEventType.MERGE_ON_CHAIN_EVENT,
      HubEventType.MERGE_USERNAME_PROOF,
    ],
    fromId,
  };

  try {
    const subscribeRequest = await client.subscribe(subscribeParams);
    if (subscribeRequest.isErr()) {
      const e = subscribeRequest.error;
      throw new Error(`Error starting hub stream: ${e}`);
    }
    const stream = subscribeRequest.value;

    let last = 0;
    for await (const event of stream) {
      console.log("event_id ", event.id); // Assuming event has an id property
      if (event.id <= last) {
        throw new Error("Event ids are not in order");
      }
      last = event.id;
    }

    // The loop only finishes once the stream has ended. Raising the error
    // here (rather than from an async "close" listener, where it would become
    // an unhandled promise rejection) lets the catch/finally below run.
    throw new Error("Crashing on stream close");
  } catch (e) {
    console.error(e);
    // Report the failure to the caller of the process instead of exiting 0.
    process.exitCode = 1;
  } finally {
    // 3. Close the connection
    client.close();
  }
})();
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use hubble2kafka::{ | |
connect_hubble, init_app, | |
proto::{hub_service_client::HubServiceClient, HubEventType, SubscribeRequest}, | |
}; | |
use tonic::transport::Channel; | |
use tracing::{error, info}; | |
async fn subscribe_to_client(mut client: HubServiceClient<Channel>) -> anyhow::Result<()> { | |
let subscribe_request = SubscribeRequest { | |
event_types: vec![ | |
HubEventType::MergeMessage as i32, | |
HubEventType::RevokeMessage as i32, | |
HubEventType::PruneMessage as i32, | |
HubEventType::MergeOnChainEvent as i32, | |
HubEventType::MergeUsernameProof as i32, | |
], | |
from_id: None, | |
total_shards: None, | |
shard_index: None, | |
}; | |
let stream_response = client.subscribe(subscribe_request).await?; | |
let mut stream = stream_response.into_inner(); | |
let mut last_id: Option<u64> = None; | |
while let Some(hub_event) = stream.message().await? { | |
let new_id = hub_event.id; | |
info!(new_id); | |
if let Some(last_id) = last_id { | |
// compare last_id to this id. if its out of order, something is wack. | |
if last_id >= new_id { | |
let diff = last_id - new_id; | |
error!(last_id, new_id, diff, "event ids went backwards!"); | |
} | |
} | |
last_id = Some(new_id); | |
} | |
Ok(()) | |
} | |
#[tokio::main] | |
async fn main() -> anyhow::Result<()> { | |
init_app(9005)?; | |
info!("Hello, world!"); | |
let client = connect_hubble("http://54.234.220.140:2283".to_string()).await?; | |
subscribe_to_client(client).await?; | |
Ok(()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment