Skip to content

Instantly share code, notes, and snippets.

@Meshiest
Last active August 28, 2020 16:37
Show Gist options
  • Save Meshiest/cac7408ad2387e6d4fca97b1a82d14fa to your computer and use it in GitHub Desktop.
parses archive.org tweet archives (.tar -> .json.bz2 -> tweet)
use bzip2::read::BzDecoder;
use crossbeam_queue::SegQueue;
use glob::glob;
use serde_json::Value;
use std::{
fs::{remove_file, File},
io::{BufRead, BufReader},
sync::{
atomic::{AtomicIsize, Ordering},
Arc,
},
thread, time,
};
use tar::Archive;
use threadpool::ThreadPool;
// The two kinds of job the thread pool can run. Jobs live on a shared
// queue; an Archive job may enqueue Tweets jobs as it unpacks entries.
enum Work {
    // Untar the next archive from this list of .tar file paths,
    // pushing the remainder of the list back onto the queue afterwards.
    Archive(Vec<String>),
    // Decompress one .json.bz2 file (by path) and read it line by line.
    Tweets(String),
}

// Number of worker threads in the pool.
const WORKER_COUNT: usize = 16;
fn main() {
    // Decrements the active-job count when dropped, even if the job panics.
    // Without this, any of the `unwrap()`s below would skip the decrement
    // and the dispatch loop would spin forever waiting for a count of zero.
    struct WorkerGuard(Arc<AtomicIsize>);
    impl Drop for WorkerGuard {
        fn drop(&mut self) {
            self.0.fetch_sub(1, Ordering::SeqCst);
        }
    }

    let pool = ThreadPool::new(WORKER_COUNT); // thread pool
    let work = Arc::new(SegQueue::new()); // shared work queue
    let num_workers = Arc::new(AtomicIsize::new(0)); // jobs currently in flight

    // scan the available tars, add them to the work queue as the first job
    let tars = glob_vec("./tars/*.tar");
    work.push(Work::Archive(tars));

    // keep dispatching until the queue is drained AND no job is still
    // running: a running job may push new work onto the queue
    while !work.is_empty() || num_workers.load(Ordering::SeqCst) != 0 {
        if work.is_empty() {
            // jobs are running but nothing is queued yet; back off briefly
            thread::sleep(time::Duration::from_millis(50));
            continue;
        }
        // count the job as active *before* handing it off, so the loop
        // condition can never observe "empty queue, zero workers" while
        // a job is in flight
        num_workers.fetch_add(1, Ordering::SeqCst);
        // safe: this loop is the queue's only consumer, so the queue
        // cannot have been drained between the is_empty() check and here
        let job = work.pop().unwrap();
        let work = work.clone();
        let guard = WorkerGuard(num_workers.clone());
        // hand the job off to the thread pool
        pool.execute(move || {
            // moved into the closure; decrements the counter on any exit path
            let _guard = guard;
            match job {
                // untar the next archive from the list
                Work::Archive(mut tars) => {
                    // an empty glob result is not an error; just do nothing
                    // (the original `pop().unwrap()` panicked on an empty list)
                    if let Some(path) = tars.pop() {
                        // open the archive
                        let tar = File::open(path).unwrap();
                        let mut archive = Archive::new(tar);
                        // iterate across the files inside
                        for entry in archive.entries().unwrap() {
                            if let Ok(mut entry) = entry {
                                let path = entry.path().unwrap().into_owned();
                                // find .json.bz2 entries (not directories)
                                if path.to_str().unwrap().ends_with(".json.bz2") {
                                    // unpack into the tweets folder
                                    entry.unpack_in("tweets").unwrap();
                                    println!("[tar] {}", path.display());
                                    // queue the newly unpacked .json.bz2 for parsing
                                    work.push(Work::Tweets(format!("tweets/{}", path.display())));
                                }
                            }
                        }
                    }
                    // as long as there are more tars to untar, keep working:
                    // requeue the remainder so the next archive gets its turn
                    if !tars.is_empty() {
                        work.push(Work::Archive(tars))
                    }
                }
                // decompress a .json.bz2 and scan its tweets line by line
                Work::Tweets(path) => {
                    // no clone needed: File::open borrows via AsRef<Path>
                    let file_bz = File::open(&path).unwrap();
                    let reader = BufReader::new(BzDecoder::new(file_bz));
                    println!("[bz2] {}", path);
                    // each line is one JSON document; skip unparsable lines
                    for l in reader.lines() {
                        let line = l.unwrap();
                        if let Ok(tweet) = serde_json::from_str::<Value>(&line) {
                            // keep only geotagged tweets from the US
                            if tweet["geo"] != Value::Null
                                && tweet["place"]["country_code"] == "US"
                            {
                                println!("{}", tweet);
                            }
                        }
                    }
                    // the decompressed data has been consumed; drop the source file
                    remove_file(&path).expect("error removing file")
                }
            }
        })
    }
    // wait for any still-running jobs before exiting
    pool.join();
}
// Glob a path pattern, returning every readable match as an owned String.
//
// Matches whose path is not valid UTF-8 are converted lossily (invalid
// bytes become U+FFFD) instead of panicking, as `to_str().unwrap()` did.
// Entries the filesystem refuses to read are silently skipped.
fn glob_vec(pattern: &str) -> Vec<String> {
    glob(pattern)
        .expect("Failed to read glob pattern")
        .filter_map(Result::ok)
        .map(|p| p.to_string_lossy().into_owned())
        .collect()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment