Skip to content

Instantly share code, notes, and snippets.

@chaudum
Last active September 7, 2017 11:35
Show Gist options
  • Save chaudum/ee69e835faaec3b4f8a929aecd7630bd to your computer and use it in GitHub Desktop.
Save chaudum/ee69e835faaec3b4f8a929aecd7630bd to your computer and use it in GitHub Desktop.
TSV Importer

README

Benchmarks simple batched inserts into CrateDB using the Crate JDBC driver.

Project structure

.
├── build.gradle
├── gen_tsv.py
├── gradle
│   └── wrapper
│       ├── gradle-wrapper.jar
│       └── gradle-wrapper.properties
├── gradlew
├── gradlew.bat
├── settings.gradle
├── src
│   ├── main
│   │   ├── java
│   │   │   └── at
│   │   │       └── chaudum
│   │   │           └── tsvjdbc
│   │   │               └── TsvImporter.java
│   │   └── resources
│   └── test
│       ├── java
│       └── resources
└── test_10m.tsv

Generate the TSV file

python gen_tsv.py --count 10000000 > test_10m.tsv

Results

CrateDB was run on the same machine with 4 GB of heap memory and the default configuration:

CRATE_HEAP_SIZE=4g crate/bin/crate

The Java application was started from within IntelliJ IDEA, with the path of the TSV file as its only argument.

--- Processing took: 508192ms
--- Records inserted: 10000000
#!/usr/bin/env python3
import argparse
import calendar
import random
import sys
from datetime import datetime
from time import mktime
def parse_args():
    """Read the generator's command-line options.

    Returns:
        argparse.Namespace whose ``count`` attribute (int, required) is the
        number of rows to generate.
    """
    cli = argparse.ArgumentParser(description='')
    cli.add_argument('--count', required=True, type=int)
    options = cli.parse_args()
    return options
def main():
    """Print ``--count`` randomly generated, tab-separated rows to stdout.

    Each row has three columns:

    * the current UTC time as epoch milliseconds, truncated to whole seconds,
    * a point ``[x, y]`` with integer x in [-180, 180] and y in [-90, 90]
      (longitude, latitude),
    * a float in [0, 100) (loaded as ``speed`` by the Java importer).
    """
    params = parse_args()
    for _ in range(params.count):
        # calendar.timegm() interprets the struct_time as UTC. time.mktime()
        # would interpret it as *local* time and shift every timestamp by the
        # machine's UTC offset.
        timestamp_ms = calendar.timegm(datetime.utcnow().timetuple()) * 1000
        row = [
            timestamp_ms,
            [random.randint(-180, 180), random.randint(-90, 90)],
            random.random() * 100.0,
        ]
        print('\t'.join(str(col) for col in row))
if __name__ == '__main__':
    # Only run when executed as a script; main()'s return value (None -> 0)
    # becomes the process exit status.
    raise SystemExit(main())
package at.chaudum.tsvjdbc;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.sql.*;
import java.util.function.Consumer;
import java.util.stream.Stream;
/**
 * Imports a tab-separated file into the CrateDB table {@code t1} through the Crate JDBC driver,
 * using batched prepared {@code INSERT}s, and reports the elapsed time and resulting row count.
 *
 * <p>Expected line format (as written by {@code gen_tsv.py}):
 * {@code <epoch millis> TAB [<x>, <y>] TAB <speed>}.
 */
public class TsvImporter {

    private static final String JDBC_CONNECTION_URL = "jdbc:crate://localhost:5432/?user=crate&prepareThreshold=0";
    private static final String INSERT_QUERY = "INSERT INTO t1(\"time\", \"location\", \"speed\") VALUES (?, ?, ?)";
    /** Rows accumulated per {@code executeBatch()} round trip. */
    private static final int BATCH_SIZE = 5_000;

    /**
     * Drops and recreates {@code t1}, streams every line of the given file into it, then prints
     * the processing time and the number of rows in the table.
     *
     * @param args exactly one element: the path of the TSV file to import
     */
    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("This program requires exactly one argument (file)!");
            return;
        }
        String filePath = args[0];
        try (BufferedReader reader = new BufferedReader(new FileReader(filePath));
             Connection conn = connect()) {
            conn.setAutoCommit(true);
            recreateTable(conn);

            long startTime = System.currentTimeMillis();
            try (TsvLineConsumer consumer = new TsvLineConsumer(conn, BATCH_SIZE)) {
                Stream<String> lines = reader.lines();
                lines.forEach(consumer);
                // Send the trailing partial batch; otherwise the last
                // (lineCount % BATCH_SIZE) rows would never reach the database.
                consumer.flush();
            }
            long duration = System.currentTimeMillis() - startTime;
            System.out.println("--- Processing took: " + duration + "ms");

            printRowCount(conn);
        } catch (IOException | SQLException e) {
            e.printStackTrace();
        }
    }

    /** Drops table {@code t1} if present and creates it again with zero replicas. */
    private static void recreateTable(Connection conn) throws SQLException {
        try (Statement stmt = conn.createStatement()) {
            stmt.execute("DROP TABLE IF EXISTS t1");
            stmt.execute("CREATE TABLE IF NOT EXISTS t1 (" +
                "\"time\" TIMESTAMP," +
                "\"location\" GEO_POINT," +
                "\"speed\" DOUBLE" +
                ") WITH (number_of_replicas=0)");
        }
    }

    /** Runs {@code REFRESH TABLE t1} before counting, then prints the row count of {@code t1}. */
    private static void printRowCount(Connection conn) throws SQLException {
        try (Statement stmt = conn.createStatement()) {
            stmt.execute("REFRESH TABLE t1");
            try (ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM t1")) {
                if (rs.next()) {
                    System.out.println("--- Records inserted: " + rs.getLong(1));
                }
            }
        }
    }

    /** Opens a connection using {@link #JDBC_CONNECTION_URL}; credentials are part of the URL. */
    private static Connection connect() throws SQLException {
        return DriverManager.getConnection(JDBC_CONNECTION_URL);
    }

    /**
     * Binds each consumed TSV line as one row of {@link #INSERT_QUERY} and sends the rows to the
     * database in batches of {@code batchSize}.
     *
     * <p>After the last line, callers must call {@link #flush()} to send the final partial batch,
     * and {@link #close()} to release the prepared statement.
     */
    static class TsvLineConsumer implements Consumer<String>, AutoCloseable {
        private final int batchSize;
        private final PreparedStatement stmt;
        private final Connection conn;
        /** Total number of rows added to a batch so far. */
        private long counter;
        /** Rows added since the last {@code executeBatch()}. */
        private int pending;

        /**
         * @param conn      open connection used to prepare the insert statement
         * @param batchSize number of rows per batch; must be positive
         * @throws IllegalArgumentException if {@code batchSize <= 0}
         * @throws SQLException if the statement cannot be prepared
         */
        TsvLineConsumer(Connection conn, int batchSize) throws SQLException {
            if (batchSize <= 0) {
                throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
            }
            this.conn = conn;
            this.batchSize = batchSize;
            this.counter = 0;
            this.pending = 0;
            this.stmt = conn.prepareStatement(INSERT_QUERY);
        }

        /**
         * Adds one line to the current batch and executes the batch once it is full. Blank lines
         * are skipped. SQL errors are reported and the import continues; malformed lines throw
         * from {@link #parseLine(String)}.
         */
        @Override
        public void accept(String s) {
            if (s.trim().isEmpty()) {
                return;
            }
            Object[] data = parseLine(s);
            try {
                stmt.setLong(1, (long) data[0]);
                // The location column is bound as a two-element double array.
                stmt.setArray(2, conn.createArrayOf("double", new Object[] { data[1], data[2] }));
                stmt.setDouble(3, (double) data[3]);
                stmt.addBatch();
                counter++;
                pending++;
                if (pending >= batchSize) {
                    executePending();
                    System.out.println("--- Executed " + counter);
                }
            } catch (SQLException e) {
                System.out.println("--- Execution error");
                e.printStackTrace();
            }
        }

        /**
         * Executes any rows added since the last batch execution; a no-op when there are none.
         *
         * @throws SQLException if executing the batch fails
         */
        void flush() throws SQLException {
            if (pending > 0) {
                executePending();
                System.out.println("--- Executed " + counter);
            }
        }

        /** Executes the accumulated batch and resets the pending-row count, even on failure. */
        private void executePending() throws SQLException {
            try {
                stmt.executeBatch();
            } finally {
                pending = 0;
            }
        }

        /** Closes the prepared statement. Does not flush; call {@link #flush()} first. */
        @Override
        public void close() throws SQLException {
            stmt.close();
        }

        /**
         * Splits one TSV line into {@code {Long time, Double x, Double y, Double speed}}.
         *
         * <p>The second column is a bracketed pair such as {@code [12, -34]}.
         *
         * @throws RuntimeException (e.g. {@link NumberFormatException},
         *     {@link IndexOutOfBoundsException}) if the line is malformed
         */
        private static Object[] parseLine(String line) {
            String[] parts = line.split("\t");
            String location = parts[1].trim();
            String[] loc = location.substring(1, location.length() - 1).split(",");
            return new Object[] {
                Long.valueOf(parts[0].trim()),
                Double.valueOf(loc[0].trim()),
                Double.valueOf(loc[1].trim()),
                Double.valueOf(parts[2].trim())
            };
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment