Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save jayhuang75/0cbf5715a84543a672f382d851a470e2 to your computer and use it in GitHub Desktop.

Select an option

Save jayhuang75/0cbf5715a84543a672f382d851a470e2 to your computer and use it in GitHub Desktop.
nextjs-rust-streaming-websocket-twitter-streaming.rs
pub async fn twitter_stream(
topic: String,
sender: mpsc::UnboundedSender<std::result::Result<Message, warp::Error>>,
) {
let consumer_key = env::var("consumer_key").expect("load consumer key failed failed");
let consumer_secret = env::var("consumer_secret").expect("load consumer_secret failed failed");
let access_token = env::var("access_token").expect("load access_key failed failed");
let access_token_secret =
env::var("access_token_secret").expect("load access_token_secret failed failed");
let token = Token::from_parts(
consumer_key,
consumer_secret,
access_token,
access_token_secret,
);
TwitterStream::track(&topic, &token)
.try_flatten_stream()
.try_for_each(|json| {
// process the decoded JSON
let msg: StreamMessage =
serde_json::from_str(&json).expect("failed to convert tweet JSON to struct");
match msg {
StreamMessage::Tweet(tweet) => {
// && !tweet.possibly_sensitive
if tweet.lang.eq("en") {
let serialized = serde_json::to_string(&tweet).unwrap();
log::info!("twitter : {}", serialized);
sender.send(Ok(Message::text(serialized))).unwrap();
}
}
StreamMessage::Other(_) => {
log::warn!("not a tweet. ignored");
}
}
future::ok(())
})
.await
.unwrap();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment