// Source: GitHub gist jessegrosjean/6b3ca3a52249b65f58294dfafca12c91 (last active November 7, 2018 23:49).
// GitHub notes that this file contains bidirectional Unicode text that may be interpreted or
// compiled differently than it appears; review it in an editor that reveals hidden Unicode characters.
// Originally based on https://github.com/BurntSushi/ripgrep/tree/master/ignore
// Use as you see fit.
use std::fs; | |
use std::fs::{Metadata, FileType}; | |
use std::sync::{Arc, Mutex}; | |
use std::sync::mpsc::{channel, Receiver, Sender}; | |
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering}; | |
use std::collections::BinaryHeap; | |
use std::path::{Path, PathBuf}; | |
use std::time::Duration; | |
use std::cmp; | |
use std::cmp::{Ordering}; | |
use std::thread; | |
use std::vec; | |
use rand::Rng; | |
use rand::prelude::*; | |
use num_cpus; | |
use alphanumeric_sort; | |
/// Configuration for a parallel, depth-first directory walk; call `build` to get a `Walk`.
#[derive(Clone, Debug)]
pub struct WalkBuilder {
    /// Root paths to walk, in the order they were added.
    paths: Vec<PathBuf>,
    /// Depth at which directories stop being descended into (roots are depth 0); `None` = unlimited.
    max_depth: Option<usize>,
    /// Size limit, in bytes, for child entries kept in the walk; `None` = no limit.
    max_filesize: Option<u64>,
    /// Whether child entries carry their `Metadata`.
    include_metadata: bool,
    /// Whether child entries whose name starts with `.` are left out.
    ignore_hidden: bool,
    /// Number of worker threads to run.
    threads: usize,
}
/// A configured walk produced by `WalkBuilder::build`; consumed by `run`, `run_async` or `entries`.
#[derive(Clone, Debug)]
pub struct Walk {
    /// Root paths still to be walked.
    paths: vec::IntoIter<PathBuf>,
    /// Size limit, in bytes, for child entries kept in the walk; `None` = no limit.
    max_filesize: Option<u64>,
    /// Depth at which directories stop being descended into (roots are depth 0); `None` = unlimited.
    max_depth: Option<usize>,
    /// Whether child entries carry their `Metadata`.
    include_metadata: bool,
    /// Whether child entries whose name starts with `.` are left out.
    ignore_hidden: bool,
    /// Requested number of worker threads.
    threads: usize,
}
/// One file system entry reported by a walk.
#[derive(Clone, Debug)]
pub struct WalkEntry {
    /// The entry's name as UTF-8.
    pub name: String,
    /// Full path of the entry.
    pub path: PathBuf,
    /// Entry type. For children it comes from `DirEntry::file_type` (symlinks not followed);
    /// for roots it comes from `fs::metadata` (symlinks followed).
    pub file_type: FileType,
    /// Always present for roots; present for children only when metadata was requested.
    pub metadata: Option<Metadata>,
    /// Position in the walk tree: one index per level, the last being the entry's position
    /// among its sorted siblings. Lexicographic order of these is depth-first pre-order.
    index_path: Vec<usize>,
    /// Number of children reported for this entry; filled in before the entry reaches the callback.
    child_count: usize,
}
/// What a walk callback wants to happen after it has seen an entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WalkState {
    /// Keep walking, descending into the entry if it is a directory.
    Continue,
    /// Keep walking, but do not descend into this entry's children (no effect for non-directories).
    Skip,
    /// Stop: the reporting worker returns and the other workers stop taking new work.
    Quit,
}
struct Worker { | |
callback: Box<FnMut(WalkEntry) -> WalkState + Send + 'static>, | |
work_senders: Vec<Sender<WalkEntry>>, | |
work_receiver: Receiver<WalkEntry>, | |
steal_from_queues: Vec<Arc<Mutex<BinaryHeap<WalkEntry>>>>, | |
work_queue: Arc<Mutex<BinaryHeap<WalkEntry>>>, | |
active_work_count: Arc<AtomicUsize>, | |
max_depth: Option<usize>, | |
max_filesize: Option<u64>, | |
quit_now: Arc<AtomicBool>, | |
threads: usize, | |
include_metadata: bool, | |
ignore_hidden: bool, | |
id: usize, | |
} | |
/// Cursor that tracks which index path must be forwarded next by the ordered entry stream.
struct NextEntry {
    /// For each level of `index_path`, how many entries at that level are still to be forwarded.
    remaining_siblings: Vec<usize>,
    /// Index path of the entry expected next.
    index_path: Vec<usize>,
}
impl WalkBuilder { | |
pub fn new<P: AsRef<Path>>(path: P) -> WalkBuilder { | |
WalkBuilder { | |
paths: vec![path.as_ref().to_path_buf()], | |
max_depth: None, | |
max_filesize: None, | |
include_metadata: false, | |
ignore_hidden: true, | |
threads: cmp::min(12, num_cpus::get()), | |
} | |
} | |
pub fn build(&self) -> Walk { | |
Walk { | |
paths: self.paths.clone().into_iter(), | |
max_depth: self.max_depth, | |
max_filesize: self.max_filesize, | |
include_metadata: self.include_metadata, | |
ignore_hidden: self.ignore_hidden, | |
threads: self.threads, | |
} | |
} | |
pub fn add<P: AsRef<Path>>(&mut self, path: P) -> &mut WalkBuilder { | |
self.paths.push(path.as_ref().to_path_buf()); | |
self | |
} | |
pub fn max_depth(&mut self, depth: Option<usize>) -> &mut WalkBuilder { | |
self.max_depth = depth; | |
self | |
} | |
pub fn include_metadata(&mut self, yes: bool) -> &mut WalkBuilder { | |
self.include_metadata = yes; | |
self | |
} | |
pub fn ignore_hidden(&mut self, yes: bool) -> &mut WalkBuilder { | |
self.ignore_hidden = yes; | |
self | |
} | |
pub fn max_filesize(&mut self, filesize: Option<u64>) -> &mut WalkBuilder { | |
self.max_filesize = filesize; | |
self | |
} | |
pub fn threads(&mut self, n: usize) -> &mut WalkBuilder { | |
self.threads = n; | |
self | |
} | |
} | |
impl Walk { | |
pub fn entries(self) -> Receiver<WalkEntry> { | |
let (sx, rx) = channel::<WalkEntry>(); | |
let (ordered_sx, ordered_rx) = channel::<WalkEntry>(); | |
thread::spawn(move || { | |
let mut receive_buffer = BinaryHeap::new(); | |
let mut next_entry = NextEntry::new(); | |
while let Ok(entry) = rx.recv() { | |
receive_buffer.push(entry); | |
while next_entry.matches(receive_buffer.peek()) { | |
let send_entry = receive_buffer.pop().unwrap(); | |
next_entry.advance_past(&send_entry); | |
if ordered_sx.send(send_entry).is_err() { | |
return | |
} | |
} | |
} | |
while let Some(entry) = receive_buffer.pop() { | |
if ordered_sx.send(entry).is_err() { | |
return | |
} | |
} | |
}); | |
self.run_async(move || { | |
let sx = sx.clone(); | |
Box::new(move |entry| { | |
if sx.send(entry).is_err() { | |
WalkState::Quit | |
} else { | |
WalkState::Continue | |
} | |
}) | |
}); | |
ordered_rx | |
} | |
pub fn run<F>(self, make_callback: F) | |
where F: FnMut() -> Box<FnMut(WalkEntry) -> WalkState + Send + 'static> | |
{ | |
self.inner_run(false, make_callback); | |
} | |
pub fn run_async<F>(self, make_callback: F) | |
where F: FnMut() -> Box<FnMut(WalkEntry) -> WalkState + Send + 'static> | |
{ | |
self.inner_run(true, make_callback); | |
} | |
fn inner_run<F>(self, async: bool, mut make_callback: F) | |
where F: FnMut() -> Box<FnMut(WalkEntry) -> WalkState + Send + 'static> | |
{ | |
let quit_now = Arc::new(AtomicBool::new(false)); | |
let worker_count = cmp::max(1, self.threads); | |
let mut active_work_count = 0; | |
let mut handles = vec![]; | |
let mut work_queues = Vec::new(); | |
let mut work_senders = Vec::new(); | |
let mut work_receivers = Vec::new(); | |
for _ in 0..worker_count { | |
let (sender, reciver) = channel::<WalkEntry>(); | |
work_senders.push(sender); | |
work_receivers.push(reciver); | |
work_queues.push(Arc::new(Mutex::new(BinaryHeap::new()))); | |
} | |
for each in self.paths { | |
let mut rng = thread_rng(); | |
if let Some(each_entry) = WalkEntry::new(&each) { | |
if rng.choose(&work_senders).unwrap().send(each_entry).is_ok() { | |
active_work_count += 1; | |
} | |
} | |
} | |
let active_work_count = Arc::new(AtomicUsize::new(active_work_count)); | |
for i in 0..worker_count { | |
let mut worker = Worker { | |
callback: make_callback(), | |
max_depth: self.max_depth, | |
max_filesize: self.max_filesize, | |
ignore_hidden: self.ignore_hidden, | |
steal_from_queues: work_queues.clone(), | |
work_senders: work_senders.clone(), | |
work_receiver: work_receivers.pop().unwrap(), | |
work_queue: work_queues[i].clone(), | |
active_work_count: active_work_count.clone(), | |
quit_now: quit_now.clone(), | |
include_metadata: self.include_metadata, | |
threads: worker_count, | |
id: i, | |
}; | |
handles.push(thread::spawn(move || worker.run())); | |
} | |
let sync = !async; | |
if sync { | |
for handle in handles { | |
handle.join().unwrap(); | |
} | |
assert!(active_work_count.load(AtomicOrdering::SeqCst) == 0); | |
} | |
} | |
} | |
impl WalkState { | |
fn is_quit(self) -> bool { | |
self == WalkState::Quit | |
} | |
} | |
impl Worker { | |
fn run(&mut self) { | |
self.steal_from_queues.remove(self.id); // dont' steal from self! | |
while let Some(mut entry) = self.get_work() { | |
let should_skip_child_entries = | |
entry.is_symlink() || | |
!entry.is_dir() || | |
self.max_depth.map_or(false, |max_depth| { | |
entry.depth() >= max_depth | |
}); | |
if should_skip_child_entries { | |
if (self.callback)(entry).is_quit() { | |
self.quit_now(); | |
return | |
} | |
self.completed_work(); | |
continue; | |
} | |
let readdir = match fs::read_dir(&entry.path) { | |
Ok(readdir) => readdir, | |
Err(_err) => { | |
self.completed_work(); | |
continue; | |
} | |
}; | |
let entry_index_path = entry.index_path.clone(); | |
let mut child_entries: Vec<_> = readdir.filter_map(|entry_result| { | |
let entry = match entry_result { | |
Ok(entry) => entry, | |
Err(_err) => { | |
return None | |
} | |
}; | |
let name = match entry.file_name().to_str() { | |
Some(name) => name.to_string(), | |
None => return None | |
}; | |
let file_type = match entry.file_type() { | |
Ok(file_type) => file_type, | |
Err(_err) => { | |
return None | |
} | |
}; | |
let path = entry.path(); | |
let metadata = if self.include_metadata { entry.metadata().ok() } else { None }; | |
if self.should_skip_entry(&name, file_type, &path, &metadata) { | |
return None | |
} | |
let mut index_path = Vec::with_capacity(entry_index_path.len() + 1); | |
index_path.extend_from_slice(&entry_index_path[..]); | |
Some(WalkEntry { | |
name, | |
path, | |
index_path, | |
file_type, | |
metadata, | |
child_count: 0 | |
}) | |
}).collect(); | |
child_entries.sort_by(|a, b| { | |
alphanumeric_sort::compare_str(a.name(), b.name()) | |
}); | |
child_entries.iter_mut().enumerate().for_each(|(i, each)| { | |
each.index_path.push(i); | |
}); | |
entry.child_count = child_entries.len(); | |
match (self.callback)(entry) { | |
WalkState::Continue => (), | |
WalkState::Skip => { | |
self.completed_work(); | |
continue; | |
} | |
WalkState::Quit => { | |
self.quit_now(); | |
return | |
} | |
} | |
self.schedule_work(child_entries); | |
self.completed_work(); | |
} | |
} | |
fn should_skip_entry(&self, name: &str, _file_type: FileType, _path: &Path, metadata: &Option<Metadata>) -> bool { | |
if self.ignore_hidden && name.chars().next() == Some('.') { | |
return true | |
} | |
if let (Some(metadata), Some(max_filesize)) = (metadata, self.max_filesize) { | |
if metadata.len() > max_filesize { | |
return true | |
} | |
} | |
false | |
} | |
fn schedule_work(&mut self, entries: Vec<WalkEntry>) { | |
let entries_count = entries.len(); | |
self.active_work_count.fetch_add(entries_count, AtomicOrdering::SeqCst); | |
let mut rng = thread_rng(); | |
for entry in entries { | |
let _ = rng.choose(&self.work_senders).unwrap().send(entry); | |
} | |
} | |
fn get_work(&mut self) -> Option<WalkEntry> { | |
loop { | |
if self.is_quit_now() { | |
return None | |
} | |
let mut work_queue = self.work_queue.lock().unwrap(); | |
while let Ok(entry) = self.work_receiver.try_recv() { | |
work_queue.push(entry); | |
} | |
let next_entry = work_queue.pop(); | |
match next_entry { | |
Some(entry) => { | |
return Some(entry) | |
}, | |
None => { | |
if self.active_work_count() == 0 { | |
return None | |
} else { | |
if let Some(entries) = self.steal_work() { | |
for each in entries { | |
work_queue.push(each); | |
} | |
} | |
if work_queue.len() == 0 { | |
thread::yield_now(); | |
} | |
} | |
} | |
} | |
} | |
} | |
fn steal_work(&self) -> Option<Vec<WalkEntry>> { | |
if self.steal_from_queues.len() == 0 { | |
return None | |
} | |
let steal_from = thread_rng().choose(&self.steal_from_queues).unwrap(); | |
if let Ok(mut steal_from) = steal_from.try_lock() { | |
if steal_from.len() > 0 { | |
let half = steal_from.len() / 2; | |
let mut stolen_work = Vec::with_capacity(half); | |
let mut return_work = Vec::with_capacity(half); | |
for (i, each) in steal_from.drain().enumerate() { | |
if i % 2 == 0 { | |
stolen_work.push(each); | |
} else { | |
return_work.push(each); | |
} | |
} | |
for each in return_work { | |
steal_from.push(each); | |
} | |
return Some(stolen_work) | |
} | |
} | |
None | |
} | |
fn completed_work(&self) { | |
self.active_work_count.fetch_sub(1, AtomicOrdering::SeqCst); | |
} | |
fn quit_now(&self) { | |
self.quit_now.store(true, AtomicOrdering::SeqCst); | |
} | |
fn is_quit_now(&self) -> bool { | |
self.quit_now.load(AtomicOrdering::SeqCst) | |
} | |
fn active_work_count(&self) -> usize { | |
self.active_work_count.load(AtomicOrdering::SeqCst) | |
} | |
} | |
impl WalkEntry { | |
fn new(path: &Path) -> Option<WalkEntry> { | |
if let (Some(Some(name)), Ok(metadata)) = (path.file_name().map(|n| n.to_str()), fs::metadata(path)) { | |
Some(WalkEntry { | |
name: name.to_string(), | |
path: path.to_path_buf(), | |
index_path: vec![0], | |
child_count: 0, | |
file_type: metadata.file_type(), | |
metadata: Some(metadata), | |
}) | |
} else { | |
None | |
} | |
} | |
pub fn is_dir(&self) -> bool { | |
self.file_type.is_dir() | |
} | |
pub fn is_file(&self) -> bool { | |
self.file_type.is_file() | |
} | |
pub fn is_symlink(&self) -> bool { | |
self.file_type.is_symlink() | |
} | |
pub fn name(&self) -> &str { | |
&self.name | |
} | |
pub fn path(&self) -> &Path { | |
&self.path | |
} | |
pub fn depth(&self) -> usize { | |
self.index_path.len() - 1 | |
} | |
pub fn metadata(&self) -> Option<&Metadata> { | |
match &self.metadata { | |
Some(metadata) => Some(metadata), | |
None => None | |
} | |
} | |
} | |
impl PartialEq for WalkEntry { | |
fn eq(&self, o: &Self) -> bool { | |
self.index_path.eq(&o.index_path) | |
} | |
} | |
impl Eq for WalkEntry {} | |
impl PartialOrd for WalkEntry { | |
fn partial_cmp(&self, o: &Self) -> Option<Ordering> { | |
o.index_path.partial_cmp(&self.index_path) | |
} | |
} | |
impl Ord for WalkEntry { | |
fn cmp(&self, o: &Self) -> Ordering { | |
o.index_path.cmp(&self.index_path) | |
} | |
} | |
impl NextEntry { | |
fn new() -> NextEntry { | |
NextEntry { | |
index_path: vec![0], | |
remaining_siblings: vec![1], | |
} | |
} | |
fn matches(&self, entry: Option<&WalkEntry>) -> bool { | |
entry.map_or(false, |e| { e.index_path == self.index_path }) | |
} | |
fn advance_past(&mut self, entry: &WalkEntry) { | |
// Decrement remaining siblings at this level | |
*self.remaining_siblings.last_mut().unwrap() -= 1; | |
if entry.child_count > 0 { | |
// If visited item has children then push 0 index path, since we are now | |
// looking for the first child. | |
self.index_path.push(0); | |
self.remaining_siblings.push(entry.child_count); | |
} else { | |
// Incrememnt sibling index | |
*self.index_path.last_mut().unwrap() += 1; | |
// If no siblings remain at this level unwind stacks | |
while !self.remaining_siblings.is_empty() && *self.remaining_siblings.last().unwrap() == 0 { | |
self.index_path.pop(); | |
self.remaining_siblings.pop(); | |
// Finished processing level, so increment sibling index | |
if !self.index_path.is_empty() { | |
*self.index_path.last_mut().unwrap() += 1; | |
} | |
} | |
} | |
} | |
} |
// (End of the GitHub gist page: sign in to GitHub to comment on this gist.)