Last active
March 18, 2021 19:42
-
-
Save benkay86/f4f004d54103996c49839032aa1c464e to your computer and use it in GitHub Desktop.
Rust iterators and streams over globs with a uniform error type.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//! Tools to enhance [`glob::glob()`]. Provides a [`GlobIterError`] type that | |
//! enumerates over [`glob::PatternError`] and [`glob::GlobError`] with a | |
//! corresponding [`GlobPathIter`] as a drop-in replacement for | |
//! [`glob::Paths`]. Also provides a [`GlobPatternIter`] for iterating over | |
//! paths matching multiple glob patterns. | |
//! | |
//! Note that the underlying glob crate does not support asynchronous pattern | |
//! matching. This module provides [`UnboundedGlobPathStream`] | |
//! and [`UnboundedGlobPatternStream`], which use [`super::sidestream`] to do | |
//! blocking operations on a separate thread. | |
//! | |
//! See: | |
//! [sidestream](https://gist.github.com/benkay86/fbfc84babca9b0996d6aee66087e59c4) | |
//! [flatten_result](https://gist.github.com/benkay86/8960008023c62cd5cf5239c10c6fea3e) | |
use futures_core::stream::Stream; | |
use futures_util::stream::{StreamExt, TryStreamExt}; | |
use std::iter::Iterator; | |
use std::path::PathBuf; | |
use std::pin::Pin; | |
use std::task::{Poll, Context}; | |
use super::sidestream::{SideStreamExtForIterator, UnboundedSideStream}; | |
use super::flatten_result::*; | |
/// Enumeration of errors that can be returned by this module. | |
#[derive(thiserror::Error, Debug)] | |
pub enum GlobIterError { | |
/// See [`glob::GlobError`]. | |
#[error("Unreadable match {} for glob pattern {}.", source.path().to_string_lossy(), pattern)] | |
GlobError { | |
pattern: String, | |
source: glob::GlobError, | |
}, | |
/// See [`glob::PatternError`]. | |
#[error("Could not compile glob pattern {}.", pattern)] | |
PatternError { | |
pattern: String, | |
source: glob::PatternError, | |
} | |
} | |
/// Alternative iterator over [`glob::Paths`] which stores its corresponding | |
/// glob pattern and yields `Result<_,GlobIterError>` instead of | |
/// `Result<_glob::GlobError>`. | |
pub struct GlobPathIter { | |
// Made from glob::glob(). | |
paths: glob::Paths, | |
// Pattern used to make the paths, for adding context to errors. | |
pattern: String | |
} | |
impl GlobPathIter { | |
/// Create a new GlobPathIter from a pattern. Wraps [`glob::PatternError`] | |
/// in a [`GlobIterError`]. | |
pub fn try_from_pattern<P: Into<String>>(pattern: P) -> Result<Self, GlobIterError> { | |
// Convert pattern into string. | |
let pattern = pattern.into(); | |
// Try to compile the pattern. | |
let paths = match glob::glob(&pattern) { | |
// Unwrap iterator over paths. | |
Ok(paths) => paths, | |
// Return error if pattern compilation fails. | |
Err(e) => { | |
return Err(GlobIterError::PatternError{pattern, source: e}); | |
} | |
}; | |
// Create the GlobPathIter. | |
Ok(GlobPathIter{paths, pattern}) | |
} | |
} | |
impl Iterator for GlobPathIter { | |
type Item = Result<PathBuf,GlobIterError>; | |
fn next(&mut self) -> Option<Self::Item> { | |
// Yield next PathBuf, mapping errors to GlobIterError. | |
self.paths.next().and_then(|result| Some( | |
result.or_else(|err| Err( | |
GlobIterError::GlobError{ pattern: self.pattern.clone(), source: err } | |
)) | |
)) | |
} | |
fn size_hint(&self) -> (usize, Option<usize>) { | |
self.paths.size_hint() // delegate to Paths iterator | |
} | |
} | |
// Note cannot make generic TryFrom due to: | |
// https://github.com/rust-lang/rust/issues/50133 | |
impl std::convert::TryFrom<String> for GlobPathIter { | |
type Error = GlobIterError; | |
fn try_from(pattern: String) -> Result<Self, Self::Error> { | |
GlobPathIter::try_from_pattern(pattern) | |
} | |
} | |
impl std::convert::TryFrom<&str> for GlobPathIter { | |
type Error = GlobIterError; | |
fn try_from(pattern: &str) -> Result<Self, Self::Error> { | |
GlobPathIter::try_from_pattern(pattern) | |
} | |
} | |
/// Stream over [`glob::Paths`] which uses a [`GlobPathIter`] internally and | |
/// makes it asynchronous by iterating on a separate thread using | |
/// [`super::sidestream`]. | |
pub struct UnboundedGlobPathStream { | |
inner_stream: UnboundedSideStream<<GlobPathIter as Iterator>::Item> | |
} | |
impl UnboundedGlobPathStream { | |
/// Convert a [`GlobPathIter`] into an [`UnboundedGlobPathStream`]. | |
pub fn from_iter(iter: GlobPathIter) -> Self { | |
// Wrap iterator in unbounded sidestream. | |
UnboundedGlobPathStream{inner_stream: iter.sidestream()} | |
} | |
/// Create a new GlobPathStream from a pattern. Wraps [`glob::PatternError`] | |
/// in a [`GlobIterError`]. | |
pub fn try_from_pattern<P: Into<String>>(pattern: P) -> Result<Self, GlobIterError> { | |
Ok(Self::from_iter(GlobPathIter::try_from_pattern(pattern)?)) | |
} | |
} | |
impl Stream for UnboundedGlobPathStream { | |
type Item = <GlobPathIter as Iterator>::Item; | |
fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> { | |
// Get mutable reference to self. | |
let this = self.get_mut(); | |
// Temporarily pin inner stream to the stack. | |
// We an do this safely because the type of the inner stream implements | |
// Unpin, which cancels the pinning guarantee. | |
let inner_stream = Pin::new(&mut this.inner_stream); | |
// Poll inner stream. | |
inner_stream.poll_next(cx) | |
} | |
fn size_hint(&self) -> (usize, Option<usize>) { | |
// Delegate to inner stream's size hint. | |
self.inner_stream.size_hint() | |
} | |
} | |
impl From<GlobPathIter> for UnboundedGlobPathStream { | |
fn from(iter: GlobPathIter) -> Self { | |
Self::from_iter(iter) | |
} | |
} | |
// Note cannot make generic TryFrom due to: | |
// https://github.com/rust-lang/rust/issues/50133 | |
impl std::convert::TryFrom<String> for UnboundedGlobPathStream { | |
type Error = GlobIterError; | |
fn try_from(pattern: String) -> Result<Self, Self::Error> { | |
UnboundedGlobPathStream::try_from_pattern(pattern) | |
} | |
} | |
impl std::convert::TryFrom<&str> for UnboundedGlobPathStream { | |
type Error = GlobIterError; | |
fn try_from(pattern: &str) -> Result<Self, Self::Error> { | |
UnboundedGlobPathStream::try_from_pattern(pattern) | |
} | |
} | |
/// Iterator over one or more glob patterns (as Strings) that yields all | |
/// matching [`std::path::PathBuf`]. | |
pub struct GlobPatternIter<I,P> | |
{ | |
// Holds a FlatMap internally, which flattens and maps iterator over | |
// patterns to iterator over PathBufs while mapping all errors to | |
// GlobIterError. | |
inner_iter: | |
std::iter::FlatMap< | |
I, // input iterator where I: Iterator <Item = P> | |
FlattenResultIter<GlobPathIter, GlobIterError>, // output iterator | |
fn(P)->FlattenResultIter<GlobPathIter, GlobIterError> // mapping closure as function pointer | |
> | |
} | |
impl<I,P> GlobPatternIter<I,P> | |
where | |
I: Iterator<Item = P>, // iterator over patterns | |
P: Into<String> // pattern | |
{ | |
// Private function to launder type of contained closure, which flattens an | |
// iterator of results. | |
fn flatten_results<T: Into<String>>(pattern: T) -> FlattenResultIter<GlobPathIter, GlobIterError> { | |
GlobPathIter::try_from_pattern(pattern).flat_iter() | |
} | |
// Make a GlobPatternIter that flattens the iterator over each glob pattern. | |
pub fn from_pattern(pattern_iter: I) -> Self { | |
Self { inner_iter: pattern_iter.flat_map(Self::flatten_results) } | |
} | |
} | |
impl<I,P> Iterator for GlobPatternIter<I,P> | |
where | |
I: Iterator<Item = P>, // iterator over patterns | |
P: Into<String> // pattern | |
{ | |
type Item = Result<PathBuf,GlobIterError>; | |
fn next(&mut self) -> Option<Self::Item> { | |
self.inner_iter.next() // delegate to inner iterator | |
} | |
fn size_hint(&self) -> (usize, Option<usize>) { | |
self.inner_iter.size_hint() // delegate to inner iterator | |
} | |
} | |
/// Extension trait to convert an iterator of glob patterns (as Strings) to | |
/// a [`GlobPatternIter`]. | |
pub trait GlobPatternIterExt { | |
/// Type of iterator over patterns. | |
type Iterator; | |
/// Type of pattern (should be `Into<String>`). | |
type Pattern; | |
/// See [`GlobPatternIter::from_pattern()`]. | |
/// Use as `path_iter = pattern_iter.glob_pattern_iter()`. | |
fn glob_pattern_iter(self) -> GlobPatternIter<Self::Iterator, Self::Pattern>; | |
} | |
impl<I, P> GlobPatternIterExt for I | |
where | |
I: Iterator<Item = P>, // iterator over patterns | |
P: Into<String> // pattern | |
{ | |
type Iterator = I; | |
type Pattern = P; | |
fn glob_pattern_iter(self) -> GlobPatternIter<Self::Iterator, Self::Pattern> { | |
GlobPatternIter::<Self::Iterator,Self::Pattern>::from_pattern(self) | |
} | |
} | |
/// Stream over one or more glob patterns (as Strings) that yields all | |
/// matching [`std::path::PathBuf`]. Note that this is not a truly asynchronous | |
/// stream because the underlying [`glob::Paths`] is not asynchronous. You | |
/// should prefer to use [`GlobPatternIter`] where possible. To make this | |
/// stream asynchronous, consider using the [`super::sidestream`] module. | |
pub struct UnboundedGlobPatternStream<S,P> where S: Stream<Item = P> // and P is the pattern, i.e. `P: Into<String>` | |
{ | |
// Holds a FlatMap internally, which flattens and maps stream over | |
// patterns to stream over PathBufs while mapping all errors to | |
// GlobIterError. | |
inner_stream: futures_util::stream::TryFlatten<futures_util::stream::Map<S,fn(P)->Result<UnboundedGlobPathStream,GlobIterError>>> | |
} | |
impl <S,P> Stream for UnboundedGlobPatternStream<S,P> | |
where | |
S: Stream<Item = P> + Send + Unpin + 'static | |
{ | |
type Item = Result<PathBuf, GlobIterError>; | |
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | |
// Get mutable reference to self. | |
let this = self.get_mut(); | |
// Temporarily pin inner stream to the stack. | |
// We an do this safely because the type of the inner stream implements | |
// Unpin, which cancels the pinning guarantee. | |
let inner_stream = Pin::new(&mut this.inner_stream); | |
inner_stream.poll_next(cx) | |
} | |
fn size_hint(&self) -> (usize, Option<usize>) { | |
// Delegate to inner stream. | |
self.inner_stream.size_hint() | |
} | |
} | |
/// Extension trait to convert a stream of glob patterns (as Strings) to | |
/// an [`UnboundedGlobPatternStream`]. | |
pub trait UnboundedGlobPatternStreamExt { | |
/// Type of stream over patterns. | |
type Stream: Send + Stream<Item = Self::Pattern> + Unpin + 'static; | |
/// Type of pattern (should be `Into<String>`). | |
type Pattern: Send + 'static; | |
/// Convert a stream of glob patterns (as Strings) to a stream over all | |
/// matching [`std::path::PathBuf`]. | |
fn glob_pattern_stream(self) -> UnboundedGlobPatternStream<Self::Stream,Self::Pattern>; | |
} | |
impl<S, P> UnboundedGlobPatternStreamExt for S | |
where | |
S: Stream<Item = P> + Send + Unpin + 'static, // iterator over patterns | |
P: Into<String> + Send + 'static // pattern | |
{ | |
type Stream = S; | |
type Pattern = P; | |
fn glob_pattern_stream(self) -> UnboundedGlobPatternStream<Self::Stream,Self::Pattern> { | |
UnboundedGlobPatternStream { inner_stream: self.map(UnboundedGlobPathStream::try_from_pattern as fn(_)->_).try_flatten() } | |
} | |
} | |
/// Similar to [`UnboundedGlobPatternStream`], but instead of wrapping a stream | |
/// of patterns it wraps a stream of `Result<Pattern,_>`, passing through any | |
/// errors. | |
pub struct TryUnboundedGlobPatternStream<S,P,E> where S: Stream<Item = Result<P,E>> | |
// P is pattern, i.e. `P: Into<string>` | |
// E is the error type, i.e. `E: From<GlobIterError>` | |
{ | |
// Holds a FlatMap internally, which flattens and maps stream over | |
// patterns to stream over PathBufs while mapping all errors to | |
// GlobIterError. | |
inner_stream: futures_util::stream::TryFlatten<futures_util::stream::Map<S,fn(Result<P,E>)->Result<futures_util::stream::ErrInto<UnboundedGlobPathStream,E>,E>>> | |
} | |
impl<S,P,E> TryUnboundedGlobPatternStream<S,P,E> | |
where | |
S: Stream<Item = Result<P,E>> + Send + Unpin + 'static, // stream over patterns | |
P: Into<String> + Send + 'static, // pattern | |
E: From<GlobIterError> + Unpin // error | |
{ | |
// Private function to launder error type from GlobIterError to the error | |
// type of the parent stream, E: From<GlobIterError> | |
fn convert_err(result: Result<P,E>) -> Result<futures_util::stream::ErrInto<UnboundedGlobPathStream,E>,E> { | |
Ok(UnboundedGlobPathStream::try_from_pattern(result?)?.err_into()) | |
} | |
/// Make a stream of `Result<PathBuf,E>` from a stream of glob patterns | |
/// wrapped in results `Result<Pattern,E>` where `Pattern: Into<String>` and | |
/// `E: From<GlobIterError>`. | |
pub fn from_patterns(pattern_stream: S) -> Self { | |
Self { inner_stream: pattern_stream.map(Self::convert_err as fn(_)->_).try_flatten() } | |
} | |
} | |
impl <S,P,E> Stream for TryUnboundedGlobPatternStream<S,P,E> | |
where | |
S: Stream<Item = Result<P,E>> + Send + Unpin + 'static, // stream over patterns | |
P: Into<String> + Send + 'static, // pattern | |
E: From<GlobIterError> + Unpin // error | |
{ | |
type Item = Result<PathBuf, E>; | |
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | |
// Get mutable reference to self. | |
let this = self.get_mut(); | |
// Temporarily pin inner stream to the stack. | |
// We an do this safely because the type of the inner stream implements | |
// Unpin, which cancels the pinning guarantee. | |
let inner_stream = Pin::new(&mut this.inner_stream); | |
inner_stream.poll_next(cx) | |
} | |
fn size_hint(&self) -> (usize, Option<usize>) { | |
// Delegate to inner stream. | |
self.inner_stream.size_hint() | |
} | |
} | |
/// Extension trait to convert a stream of glob patterns as Result<String,_> to | |
/// a [`TryUnboundedGlobPatternStream`]. | |
pub trait TryUnboundedGlobPatternStreamExt { | |
/// Type of stream over patterns. | |
type Stream: Send + Stream<Item = Result<Self::Pattern, Self::Error>> + Unpin + 'static; | |
/// Type of pattern (should be `Into<String>`). | |
type Pattern: Send + 'static; | |
/// Type of error. Must be able to convert from [`GlobIterError`] to this | |
/// error type. | |
type Error: From<GlobIterError> + Unpin; | |
/// Use as `let stream_of_paths = stream_of_patterns.glob_pattern_stream()`. | |
/// See [`TryUnboundedGlobPatternStream::from_patterns`]. | |
fn glob_pattern_stream(self) -> TryUnboundedGlobPatternStream<Self::Stream,Self::Pattern,Self::Error>; | |
} | |
impl<S, P, E> TryUnboundedGlobPatternStreamExt for S | |
where | |
S: Stream<Item = Result<P,E>> + Send + Unpin + 'static, // iterator over patterns | |
P: Into<String> + Send + 'static, // pattern | |
E: From<GlobIterError> + Unpin // error | |
{ | |
type Stream = S; | |
type Pattern = P; | |
type Error = E; | |
fn glob_pattern_stream(self) -> TryUnboundedGlobPatternStream<Self::Stream,Self::Pattern,Self::Error> { | |
TryUnboundedGlobPatternStream::<Self::Stream,Self::Pattern,Self::Error>::from_patterns(self) | |
} | |
} | |
// TODO could use some more exhaustive testing
#[cfg(test)]
mod tests {
    use super::*;
    use std::convert::TryFrom;

    // The path tests expect `Cargo.toml` in the working directory; cargo runs
    // tests from the crate root.

    #[test]
    fn test_glob_path_iter() {
        let mut paths = GlobPathIter::try_from("Carg*.toml").unwrap();
        let path = paths.next().unwrap().unwrap();
        assert_eq!(path, PathBuf::from("Cargo.toml"));
    }

    #[test]
    fn test_glob_path_iter_invalid_pattern() {
        // The glob crate rejects more than two consecutive `*`.
        // `GlobPathIter` is not `Debug`, so `unwrap_err` is unavailable.
        let err = match GlobPathIter::try_from("a***b") {
            Ok(_) => panic!("pattern `a***b` should not compile"),
            Err(err) => err,
        };
        assert!(matches!(err, GlobIterError::PatternError { .. }));
        assert_eq!(err.to_string(), "Could not compile glob pattern a***b.");
    }

    #[test]
    fn test_glob_pattern_iter() {
        let patterns: Vec<&str> = vec!["Carg*.toml"];
        let mut paths = patterns.into_iter().glob_pattern_iter();
        let path = paths.next().unwrap().unwrap();
        assert_eq!(path, PathBuf::from("Cargo.toml"));
    }

    #[tokio::test]
    async fn test_glob_path_stream() {
        let mut paths = UnboundedGlobPathStream::try_from("Carg*.toml").unwrap();
        let path = paths.next().await.unwrap().unwrap();
        assert_eq!(path, PathBuf::from("Cargo.toml"));
    }

    #[tokio::test]
    async fn test_glob_pattern_stream() {
        let patterns: Vec<&str> = vec!["Carg*.toml"];
        let mut paths = futures_util::stream::iter(patterns).glob_pattern_stream();
        let path = paths.next().await.unwrap().unwrap();
        assert_eq!(path, PathBuf::from("Cargo.toml"));
    }

    #[tokio::test]
    async fn test_glob_pattern_try_stream() {
        let patterns: Vec<Result<&str, GlobIterError>> = vec![Ok("Carg*.toml")];
        let mut paths = futures_util::stream::iter(patterns).glob_pattern_stream();
        let path = paths.next().await.unwrap().unwrap();
        assert_eq!(path, PathBuf::from("Cargo.toml"));
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment