-
-
Save longfellowone/a1db9db3157c3350f409064d148d096b to your computer and use it in GitHub Desktop.
Rust Redis with Async/Await (single threaded concurrency) | RustLang by example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#![allow(unused)] // silence unused warnings while exploring (to comment out) | |
use std::{error::Error, time::Duration}; | |
use tokio::time::sleep; | |
use redis::{ | |
from_redis_value, | |
streams::{StreamRangeReply, StreamReadOptions, StreamReadReply}, | |
AsyncCommands, Client, | |
}; | |
// YouTube video: https://youtu.be/uD5hBVHwyDM | |
// region: Notes | |
// | |
// Docker redis command: docker run --name redis_1 --rm -p 6379:6379 -it redis:6 -- --loglevel verbose | |
// Cargo watch command: cargo watch -q -c -x 'run -q' | |
// Crates redis: https://crates.io/crates/redis | |
// | |
// endregion: Notes | |
#[tokio::main(flavor = "current_thread")] | |
async fn main() -> Result<(), Box<dyn Error>> { | |
// 1) Create Connection | |
let client = Client::open("redis://127.0.0.1/")?; | |
let mut con = client.get_tokio_connection().await?; | |
// 2) Set / Get Key | |
con.set("my_key", "Hello world!").await?; | |
let result: String = con.get("my_key").await?; | |
println!("->> my_key: {}\n", result); | |
// 3) xadd to redis stream | |
con.xadd("my_stream", "*", &[("name", "name-01"), ("title", "title 01")]).await?; | |
let len: i32 = con.xlen("my_stream").await?; | |
println!("->> my_stream len {}\n", len); | |
// 4) xrevrange the read stream | |
let result: Option<StreamRangeReply> = con.xrevrange_count("my_stream", "+", "-", 10).await?; | |
if let Some(reply) = result { | |
for stream_id in reply.ids { | |
println!("->> xrevrange stream entity: {} ", stream_id.id); | |
for (name, value) in stream_id.map.iter() { | |
println!(" ->> {}: {}", name, from_redis_value::<String>(value)?); | |
} | |
println!(); | |
} | |
} | |
// 5) Blocking xread | |
tokio::spawn(async { | |
let client = Client::open("redis://127.0.0.1/").unwrap(); | |
let mut con = client.get_tokio_connection().await.unwrap(); | |
loop { | |
let opts = StreamReadOptions::default().count(1).block(0); | |
let result: Option<StreamReadReply> = con.xread_options(&["my_stream"], &["$"], opts).await.unwrap(); | |
if let Some(reply) = result { | |
for stream_key in reply.keys { | |
println!("->> xread block: {}", stream_key.key); | |
for stream_id in stream_key.ids { | |
println!(" ->> StreamId: {:?}", stream_id); | |
} | |
} | |
println!(); | |
} | |
} | |
}); | |
// 6) Add some stream entries | |
sleep(Duration::from_millis(100)).await; | |
con.xadd("my_stream", "*", &[("name", "name-02"), ("title", "title 02")]).await?; | |
sleep(Duration::from_millis(100)).await; | |
con.xadd("my_stream", "*", &[("name", "name-03"), ("title", "title 03")]).await?; | |
// 7) Final wait & cleanup | |
sleep(Duration::from_millis(1000)).await; | |
con.del("my_key").await?; | |
con.del("my_stream").await?; | |
println!("->> the end"); | |
Ok(()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment