Skip to content

Instantly share code, notes, and snippets.

@benkay86
Last active May 29, 2023 03:47
Show Gist options
  • Save benkay86/6afffd4cf90ad84ac43e42d59d197e08 to your computer and use it in GitHub Desktop.
Save benkay86/6afffd4cf90ad84ac43e42d59d197e08 to your computer and use it in GitHub Desktop.
Attach an indicatif progress bar to a stream, similar to indicatif::ProgressIterator.
//! [Indicatif](https://github.com/mitsuhiko/indicatif) is a wonderful progress
//! bar for terminal applications. It already works in a multi-threaded
//! environment. The [`indicatif::ProgressIterator::progress_with()`]
//! combinator conveniently attaches a progress bar to an existing iterator.
//!
//! This module provides [`ProgressStream::progress_with()`] for attaching a
//! progress bar to a [`futures::stream::Stream`].
//!
//! Note that the unit tests in this module use [tokio](https://tokio.rs) for
//! a testing runtime, but the rest of the code does not depend on tokio. You
//! can simply delete the `mod tests { }` if you do not want to use tokio in
//! your project.
/// Re-export of [`indicatif`].
pub use indicatif;
use indicatif::*;
use futures_core::stream::Stream;
use std::pin::Pin;
use std::ops::{Deref, DerefMut};
use std::task::{Poll, Context};
/// Stream adapter that mirrors the progress of a wrapped stream on an
/// [`indicatif::ProgressBar`].
///
/// Values of this type are created by [`ProgressStream::progress_with()`] and
/// [`ProgressStreamOwned::progress_with()`]. Each item yielded by the wrapped
/// stream advances the bar by one, and the bar is finished when the stream
/// ends or when this wrapper is dropped.
///
/// `S` is the pointer type that owns the wrapped stream (for example
/// `Box<MyStream>`) and `P` is any handle that dereferences to the progress
/// bar (for example `&ProgressBar` or `Arc<ProgressBar>`).
// The bound on `P` has to appear on the struct itself because the `Drop`
// implementation needs it, and a `Drop` impl may not add bounds of its own.
pub struct ProgressBarStream<S, P>
where P: Deref<Target = ProgressBar> + Unpin
{
len_known: bool, // true once the bar length was set from an upper size-hint bound
progress: P, // handle that dereferences to the progress bar being driven
inner_stream: Pin<S> // pinned pointer to the wrapped stream
}
/// Makes sure the progress bar does not stay "in progress" after the stream
/// wrapper goes away.
impl<S, P> Drop for ProgressBarStream<S, P>
where
    P: Deref<Target = ProgressBar> + Unpin,
{
    /// Finishes the bar at its current position unless it has already been
    /// finished (for example because the stream ran to completion).
    fn drop(&mut self) {
        let bar: &ProgressBar = &self.progress;
        if bar.is_finished() {
            // Nothing left to do; finishing twice is not needed.
            return;
        }
        bar.finish_at_current_pos();
    }
}
/// Forwards every item of the wrapped stream unchanged while keeping the
/// progress bar's length and position up to date.
impl<D, S, T, P> Stream for ProgressBarStream<D, P>
where
    D: DerefMut<Target = S> + Send + Unpin,
    S: Stream<Item = T>,
    T: Send,
    P: Deref<Target = ProgressBar> + Unpin,
{
    type Item = T;

    /// Polls the wrapped stream and updates the progress bar.
    ///
    /// * `Poll::Pending` is passed through without touching the bar.
    /// * `Poll::Ready(Some(_))` increments the bar by one.
    /// * `Poll::Ready(None)` finishes the bar at its current position.
    ///
    /// Until the wrapped stream reports an upper bound in its size hint, the
    /// bar length is a provisional value recomputed on every ready poll from
    /// the lower bound. Once an upper bound is seen the length is fixed.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // All fields are `Unpin` (see the bounds above), so a plain mutable
        // reference can be taken out of the pin.
        let this = self.get_mut();

        // Poll the wrapped stream; a pending result needs no bookkeeping.
        let item = match this.inner_stream.as_mut().poll_next(cx) {
            Poll::Ready(item) => item,
            Poll::Pending => return Poll::Pending,
        };

        if !this.len_known {
            // The size hint is taken *after* polling, so it does not include
            // the item that is about to be yielded; count it separately.
            let yielded = u64::from(item.is_some());
            let (lower, upper) = this.inner_stream.size_hint();
            // Prefer the exact upper bound; otherwise fall back to the lower
            // bound as a provisional estimate.
            // `usize -> u64` is lossless on targets with pointers <= 64 bits.
            let remaining = upper.unwrap_or(lower) as u64;
            // Saturate instead of overflowing: unbounded streams (for example
            // `futures::stream::repeat`) report `usize::MAX` as a bound.
            let len = this
                .progress
                .position()
                .saturating_add(yielded)
                .saturating_add(remaining);
            this.progress.set_length(len);
            // The length is final only when it came from an upper bound.
            this.len_known = upper.is_some();
        }

        // The position must be updated after the length (above), otherwise
        // the bar doesn't render quite right.
        if item.is_some() {
            this.progress.inc(1);
        } else {
            // No more items in the stream.
            this.progress.finish_at_current_pos();
        }

        Poll::Ready(item)
    }

    /// Returns the size hint of the wrapped stream unchanged.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner_stream.size_hint()
    }
}
/// Combinator trait for associating a progress bar with a stream.
/// See [`ProgressStream::progress_with()`].
pub trait ProgressStream {
/// Type of underlying stream.
type Stream;
/// Type yielded by underlying stream.
type Item;
/// Wraps this stream so that `progress` is advanced by one for every item
/// the stream yields and finished when the stream ends.
/// See [`indicatif::ProgressIterator::progress_with()`] for the iterator
/// equivalent.
///
/// `progress` may be anything that dereferences to a
/// [`indicatif::ProgressBar`], such as a shared reference or an `Arc`.
///
/// # Examples
///
/// Borrowing the progress bar:
///
/// ```
/// # async fn demo() {
/// use futures::stream::StreamExt;
/// use indicatif_progress_stream::ProgressStream;
/// let stream = futures::stream::iter(0..10); // stream of 10 items
/// let pb = indicatif::ProgressBar::hidden(); // progress bar
/// stream.progress_with(&pb).for_each(|_| async {}).await;
/// # }
/// ```
///
/// Sharing the progress bar through an `Arc`:
///
/// ```
/// # async fn demo() {
/// use futures::stream::StreamExt;
/// use indicatif_progress_stream::ProgressStream;
/// use std::sync::Arc;
/// let stream = futures::stream::iter(0..10); // stream of 10 items
/// let pb = Arc::new(indicatif::ProgressBar::hidden()); // progress bar
/// stream.progress_with(pb.clone()).for_each(|_| async {}).await;
/// # }
/// ```
fn progress_with<P>(self, progress: P) -> ProgressBarStream<Box<Self::Stream>, P>
where P: Deref<Target = ProgressBar> + Unpin;
}
/// Blanket implementation: any sendable stream of sendable items can be given
/// a progress bar.
impl<S, T> ProgressStream for S
where
    S: Stream<Item = T> + Send,
    T: Send,
{
    type Stream = S;
    type Item = T;

    /// Boxes and pins the stream, then wraps it together with `progress`.
    fn progress_with<P>(self, progress: P) -> ProgressBarStream<Box<Self::Stream>, P>
    where
        P: Deref<Target = ProgressBar> + Unpin,
    {
        // Tick once up front so the bar becomes visible as soon as it is
        // attached, before the first item arrives.
        let bar: &ProgressBar = &progress;
        bar.tick();

        // The final length is unknown until the stream has been polled.
        let inner_stream = Box::pin(self);
        ProgressBarStream {
            inner_stream,
            progress,
            len_known: false,
        }
    }
}
/// Combinator trait for associating a progress bar with an owned stream.
/// See [`ProgressStreamOwned::progress_with()`].
pub trait ProgressStreamOwned {
/// Pointer type that owns the stream, such as `Box`. It must implement
/// `DerefMut`, so shared pointers like `Arc` do not qualify.
type Owned: DerefMut<Target = Self::Stream> + Send + Unpin;
/// Type of underlying stream.
type Stream;
/// Type yielded by underlying stream.
type Item;
/// Similar to [`ProgressStream::progress_with()`], but generic over owned
/// streams: an already pinned pointer to a stream is wrapped as-is instead
/// of being boxed again.
///
/// # Examples
///
/// ```
/// # async fn demo() {
/// use futures::stream::StreamExt;
/// use indicatif_progress_stream::ProgressStreamOwned;
/// use std::sync::Arc;
/// let stream = Box::pin(futures::stream::iter(0..10)); // pinned, owned stream
/// let pb = Arc::new(indicatif::ProgressBar::hidden()); // progress bar
/// stream.progress_with(pb.clone()).for_each(|_| async {}).await;
/// # }
/// ```
fn progress_with<P>(self, progress: P) -> ProgressBarStream<Self::Owned, P>
where P: Deref<Target = ProgressBar> + Unpin;
}
/// Implementation for streams that are already behind a pinned, mutable,
/// sendable pointer.
impl<D, S, T> ProgressStreamOwned for Pin<D>
where
    D: DerefMut<Target = S> + Send + Unpin,
    S: Stream<Item = T>,
    T: Send,
{
    type Owned = D;
    type Stream = S;
    type Item = T;

    /// Wraps the pinned pointer together with `progress` without re-boxing.
    fn progress_with<P>(self, progress: P) -> ProgressBarStream<Self::Owned, P>
    where
        P: Deref<Target = ProgressBar> + Unpin,
    {
        // Tick once up front so the bar becomes visible as soon as it is
        // attached, before the first item arrives.
        let bar: &ProgressBar = &progress;
        bar.tick();

        // The final length is unknown until the stream has been polled.
        ProgressBarStream {
            inner_stream: self,
            progress,
            len_known: false,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::stream::StreamExt;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    /// Drives a stream of exactly two items with `progress` attached and
    /// checks length, position and finished state after every step.
    ///
    /// Shared by the tests below so the same expectations are verified for
    /// each kind of progress bar handle.
    async fn check_known_length<P>(progress: P)
    where
        P: Deref<Target = ProgressBar> + Unpin,
    {
        // Create stream of two items. Yields the answer of 42.
        let stream = futures_util::stream::repeat(42_u16).take(2);
        // Attach progress bar.
        let mut pb_stream = stream.progress_with(progress);

        // Dequeue an item from the stream.
        assert_eq!(pb_stream.next().await, Some(42));
        // Now length should reflect size of the stream, 2.
        assert_eq!(pb_stream.progress.length(), 2);
        // And position should be one.
        assert_eq!(pb_stream.progress.position(), 1);

        // Dequeue another item from stream.
        assert_eq!(pb_stream.next().await, Some(42));
        // Length should still be 2.
        assert_eq!(pb_stream.progress.length(), 2);
        // Position should be two now.
        assert_eq!(pb_stream.progress.position(), 2);
        // Progress bar should not be finished yet.
        assert!(!pb_stream.progress.is_finished());

        // Finish stream.
        assert!(pb_stream.next().await.is_none());
        // Now bar should be finished.
        assert_eq!(pb_stream.progress.position(), 2);
        assert_eq!(pb_stream.progress.length(), 2);
        assert!(pb_stream.progress.is_finished());
    }

    /// Known-length stream with the progress bar passed by reference.
    #[tokio::test]
    async fn test_progress_len_known() {
        let hidden_bar = indicatif::ProgressBar::hidden();
        check_known_length(&hidden_bar).await;
    }

    /// Known-length stream with the progress bar passed inside an `Arc`.
    #[tokio::test]
    async fn test_progress_arc_len_known() {
        let hidden_bar = Arc::new(indicatif::ProgressBar::hidden());
        check_known_length(hidden_bar).await;
    }

    /// Stream whose length is unknown until it ends: the bar length must grow
    /// provisionally and only become final when the stream terminates.
    #[tokio::test]
    async fn test_progress_len_unknown() {
        // Create stopping condition.
        let stop = Arc::new(AtomicBool::new(false));
        // Create an endless stream of 42 that stops once `stop` is set.
        let stream = {
            let stop = stop.clone(); // clone before moving into FnMut closure
            futures_util::stream::repeat(42_u16).take_while(move |_| {
                let stop = stop.clone(); // clone before moving into async block
                async move { !stop.load(Ordering::Acquire) }
            })
        };
        // Attach progress bar.
        let hidden_bar = indicatif::ProgressBar::hidden();
        let mut pb_stream = stream.progress_with(&hidden_bar);

        // Dequeue an item from the stream.
        assert_eq!(pb_stream.next().await, Some(42));
        // Final length of progress bar is unknown, but provisional length is 1.
        assert_eq!(pb_stream.size_hint().1, None);
        assert!(!pb_stream.len_known);
        assert_eq!(pb_stream.progress.length(), 1);
        // And position should be one.
        assert_eq!(pb_stream.progress.position(), 1);

        // Dequeue another item from stream.
        assert_eq!(pb_stream.next().await, Some(42));
        assert!(!pb_stream.len_known);
        assert_eq!(pb_stream.progress.length(), 2);
        assert_eq!(pb_stream.progress.position(), 2);
        // Progress bar should not be finished yet.
        assert!(!pb_stream.progress.is_finished());

        // Set stopping condition.
        stop.store(true, Ordering::Release);
        // Finish stream.
        assert!(pb_stream.next().await.is_none());
        // Now length should be known and bar should be finished.
        assert!(pb_stream.len_known);
        assert_eq!(pb_stream.progress.position(), 2);
        assert_eq!(pb_stream.progress.length(), 2);
        assert!(pb_stream.progress.is_finished());
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment