Created
February 27, 2024 13:26
-
-
Save fancellu/71c0746d8a79a6f95c8add6d9a39bd5b to your computer and use it in GitHub Desktop.
Rust tokio channels demo
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use tokio::sync::broadcast; | |
use tokio::sync::mpsc; | |
use tokio::sync::oneshot; | |
#[tokio::main] | |
async fn main() { | |
let (tx, rx) = oneshot::channel(); | |
tokio::spawn(async move { | |
tokio::time::sleep(std::time::Duration::from_secs(1)).await; | |
if let Err(_) = tx.send(99) { | |
println!("the receiver dropped"); | |
} | |
}); | |
println!("Waiting on rx"); | |
match rx.await { | |
Ok(v) => println!("got oneshot = {:?}", v), | |
Err(_) => println!("the sender dropped"), | |
} | |
// Buffer of 3 will create back pressure when full | |
let (tx, mut rx) = mpsc::channel(3); | |
tokio::spawn(async move { | |
for i in 0..10 { | |
println!("Sending {}", i); | |
if let Err(_) = tx.send(i).await { | |
println!("receiver dropped"); | |
return; | |
} | |
} | |
}); | |
while let Some(i) = rx.recv().await { | |
// We read slowly so that we can see sender block when buffer is full | |
tokio::time::sleep(std::time::Duration::from_millis(500)).await; | |
println!("got mpsc = {}", i); | |
} | |
// Creating a broadcast channel for strings, multiple producer and consumers | |
let (tx, mut _rx) = broadcast::channel::<String>(3); | |
let tx1 = tx.clone(); | |
let tx2 = tx.clone(); | |
tokio::spawn(async move { | |
tx1.send("hello".to_string()).unwrap(); | |
tx2.send("world".to_string()).unwrap(); | |
}); | |
let mut rx1 = tx.subscribe(); | |
let mut rx2 = tx.subscribe(); | |
// We see the sent messages | |
for _i in 0..2 { | |
let msg = rx1.recv().await.unwrap(); | |
println!("Got rx1 {}", msg); | |
} | |
// Sent messages are seen again, as we use a different receiver | |
for _i in 0..2 { | |
let msg = rx2.recv().await.unwrap(); | |
println!("Got rx2 {}", msg); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Waiting on rx
got oneshot = 99
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
got mpsc = 0
Sending 5
got mpsc = 1
Sending 6
got mpsc = 2
Sending 7
got mpsc = 3
Sending 8
got mpsc = 4
Sending 9
got mpsc = 5
got mpsc = 6
got mpsc = 7
got mpsc = 8
got mpsc = 9
Got rx1 hello
Got rx1 world
Got rx2 hello
Got rx2 world