Last active
September 12, 2022 19:49
-
-
Save valsteen/103aac191afa881d88829bb9e3699784 to your computer and use it in GitHub Desktop.
Self-feeding processing stream proof of concept
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[package] | |
name = "processing_stream" | |
version = "0.1.0" | |
edition = "2021" | |
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | |
[dependencies] | |
futures = "0.3.24" | |
tokio = { version = "1.21.0", features = ["full"] } | |
derive_more = { version = "0.99.17", features = ["deref", "deref_mut"] } | |
serde_json = { version = "1.0.85" } | |
[[bin]] | |
name = "processing_stream" | |
path = "processing_stream.rs" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//! This demonstrates a processing queue whose processor can send elements back to the queue,
//! using streams: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
//!
//! It uses an unbounded channel that is seeded with the starting tasks; a clone of the
//! sender travels along with each element inside the channel. The channel remains open
//! as long as at least one sender clone is alive.
//!
//! Once the queue is empty and the processor no longer sends back tasks, every sender clone
//! has been dropped, so the channel closes: the receiver stream ends after it is drained,
//! which lets the processing loop exit.
//!
//! The concurrency limit is controlled with `for_each_concurrent`.
use futures::channel::mpsc::{unbounded, UnboundedSender}; | |
use futures::sink::SinkExt; | |
use futures::stream::StreamExt; | |
use std::time::{Duration, SystemTime}; | |
use tokio::time::sleep; | |
use derive_more::{Deref, DerefMut}; | |
use serde_json::{json, Value}; | |
#[tokio::main] | |
async fn main() { | |
let max_concurrency = 10; | |
let mut tree = json!([ | |
1, | |
2, | |
[3, [4, 5, [6, 7]], 8], | |
[9, 10], | |
11, | |
12, | |
13, | |
[14, 15, [16, [17, 18, 19, [20, 21, 22]]]] | |
]); | |
// Simple wrapper to allow channels that send their own sender type | |
// https://doc.rust-lang.org/reference/types.html#recursive-types | |
// "Recursive types must include a nominal type in the recursion" | |
#[derive(Clone, Deref, DerefMut)] | |
struct SenderWrapper<T>(UnboundedSender<(T, SenderWrapper<T>)>); | |
let (sender, receiver) = unbounded(); | |
// Seed the channel with the root-level elements. | |
// The sender is dropped from the main loop, and references to it only exists as | |
// items waiting in the channel. | |
// Once all items are consumed, this will close the channel. | |
{ | |
let mut sender = SenderWrapper(sender); | |
for item in tree.as_array_mut().unwrap().drain(..) { | |
let sender_clone = sender.clone(); | |
sender.send((item, sender_clone)).await.unwrap(); | |
} | |
} | |
let start_time = SystemTime::now(); | |
// processing loop. receiver can receive back tasks sent from the processing function | |
receiver | |
.for_each_concurrent(max_concurrency, move |(value, mut sender)| async move { | |
if let Some(next) = process(value).await { | |
for item in next { | |
let sender_clone = sender.clone(); | |
sender.send((item, sender_clone)).await.unwrap() | |
} | |
} | |
}) | |
.await; | |
let duration = start_time.elapsed().unwrap().as_secs_f32(); | |
println!("Finished in {duration:.2} seconds"); | |
} | |
async fn process(value: Value) -> Option<Vec<Value>> { | |
match value { | |
Value::Number(n) => { | |
println!(">> Processing final value {n}"); | |
sleep(Duration::from_secs(25 - n.as_u64().unwrap())).await; | |
println!(">> Processed final value {n}"); | |
None | |
} | |
Value::Array(a) => { | |
let len = a.len(); | |
println!("@@ got list of {len}, rescheduling them"); | |
sleep(Duration::from_secs(len as u64)).await; | |
Some(a) | |
} | |
_ => unreachable!(), | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment