Skip to content

Instantly share code, notes, and snippets.

@jayhuang75
Last active September 13, 2020 22:24
Show Gist options
  • Select an option

  • Save jayhuang75/ebae5886dd7e0fbf4c569f1b974c7df2 to your computer and use it in GitHub Desktop.

Select an option

Save jayhuang75/ebae5886dd7e0fbf4c569f1b974c7df2 to your computer and use it in GitHub Desktop.
main execution logic and flow
/// Entry point: every 20 seconds, compare the application-log listing with
/// the apps recorded in SQLite and process the ones that are not stored yet.
///
/// Each cycle:
/// 1. counts the apps already stored in the SQLite database,
/// 2. fetches the current log listing through `hadoop_cmd::run`,
/// 3. if the listing is longer than the stored count, hands the difference
///    to the worker pool, which calls `yarn_cmd::run` for each record,
/// 4. inserts every result the workers send back over the channel.
///
/// # Panics
///
/// Panics if `DATABASE_URL` is not set, if the SQLite connection cannot be
/// established, if counting or inserting rows fails, or if the log listing
/// command fails. A failure while processing a single record is only logged.
#[tokio::main]
async fn main() {
    // prepare local env
    dotenv().ok();

    // Loop-invariant setup: read the configuration and spawn the worker
    // threads once instead of on every cycle.
    let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
    // for demo only get all the available cpu
    // it may not need that.
    let pool = ThreadPool::new(num_cpus::get());

    // set the interval for every 20s
    let mut interval = time::interval(time::Duration::from_secs(20));
    loop {
        // wait every 20s
        interval.tick().await;
        // cycle start time
        let start = Instant::now();
        // prepare the channel the workers use to report finished records
        let (tx, rx) = channel::unbounded();

        // prepare database
        let context = SqliteConnection::establish(&database_url)
            .unwrap_or_else(|_| panic!("Error connecting to {}", database_url));
        let mut sqlite_db = db::DbContext::new(&context);
        let counter = sqlite_db
            .count()
            .expect("counting the apps stored in sqlite failed") as usize;
        println!("[main.rs] total apps in db : {}", counter);

        // get the current application logs
        // NOTE(review): this call and the receive loop below block the async
        // runtime thread; harmless while `main` is the only task, but confirm
        // before spawning other tasks on this runtime.
        let records = hadoop_cmd::run().expect("running hdfs dfs -ls -R /app-logs error");
        println!("[main.rs] current log total count: {:?}", records.len());

        if records.len() > counter {
            // 1. get the delta
            let delta_new = records.len() - counter;
            println!("[main.rs] delta is : {:?}", delta_new);
            // 2. process the logs
            // NOTE(review): `take` picks the FIRST `delta_new` records, which
            // assumes the listing returns unprocessed logs first - confirm
            // against the ordering produced by `hadoop_cmd::run`.
            records.into_iter().take(delta_new).for_each(|line| {
                println!("[main.rs] process : {:?}", line.app_id);
                let tx = tx.clone();
                pool.execute(move || {
                    // Log instead of panicking so that one bad record does
                    // not take down a pool thread.
                    if let Err(err) = yarn_cmd::run(tx, &line) {
                        eprintln!(
                            "[main.rs] processing {:?} failed: {:?}",
                            line.app_id, err
                        );
                    }
                });
            });
        }
        // Drop the original sender: the receive loop below ends only once
        // every sender (this one and the clones given to workers) is gone.
        drop(tx);

        // Success process the logs then update the sqlite db
        rx.into_iter().for_each(|batch: yarn_cmd::BatchCTL| {
            println!(
                "[main.rs] new new logs {:?} metrics count : {:?}",
                batch.app.app_id,
                batch.metrics.len()
            );
            sqlite_db
                .insert(&batch.app)
                .expect("inserting the processed app into sqlite failed");
        });

        let duration = start.elapsed();
        println!("[main.rs] time elapsed in run() is: {:?}", duration);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment