Skip to content

Instantly share code, notes, and snippets.

@xrl
Created June 8, 2017 19:21
Show Gist options
  • Save xrl/bcd2f1b9762a7f0a746cfefb283b9a9a to your computer and use it in GitHub Desktop.
Save xrl/bcd2f1b9762a7f0a746cfefb283b9a9a to your computer and use it in GitHub Desktop.
LP-XLANGE-OSX:tmp xlange$ echo "hi" >> deleteme/file.1
//use super::DockerLogLine;
//use std::fs::File;
//use std::io::{ BufReader, BufRead };
use std::path::PathBuf;
use std::thread;
use notify::{Watcher, RecursiveMode, raw_watcher, op};
use std::sync::mpsc as std_mpsc;
use futures::sync::mpsc as futures_mpsc;
use futures::{ self, sink, Sink, Future };
use futures::future::FutureResult;
use tokio_core::reactor::Remote;
//use futures_cpupool::CpuPool;
/// Watches a filesystem path and reports change events for it.
///
/// The path is stored as given to [`DockerLogsWatcher::new`]; nothing is
/// touched on disk until a channel is requested from the watcher.
#[derive(Debug, Clone)]
pub struct DockerLogsWatcher {
    /// Path handed to the filesystem watcher (watched non-recursively).
    path: String,
}
impl DockerLogsWatcher {
pub fn new(path: String) -> DockerLogsWatcher {
DockerLogsWatcher {
path: path
}
}
pub fn channel_v2(&self, remote: &mut Remote) -> (futures_mpsc::Receiver<futures::future::FutureResult<PathBuf, String>>, thread::JoinHandle<()>) {
let (futures_tx, futures_rx) = futures_mpsc::channel(1);
let path = self.path.clone();
let remote = remote.clone();
let guard : thread::JoinHandle<()> = thread::spawn(move || {
let (tx, rx) = std_mpsc::channel();
let mut watcher = raw_watcher(tx).unwrap();
watcher.watch(&path[..], RecursiveMode::NonRecursive).unwrap();
loop {
match rx.recv() {
Ok(event) => {
let futures_tx = futures_tx.clone();
if event.path.is_some() && event.op.is_ok() && (event.op.as_ref().unwrap().contains(op::WRITE) || event.op.as_ref().unwrap().contains(op::REMOVE) || event.op.as_ref().unwrap().contains(op::CREATE)) {
println!("sending...");
remote.spawn(move |_|{
// unimplemented!();
println!("inside of spawn... sending an OK");
futures::future::ok(())
});
let thing : sink::Send< futures_mpsc::Sender<FutureResult<PathBuf,String>> > = futures_tx.send(futures::future::ok(event.path.unwrap()));
} else {
println!("broken event v2: {:?}", event);
futures_tx.send(futures::future::err("problem!".to_owned()));
}
},
Err(e) => println!("watch error: {:?}", e),
}
}
});
(futures_rx, guard)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The constructor must store the path exactly as given.
    ///
    /// The previous test called `dl.doit()`, a method that the visible
    /// `impl DockerLogsWatcher` does not define, so the test module could not
    /// compile.
    #[test]
    fn new_stores_path() {
        let dl = DockerLogsWatcher::new("/tmp/deleteme".to_string());
        assert_eq!(dl.path, "/tmp/deleteme");
    }
}
//{"log":"time=\"2017-05-02T15:56:35Z\" level=info msg=\"PurgeUploads starting: olderThan=2017-04-25 15:56:35.846514213 +0000 UTC, actuallyDelete=true\" \n","stream":"stdout","time":"2017-05-02T15:56:35.846897572Z"}
//{"log":"time=\"2017-05-02T15:56:35Z\" level=info msg=\"Purge uploads finished. Num deleted=0, num errors=0\" \n","stream":"stdout","time":"2017-05-02T15:56:35.849976968Z"}
//{"log":"time=\"2017-05-02T15:56:35Z\" level=info msg=\"Starting upload purge in 24h0m0s\" go.version=go1.7.3 instance.id=e930db88-e8b8-47b0-ade1-20183a3167b4 version=v2.6.0 \n","stream":"stdout","time":"2017-05-02T15:56:35.850005422Z"}
extern crate whaletail;
use whaletail::DockerLogsWatcher;
extern crate tokio_core;
use tokio_core::reactor::Core;
extern crate futures;
use futures::{Stream};
// http://hermanradtke.com/2017/03/03/future-mpsc-queue-with-tokio.html
/// Runs a reactor that prints a line for every item received from a
/// `DockerLogsWatcher` channel on `/tmp/deleteme`, then joins the watcher
/// thread once the stream has finished.
fn main() {
    let mut reactor = Core::new().expect("Failed to create core");
    let mut reactor_remote = reactor.remote();

    let watcher = DockerLogsWatcher::new(String::from("/tmp/deleteme"));
    let (events, watcher_thread) = watcher.channel_v2(&mut reactor_remote);

    // One line per received item; the item itself is ignored.
    let print_events = events.for_each(|_| {
        println!("thing happened");
        Ok(())
    });

    reactor.run(print_events).expect("failed to run");
    watcher_thread.join().unwrap();
}
/Users/xlange/.cargo/bin/cargo run --color=always --bin whaletail
Compiling whaletail v0.1.0 (file:///Users/xlange/IdeaProjects/whale-tail)
Finished dev [unoptimized + debuginfo] target(s) in 3.15 secs
Running `target/debug/whaletail`
sending...
inside of spawn... sending an OK
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment