|
package at.chaudum.tsvjdbc; |
|
|
|
import java.io.BufferedReader; |
|
import java.io.FileReader; |
|
import java.io.IOException; |
|
import java.sql.*; |
|
import java.util.function.Consumer; |
|
import java.util.stream.Stream; |
|
|
|
/**
 * Command-line tool that loads a tab-separated file into the table {@code t1} over JDBC.
 *
 * <p>Each input line is expected to have three tab-separated fields:
 * an integer timestamp, a bracketed pair of doubles separated by {@code ", "}
 * (for example {@code [1.5, 2.5]}), and a double speed value.
 */
public class TsvImporter {

    private static final String JDBC_CONNECTION_URL = "jdbc:crate://localhost:5432/?user=crate&prepareThreshold=0";

    private static final String INSERT_QUERY = "INSERT INTO t1(\"time\", \"location\", \"speed\") VALUES (?, ?, ?)";

    /** Number of rows accumulated before a batch is sent to the server. */
    private static final int BATCH_SIZE = 5_000;

    /**
     * Recreates table {@code t1}, inserts every line of the given file in batches,
     * then prints the elapsed time and the resulting row count.
     *
     * @param args exactly one element: the path of the TSV file to import
     */
    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("This program requires exactly one argument (file)!");
            return;
        }
        String filePath = args[0];

        try (BufferedReader reader = new BufferedReader(new FileReader(filePath));
             Connection conn = connect()) {
            conn.setAutoCommit(true);
            recreateTable(conn);

            long startTime = System.currentTimeMillis();
            // Closing the consumer flushes the last, partially filled batch; without
            // this, rows after the final full batch would never be inserted.
            try (TsvLineConsumer consumer = new TsvLineConsumer(conn, BATCH_SIZE)) {
                reader.lines().forEach(consumer);
            }
            long duration = System.currentTimeMillis() - startTime;
            System.out.println("--- Processing took: " + duration + "ms");

            printInsertCount(conn);
        } catch (IOException | SQLException e) {
            e.printStackTrace();
        }
    }

    /** Opens a new connection using {@link #JDBC_CONNECTION_URL}. */
    private static Connection connect() throws SQLException {
        return DriverManager.getConnection(JDBC_CONNECTION_URL);
    }

    /** Drops table {@code t1} if it exists and creates it again with an empty schema. */
    private static void recreateTable(Connection conn) throws SQLException {
        try (Statement stmt = conn.createStatement()) {
            stmt.execute("DROP TABLE IF EXISTS t1");
            stmt.execute("CREATE TABLE IF NOT EXISTS t1 (" +
                    "\"time\" TIMESTAMP," +
                    "\"location\" GEO_POINT," +
                    "\"speed\" DOUBLE" +
                    ") WITH (number_of_replicas=0)");
        }
    }

    /** Refreshes table {@code t1}, counts its rows and prints the count. */
    private static void printInsertCount(Connection conn) throws SQLException {
        try (Statement stmt = conn.createStatement()) {
            // NOTE(review): the refresh is presumably needed so the count sees the
            // rows just written - confirm against the database documentation.
            stmt.execute("REFRESH TABLE t1");
            try (ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM t1")) {
                if (rs.next()) {
                    long insertCount = rs.getLong(1);
                    System.out.println("--- Records inserted: " + insertCount);
                }
            }
        }
    }

    /**
     * Consumes TSV lines and inserts them through a batched prepared statement.
     *
     * <p>A batch is executed every {@code batchSize} successfully added rows. The
     * consumer must be closed after the last line so that the remaining rows are
     * sent and the prepared statement is released. The connection itself is not
     * closed by this class.
     */
    static class TsvLineConsumer implements Consumer<String>, AutoCloseable {

        private final int batchSize;

        private final PreparedStatement stmt;

        private final Connection conn;

        /** Number of rows added to the statement so far. */
        private long counter;

        /**
         * @param conn      connection used to prepare the insert statement and to build SQL arrays
         * @param batchSize number of rows per executed batch; must be positive
         * @throws SQLException             if the insert statement cannot be prepared
         * @throws IllegalArgumentException if {@code batchSize} is not positive
         */
        TsvLineConsumer(Connection conn, int batchSize) throws SQLException {
            if (batchSize <= 0) {
                throw new IllegalArgumentException("batchSize must be positive, got " + batchSize);
            }
            this.batchSize = batchSize;
            this.conn = conn;
            this.counter = 0;
            this.stmt = conn.prepareStatement(INSERT_QUERY);
        }

        /**
         * Parses one line, adds it to the current batch and executes the batch once
         * it holds {@code batchSize} rows. Blank lines are skipped. SQL errors are
         * reported on standard output and do not stop processing.
         *
         * @param s one line of the TSV file
         */
        @Override
        public void accept(String s) {
            if (s.trim().isEmpty()) {
                // e.g. a trailing newline at the end of the file
                return;
            }
            Object[] data = parseLine(s);
            try {
                stmt.setLong(1, (Long) data[0]);
                stmt.setArray(2, conn.createArrayOf("double", new Object[] { data[1], data[2] }));
                stmt.setDouble(3, (Double) data[3]);
                stmt.addBatch();
                counter++;
                if (counter % batchSize == 0) {
                    stmt.executeBatch();
                    System.out.println("--- Executed " + counter);
                }
            } catch (SQLException e) {
                System.out.println("--- Execution error");
                e.printStackTrace();
            }
        }

        /**
         * Executes the rows added since the last full batch, if any, and closes the
         * prepared statement.
         *
         * @throws SQLException if executing the remaining batch or closing the statement fails
         */
        @Override
        public void close() throws SQLException {
            try {
                if (counter % batchSize != 0) {
                    stmt.executeBatch();
                    System.out.println("--- Executed " + counter);
                }
            } finally {
                stmt.close();
            }
        }

        /**
         * Splits a line into its typed fields.
         *
         * @param line a line of the form {@code time<TAB>[x, y]<TAB>speed}
         * @return a four-element array: {@code Long} time, {@code Double} first
         *         coordinate, {@code Double} second coordinate, {@code Double} speed
         */
        private static Object[] parseLine(String line) {
            String[] parts = line.split("\t");
            // Strip the surrounding brackets before splitting the coordinate pair.
            String[] loc = parts[1].substring(1, parts[1].length() - 1).split(", ");
            return new Object[] {
                    Long.valueOf(parts[0]),
                    Double.valueOf(loc[0]),
                    Double.valueOf(loc[1]),
                    Double.valueOf(parts[2])
            };
        }
    }
}