Created
August 12, 2018 16:18
-
-
Save cbonesana/b955970a918d4fbd9611c060d532ce0d to your computer and use it in GitHub Desktop.
Just an experimental project for working with a relatively large number of tweets stored in a single BZip2-compressed JSON file. The idea is to process many tweets in parallel while avoiding loading everything into memory.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>test</groupId>
    <artifactId>tweet</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Without this, Maven warns and compiles with the platform-dependent encoding. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <!-- Pin the plugin version so the build is reproducible. -->
                <version>3.8.1</version>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- Used to decompress BZip2.
             1.0 (2009) lacks years of bug and security fixes; 1.26.x still runs on Java 8. -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-compress</artifactId>
            <version>1.26.2</version>
        </dependency>
        <!-- Manipulate the jsons.
             Versions before 20231013 are affected by parser DoS issues (CVE-2022-45688, CVE-2023-5072). -->
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20240303</version>
        </dependency>
        <!-- Log4j2 as logger.
             2.11.1 is vulnerable to Log4Shell (CVE-2021-44228); 2.17.1+ fixes it and the
             follow-up CVEs while still supporting Java 8. Keep api and core on the same version. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.17.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.17.2</version>
        </dependency>
    </dependencies>
</project>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Date;
import java.util.Locale;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/** | |
* Author: Claudio "Dna" Bonesana | |
* Date: 12.08.2018 16:46 | |
* Project: test-tweet | |
*/ | |
public class TweetDecompressApplication { | |
private static final Logger log = LogManager.getLogger(TweetDecompressApplication.class); | |
// this is a single file containing a JSON for each line | |
private static final String source = "tweets.json.bz2"; | |
public static void main(String[] args) { | |
/* | |
* The idea of this code is very simple: parse a BZip2 compressed file (~161MB, 1.6GB decompressed) | |
* of JSONs representing tweets. Using an ExecutorService, perform parallel operations on the tweets, | |
* in this case, just extract some fields and print them. | |
*/ | |
// get the number of available physical cores | |
int cores = Runtime.getRuntime().availableProcessors(); | |
// we want to keep one core free so we don't overload the machine | |
ExecutorService exec = Executors.newFixedThreadPool(cores - 1); | |
// Matrioska time! | |
try ( | |
// stream data from the input file | |
InputStream fis = new FileInputStream(source); | |
// decompress the input stream | |
InputStream gzis = new BZip2CompressorInputStream(fis); | |
// add a stream reader | |
Reader isr = new InputStreamReader(gzis); | |
// bufferize the stream reader | |
BufferedReader br = new BufferedReader(isr) | |
) { | |
// if we have a string date, we expect this format in english | |
final SimpleDateFormat sdf = | |
new SimpleDateFormat("EEE MMM dd HH:mm:ss ZZZZZ yyyy", Locale.ENGLISH); | |
String line; | |
while ((line = br.readLine()) != null) { | |
// convert a line to a JSONObect | |
final JSONObject jo = new JSONObject(line); | |
// submit a job to the ExecutorService. The job is described with a Lambda function | |
exec.submit(() -> { | |
// for safety, surround everything with a try-catch | |
try { | |
// extract some data from the tweets | |
long id = jo.getLong("id"); | |
long userId = jo.getLong("user_id"); | |
String text = jo.getString("text"); | |
String lang = jo.getString("lang"); | |
int retweet = jo.getInt("retweet_count"); | |
int favourites = jo.getInt("favorite_count"); | |
// parse the date and cosider two cases: | |
Date date; | |
String strDate = jo.optString("created_at", null); | |
// the date is in string format | |
try { | |
date = sdf.parse(strDate); | |
} catch (ParseException e) { | |
log.warn("Cannot parse date " + strDate, e); | |
// by failing we assume that the date is in numeric (long) format | |
long longDate = jo.getLong("created_at"); | |
Instant i = Instant.ofEpochSecond(longDate); | |
date = Date.from(i); | |
} | |
// print to standard output. | |
System.out.println(String.format( | |
"%10d %10d %5s %7d %7d %s %s", | |
id, userId, lang, retweet, favourites, date, text | |
)); | |
// TODO: add extra operations there... | |
} catch (Exception e) { | |
log.fatal("Something very bad happened!", e); | |
} | |
}); | |
} | |
} catch (IOException e) { | |
log.error("IO Error: ", e); | |
} finally { | |
// remember to ALWAYS shutdown the executor! | |
exec.shutdown(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment