Created
June 8, 2017 19:21
-
-
Save xrl/bcd2f1b9762a7f0a746cfefb283b9a9a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
LP-XLANGE-OSX:tmp xlange$ echo "hi" >> deleteme/file.1
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//use super::DockerLogLine; | |
//use std::fs::File; | |
//use std::io::{ BufReader, BufRead }; | |
use std::path::PathBuf; | |
use std::thread; | |
use notify::{Watcher, RecursiveMode, raw_watcher, op}; | |
use std::sync::mpsc as std_mpsc; | |
use futures::sync::mpsc as futures_mpsc; | |
use futures::{ self, sink, Sink, Future }; | |
use futures::future::FutureResult; | |
use tokio_core::reactor::Remote; | |
//use futures_cpupool::CpuPool; | |
/// Watches a filesystem path and reports change events for it.
///
/// The value only stores the path; no watching starts until
/// `channel_v2` is called.
#[derive(Debug, Clone)]
pub struct DockerLogsWatcher {
    /// Path handed to the underlying `notify` watcher.
    path: String,
}
impl DockerLogsWatcher { | |
pub fn new(path: String) -> DockerLogsWatcher { | |
DockerLogsWatcher { | |
path: path | |
} | |
} | |
pub fn channel_v2(&self, remote: &mut Remote) -> (futures_mpsc::Receiver<futures::future::FutureResult<PathBuf, String>>, thread::JoinHandle<()>) { | |
let (futures_tx, futures_rx) = futures_mpsc::channel(1); | |
let path = self.path.clone(); | |
let remote = remote.clone(); | |
let guard : thread::JoinHandle<()> = thread::spawn(move || { | |
let (tx, rx) = std_mpsc::channel(); | |
let mut watcher = raw_watcher(tx).unwrap(); | |
watcher.watch(&path[..], RecursiveMode::NonRecursive).unwrap(); | |
loop { | |
match rx.recv() { | |
Ok(event) => { | |
let futures_tx = futures_tx.clone(); | |
if event.path.is_some() && event.op.is_ok() && (event.op.as_ref().unwrap().contains(op::WRITE) || event.op.as_ref().unwrap().contains(op::REMOVE) || event.op.as_ref().unwrap().contains(op::CREATE)) { | |
println!("sending..."); | |
remote.spawn(move |_|{ | |
// unimplemented!(); | |
println!("inside of spawn... sending an OK"); | |
futures::future::ok(()) | |
}); | |
let thing : sink::Send< futures_mpsc::Sender<FutureResult<PathBuf,String>> > = futures_tx.send(futures::future::ok(event.path.unwrap())); | |
} else { | |
println!("broken event v2: {:?}", event); | |
futures_tx.send(futures::future::err("problem!".to_owned())); | |
} | |
}, | |
Err(e) => println!("watch error: {:?}", e), | |
} | |
} | |
}); | |
(futures_rx, guard) | |
} | |
} | |
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` must keep the path it was given, since `channel_v2` later hands
    /// exactly this value to the filesystem watcher.
    #[test]
    fn new_stores_the_given_path() {
        let dl = DockerLogsWatcher::new("/tmp/deleteme".to_string());
        assert_eq!(dl.path, "/tmp/deleteme");
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//{"log":"time=\"2017-05-02T15:56:35Z\" level=info msg=\"PurgeUploads starting: olderThan=2017-04-25 15:56:35.846514213 +0000 UTC, actuallyDelete=true\" \n","stream":"stdout","time":"2017-05-02T15:56:35.846897572Z"} | |
//{"log":"time=\"2017-05-02T15:56:35Z\" level=info msg=\"Purge uploads finished. Num deleted=0, num errors=0\" \n","stream":"stdout","time":"2017-05-02T15:56:35.849976968Z"} | |
//{"log":"time=\"2017-05-02T15:56:35Z\" level=info msg=\"Starting upload purge in 24h0m0s\" go.version=go1.7.3 instance.id=e930db88-e8b8-47b0-ade1-20183a3167b4 version=v2.6.0 \n","stream":"stdout","time":"2017-05-02T15:56:35.850005422Z"} | |
extern crate whaletail; | |
use whaletail::DockerLogsWatcher; | |
extern crate tokio_core; | |
use tokio_core::reactor::Core; | |
extern crate futures; | |
use futures::{Stream}; | |
// http://hermanradtke.com/2017/03/03/future-mpsc-queue-with-tokio.html | |
fn main() { | |
let mut core = Core::new().expect("Failed to create core"); | |
let mut remote_core = core.remote(); | |
let dl = DockerLogsWatcher::new("/tmp/deleteme".to_owned()); | |
let (rx,handle) = dl.channel_v2(&mut remote_core); | |
let job = rx.for_each(|_|{ | |
println!("thing happened"); | |
Ok(()) | |
}); | |
core.run(job).expect("failed to run"); | |
handle.join().unwrap(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/Users/xlange/.cargo/bin/cargo run --color=always --bin whaletail
Compiling whaletail v0.1.0 (file:///Users/xlange/IdeaProjects/whale-tail)
Finished dev [unoptimized + debuginfo] target(s) in 3.15 secs
Running `target/debug/whaletail`
sending...
inside of spawn... sending an OK
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment