@cbonesana
Created August 12, 2018 16:18
Just an experimental project to work with a relatively large number of tweets stored in a single JSON file compressed with BZip2. The idea is to work on many tweets in parallel while avoiding loading everything into memory.
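For reference, the parser only reads the fields id, user_id, text, lang, retweet_count, favorite_count and created_at, so a single input line is assumed to look roughly like this made-up example: {"id": 1, "user_id": 42, "text": "hello world", "lang": "en", "retweet_count": 0, "favorite_count": 0, "created_at": "Sun Aug 12 16:18:00 +0000 2018"}.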
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>test</groupId>
    <artifactId>tweet</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- Used to decompress BZip2 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-compress</artifactId>
            <version>1.0</version>
        </dependency>
        <!-- Manipulate the JSONs -->
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20180130</version>
        </dependency>
        <!-- Log4j2 as logger -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.11.1</version>
        </dependency>
    </dependencies>
</project>
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
import java.io.*;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Date;
import java.util.Locale;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Author: Claudio "Dna" Bonesana
 * Date: 12.08.2018 16:46
 * Project: test-tweet
 */
public class TweetDecompressApplication {

    private static final Logger log = LogManager.getLogger(TweetDecompressApplication.class);

    // this is a single file containing one JSON object per line
    private static final String source = "tweets.json.bz2";

    public static void main(String[] args) {
        /*
         * The idea of this code is very simple: parse a BZip2-compressed file (~161MB, 1.6GB decompressed)
         * of JSONs representing tweets. Using an ExecutorService, perform parallel operations on the tweets;
         * in this case, just extract some fields and print them.
         */

        // get the number of logical processors available to the JVM
        int cores = Runtime.getRuntime().availableProcessors();

        // we want to keep one core free so we don't overload the machine
        ExecutorService exec = Executors.newFixedThreadPool(cores - 1);

        // Matryoshka time!
        try (
                // stream data from the input file
                InputStream fis = new FileInputStream(source);
                // decompress the input stream
                InputStream bzis = new BZip2CompressorInputStream(fis);
                // add a stream reader
                Reader isr = new InputStreamReader(bzis);
                // bufferize the stream reader
                BufferedReader br = new BufferedReader(isr)
        ) {
            // if we have a string date, we expect this format in English,
            // e.g. "Sun Aug 12 16:18:00 +0000 2018"
            final SimpleDateFormat sdf =
                    new SimpleDateFormat("EEE MMM dd HH:mm:ss ZZZZZ yyyy", Locale.ENGLISH);

            String line;
            while ((line = br.readLine()) != null) {
                // convert a line to a JSONObject
                final JSONObject jo = new JSONObject(line);

                // submit a job to the ExecutorService. The job is described with a lambda function
                exec.submit(() -> {
                    // for safety, surround everything with a try-catch
                    try {
                        // extract some data from the tweet
                        long id = jo.getLong("id");
                        long userId = jo.getLong("user_id");
                        String text = jo.getString("text");
                        String lang = jo.getString("lang");
                        int retweet = jo.getInt("retweet_count");
                        int favourites = jo.getInt("favorite_count");

                        // parse the date and consider two cases:
                        Date date;
                        String strDate = jo.optString("created_at", null);
                        try {
                            // first case: the date is in string format
                            date = sdf.parse(strDate);
                        } catch (ParseException e) {
                            log.warn("Cannot parse date " + strDate, e);
                            // second case: by failing we assume that the date is in numeric (long) format
                            long longDate = jo.getLong("created_at");
                            Instant i = Instant.ofEpochSecond(longDate);
                            date = Date.from(i);
                        }

                        // print to standard output
                        System.out.println(String.format(
                                "%10d %10d %5s %7d %7d %s %s",
                                id, userId, lang, retweet, favourites, date, text
                        ));

                        // TODO: add extra operations here...
                    } catch (Exception e) {
                        log.fatal("Something very bad happened!", e);
                    }
                });
            }
        } catch (IOException e) {
            log.error("IO error: ", e);
        } finally {
            // remember to ALWAYS shut down the executor!
            exec.shutdown();
        }
    }
}
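One caveat worth noting: Executors.newFixedThreadPool uses an unbounded queue, so if the workers fall behind the reader the pending JSONObjects can still pile up in memory. A possible variant, shown as a sketch below (the helper class BoundedPool and the queue size of 1000 are made up, not part of the gist), is to build the pool by hand with a bounded queue and a CallerRunsPolicy, so the reader thread throttles itself whenever the queue is full.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPool {
    /**
     * Hypothetical helper, not part of the original gist: a fixed pool of `workers`
     * threads whose queue holds at most `queueSize` pending tasks. When the queue is
     * full, submit() runs the task in the calling (reader) thread, which naturally
     * slows down how fast new lines are read and parsed.
     */
    public static ExecutorService newBoundedPool(int workers, int queueSize) {
        return new ThreadPoolExecutor(
                workers, workers,                        // fixed number of threads
                0L, TimeUnit.MILLISECONDS,               // no keep-alive for core threads
                new ArrayBlockingQueue<>(queueSize),     // bounded work queue
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}

With a helper like this, the only change in main would be something along the lines of ExecutorService exec = BoundedPool.newBoundedPool(cores - 1, 1000); the rest of the read-and-submit loop stays the same.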