Last active
August 28, 2020 16:37
-
-
Save Meshiest/cac7408ad2387e6d4fca97b1a82d14fa to your computer and use it in GitHub Desktop.
parses archive.org tweet archives (.tar -> .json.bz2 -> tweet)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use bzip2::read::BzDecoder; | |
use crossbeam_queue::SegQueue; | |
use glob::glob; | |
use serde_json::Value; | |
use std::{ | |
fs::{remove_file, File}, | |
io::{BufRead, BufReader}, | |
sync::{ | |
atomic::{AtomicIsize, Ordering}, | |
Arc, | |
}, | |
thread, time, | |
}; | |
use tar::Archive; | |
use threadpool::ThreadPool; | |
/// A unit of work that can be handed to the thread pool.
enum Work {
    /// Tar files still waiting to be unpacked; a job takes one path off the
    /// list and re-queues whatever is left.
    Archive(Vec<String>),
    /// Path of an unpacked `.json.bz2` file whose lines are read as tweets.
    Tweets(String),
}
/// How many worker threads the thread pool is created with.
const WORKER_COUNT: usize = 16;
fn main() { | |
let pool = ThreadPool::new(WORKER_COUNT); // thread pool | |
let work = Arc::new(SegQueue::new()); // work queue | |
let num_workers = Arc::new(AtomicIsize::new(0)); // number of currently active threads | |
// scan the available tars, add them to the work queue as the first job | |
let tars = glob_vec("./tars/*.tar"); | |
work.push(Work::Archive(tars)); | |
let workers = num_workers.clone(); | |
let work_clone = work.clone(); | |
// iterate through each of them | |
while !work_clone.is_empty() || workers.load(Ordering::SeqCst) != 0 { | |
// jobs are running and there is no work at the moment | |
if work_clone.is_empty() { | |
// sleep for 50ms | |
thread::sleep(time::Duration::from_millis(50)); | |
continue; | |
} | |
// keep track of an active worker | |
let workers = num_workers.clone(); | |
workers.fetch_add(1, Ordering::Acquire); | |
// pop the job from the queue | |
let job = work_clone.pop().unwrap(); | |
let work = work.clone(); | |
// hand the job off to the thread pool | |
pool.execute(move || { | |
// determine which kind of job to do | |
match job { | |
// this job untars and archive | |
Work::Archive(mut tars) => { | |
let path = tars.pop().unwrap(); | |
// open the archive | |
let tar = File::open(path).unwrap(); | |
let mut archive = Archive::new(tar); | |
// iterate across the files inside | |
for entry in archive.entries().unwrap() { | |
if let Ok(mut entry) = entry { | |
let path = entry.path().unwrap().to_owned().to_path_buf(); | |
// find .json.bz2 entries (not directories) | |
let is_bz = path.to_str().unwrap().ends_with(".json.bz2"); | |
if is_bz { | |
// unpack into the tweets folder | |
entry.unpack_in("tweets").unwrap(); | |
println!("[tar] {}", path.display()); | |
// add the newly unpacked .json.bz2 to the work queue | |
work.push(Work::Tweets(format!("tweets/{}", path.display()))); | |
} | |
} | |
} | |
// as long as there are more tars to untar, keep working | |
if !tars.is_empty() { | |
// add the rest of the tars back to the job queue | |
work.push(Work::Archive(tars)) | |
} | |
} | |
Work::Tweets(path) => { | |
// unzip the file | |
let file_bz = File::open(path.clone()).unwrap(); | |
let file = BzDecoder::new(file_bz); | |
let reader = BufReader::new(file); | |
println!("[bz2] {}", path); | |
// read each line | |
for l in reader.lines() { | |
let line = l.unwrap(); | |
// parse the line as json | |
let parsed: serde_json::Result<Value> = serde_json::from_str(&line); | |
if let Ok(tweet) = parsed { | |
// check if it has geo location and is in the US | |
if tweet["geo"] != Value::Null && tweet["place"]["country_code"] == "US" | |
{ | |
println!("{}", tweet); | |
} | |
} | |
} | |
remove_file(path).expect("error removing file") | |
} | |
} | |
// this worker has finished its job | |
workers.fetch_add(-1, Ordering::Release); | |
}) | |
} | |
pool.join(); | |
} | |
// glob a path, return a vec of paths | |
fn glob_vec(pattern: &str) -> Vec<String> { | |
glob(pattern) | |
.expect("Failed to read glob pattern") | |
.filter_map(Result::ok) | |
.map(|p| p.to_str().unwrap().to_string()) | |
.collect::<Vec<String>>() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment