Last active
May 29, 2023 03:47
-
-
Save benkay86/6afffd4cf90ad84ac43e42d59d197e08 to your computer and use it in GitHub Desktop.
Attach an indicatif progress bar to a stream, similar to indicatif::ProgressIterator.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//! [Indicatif](https://github.com/mitsuhiko/indicatif) is a wonderful progress
//! bar for terminal applications. It already works in a multi-threaded
//! environment. The [`indicatif::ProgressIterator::progress_with()`]
//! combinator conveniently attaches a progress bar to an existing iterator.
//!
//! This module provides [`ProgressStream::progress_with()`] for attaching a
//! progress bar to a [`futures::stream::Stream`].
//!
//! Note that the unit tests in this module use [tokio](https://tokio.rs) as
//! the testing runtime, but the rest of the code does not depend on tokio. You
//! can simply delete the `mod tests { }` block if you do not want to use tokio
//! in your project.
/// Re-export of [`indicatif`]. | |
pub use indicatif; | |
use indicatif::*; | |
use futures_core::stream::Stream; | |
use std::pin::Pin; | |
use std::ops::{Deref, DerefMut}; | |
use std::task::{Poll, Context}; | |
pub struct ProgressBarStream<S, P> | |
where P: Deref<Target = ProgressBar> + Unpin | |
{ | |
len_known: bool, // length of bar | |
progress: P, // progress bar | |
inner_stream: Pin<S> // stream to wrap | |
} | |
impl<S, P> Drop for ProgressBarStream<S, P> | |
where P: Deref<Target = ProgressBar> + Unpin | |
{ | |
fn drop(&mut self) { | |
if !self.progress.is_finished() { | |
self.progress.finish_at_current_pos(); | |
} | |
} | |
} | |
impl<D, S, T, P> Stream for ProgressBarStream<D, P> | |
where | |
D: DerefMut<Target = S> + Send + Unpin, | |
S: Stream<Item = T>, | |
T: Send, | |
P: Deref<Target = ProgressBar> + Unpin | |
{ | |
type Item = T; | |
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | |
// Get mutable reference to self. | |
let this = self.get_mut(); | |
// Poll the stream. | |
match this.inner_stream.as_mut().poll_next(cx) { | |
// Update progress bar, then pass through ready item. | |
Poll::Ready(item) => { | |
// Update provisional progress bar length if the | |
// ultimate length is not yet known. | |
if !this.len_known { | |
// Add an extra 1 to length if there is an item to yield. | |
let item = match item.is_some() { true => 1, false => 0} as u64; | |
let size_hint = this.inner_stream.size_hint(); | |
match size_hint.1 { | |
// Ultimate length is now known. | |
Some(len) => { | |
let len = this.progress.position() + item + len as u64; | |
this.progress.set_length(len); | |
this.len_known = true; | |
} | |
// Update provisional length. | |
None => { | |
let len = this.progress.position() + item + size_hint.0 as u64; | |
this.progress.set_length(len); | |
} | |
} | |
} | |
// Increment progress bar if there is some item. | |
// This must be done after adjusting length (above), otherwise | |
// the bar doesn't render quite right. | |
if item.is_some() { | |
this.progress.inc(1); | |
} | |
// Finish the progress bar if there are no more items in the | |
// stream. | |
if item.is_none() { | |
this.progress.finish_at_current_pos(); | |
} | |
// Pass through ready item. | |
Poll::Ready(item) | |
}, | |
// Pass through pending value. | |
Poll::Pending => Poll::Pending | |
} | |
} | |
fn size_hint(&self) -> (usize, Option<usize>) { | |
self.inner_stream.size_hint() | |
} | |
} | |
/// Combinator trait for associating a progress bar with a stream. | |
/// See [`ProgressStream::progress_with()`]. | |
pub trait ProgressStream { | |
/// Type of underlying stream. | |
type Stream; | |
/// Type yielded by underlying stream. | |
type Item; | |
/// See [`indicatif::ProgressIterator::progress_with()`]. | |
/// For example: | |
/// | |
/// ``` | |
/// use indicatif_progress_stream::ProgressStream; | |
/// let stream = futures::stream::iter(0..10); // stream of 10 items | |
/// let pb = indicatif::ProgressBar::hidden(); // progress bar | |
/// stream.progress_with(&pb).for_each(|_|()).await; | |
/// ``` | |
/// | |
/// Or: | |
/// | |
/// ``` | |
/// use indicatif_progress_stream::ProgressStream; | |
/// let stream = futures::stream::iter(0..10); // stream of 10 items | |
/// let pb = Arc::new(indicatif::ProgressBar::hidden()); // progress bar | |
/// stream.progress_with(pb.clone()).for_each(|_|()).await; | |
/// ``` | |
fn progress_with<P>(self, progress: P) -> ProgressBarStream<Box<Self::Stream>, P> | |
where P: Deref<Target = ProgressBar> + Unpin; | |
} | |
impl<S,T> ProgressStream for S | |
where | |
S: Stream<Item = T> + Send, | |
T: Send | |
{ | |
type Stream = S; | |
type Item = T; | |
fn progress_with<P>(self, progress: P) -> ProgressBarStream<Box<Self::Stream>, P> | |
where P: Deref<Target = ProgressBar> + Unpin | |
{ | |
// Make progress bar visible as soon as it is wrapped in stream. | |
progress.tick(); | |
ProgressBarStream{ len_known: false, progress, inner_stream: Box::pin(self) } | |
} | |
} | |
/// Combinator trait for associating a progress bar with an owned stream. | |
/// See [`ProgressTreamOwned::progress_with()`]. | |
pub trait ProgressStreamOwned { | |
/// Box, Arc, or similar type. | |
type Owned: DerefMut<Target = Self::Stream> + Send + Unpin; | |
/// Type of underlying stream. | |
type Stream; | |
/// Type yielded by underlying stream. | |
type Item; | |
/// Similar to [`ProgressStream::progress_with()`], but generic over owned | |
/// streams. For example: | |
/// | |
/// ``` | |
/// use indicatif_progress_stream::ProgressStream; | |
/// let stream = Arc::pin(futures::stream::iter(0..10)); | |
/// let pb = Arc::new(indicatif::ProgressBar::hidden()); // progress bar | |
/// stream.progress_with(pb.clone()).for_each(|_|()).await; | |
/// ``` | |
fn progress_with<P>(self, progress: P) -> ProgressBarStream<Self::Owned, P> | |
where P: Deref<Target = ProgressBar> + Unpin; | |
} | |
impl<D,S,T> ProgressStreamOwned for Pin<D> | |
where | |
D: DerefMut<Target = S> + Send + Unpin, | |
S: Stream<Item = T>, | |
T: Send | |
{ | |
type Owned = D; | |
type Stream = S; | |
type Item = T; | |
fn progress_with<P>(self, progress: P) -> ProgressBarStream<Self::Owned, P> | |
where P: Deref<Target = ProgressBar> + Unpin | |
{ | |
// Make progress bar visible as soon as it is wrapped in stream. | |
progress.tick(); | |
ProgressBarStream{ len_known: false, progress, inner_stream: self } | |
} | |
} | |
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::stream::StreamExt;
    use std::sync::Arc;

    /// Drives a two-item stream whose length is known up front and checks the
    /// bar after every step.  Shared by the tests below, which differ only in
    /// how the progress bar is handed to the stream.
    async fn check_len_known<P>(progress: P)
    where
        P: std::ops::Deref<Target = indicatif::ProgressBar> + Unpin,
    {
        // Create stream of two items. Yields the answer of 42.
        let stream = futures_util::stream::repeat(42_u16).take(2);
        // Attach progress bar.
        let mut pb_stream = stream.progress_with(progress);

        // Dequeue an item from the stream.
        assert_eq!(pb_stream.next().await, Some(42));
        // Now length should reflect size of the stream, 2.
        assert_eq!(pb_stream.progress.length(), 2);
        // And position should be one.
        assert_eq!(pb_stream.progress.position(), 1);

        // Dequeue another item from stream.
        assert_eq!(pb_stream.next().await, Some(42));
        // Length should still be 2.
        assert_eq!(pb_stream.progress.length(), 2);
        // Position should be two now.
        assert_eq!(pb_stream.progress.position(), 2);
        // Progress bar should not be finished yet.
        assert!(!pb_stream.progress.is_finished());

        // Finish stream.
        assert!(pb_stream.next().await.is_none());
        // Now bar should be finished.
        assert_eq!(pb_stream.progress.position(), 2);
        assert_eq!(pb_stream.progress.length(), 2);
        assert!(pb_stream.progress.is_finished());
    }

    /// Progress bar passed by shared reference.
    #[tokio::test]
    async fn test_progress_len_known() {
        let hidden_bar = indicatif::ProgressBar::hidden();
        check_len_known(&hidden_bar).await;
    }

    /// Progress bar passed as an `Arc`.
    #[tokio::test]
    async fn test_progress_arc_len_known() {
        let hidden_bar = Arc::new(indicatif::ProgressBar::hidden());
        check_len_known(hidden_bar).await;
    }

    /// Stream whose length is unknown until a stop flag ends it.
    #[tokio::test]
    async fn test_progress_len_unknown() {
        use std::sync::atomic::{AtomicBool, Ordering};

        // Create stopping condition.
        let stop = Arc::new(AtomicBool::new(false));
        // Create an unbounded stream that yields the answer of 42 until the
        // stopping condition is set.
        let stream = {
            let stop = Arc::clone(&stop); // clone before moving into FnMut closure
            futures_util::stream::repeat(42_u16).take_while(move |_| {
                let stop = Arc::clone(&stop); // clone before moving into async block
                async move { !stop.load(Ordering::Acquire) }
            })
        };
        // Attach progress bar.
        let hidden_bar = indicatif::ProgressBar::hidden();
        let mut pb_stream = stream.progress_with(&hidden_bar);

        // Dequeue an item from the stream.
        assert_eq!(pb_stream.next().await, Some(42));
        // Final length of progress bar is unknown, but provisional length is 1.
        assert_eq!(pb_stream.size_hint().1, None);
        assert!(!pb_stream.len_known);
        assert_eq!(pb_stream.progress.length(), 1);
        // And position should be one.
        assert_eq!(pb_stream.progress.position(), 1);

        // Dequeue another item from stream.
        assert_eq!(pb_stream.next().await, Some(42));
        assert!(!pb_stream.len_known);
        assert_eq!(pb_stream.progress.length(), 2);
        assert_eq!(pb_stream.progress.position(), 2);
        // Progress bar should not be finished yet.
        assert!(!pb_stream.progress.is_finished());

        // Set stopping condition.
        stop.store(true, Ordering::Release);
        // Finish stream.
        assert!(pb_stream.next().await.is_none());
        // Now length should be known and bar should be finished.
        assert!(pb_stream.len_known);
        assert_eq!(pb_stream.progress.position(), 2);
        assert_eq!(pb_stream.progress.length(), 2);
        assert!(pb_stream.progress.is_finished());
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment