Skip to content

Instantly share code, notes, and snippets.

@basiszwo
Last active January 23, 2018 08:10
Show Gist options
  • Save basiszwo/d84612129ee8d62a2c085b5eda6204c4 to your computer and use it in GitHub Desktop.
Save basiszwo/d84612129ee8d62a2c085b5eda6204c4 to your computer and use it in GitHub Desktop.
Load a local JSON file and feed it to an Accumulo instance
package one.flak.flinkgeomesa;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
 * File helpers for loading JSON documents from the local file system.
 *
 * Created by sb on 16.05.17.
 */
public class FileHelpers {

    /**
     * Reads the file at {@code filePath} as UTF-8 text and parses it as JSON.
     *
     * <p>Content that is not valid JSON is not handled here; whatever the parser
     * throws for it propagates to the caller.
     *
     * @param filePath path of the JSON file to load
     * @return the parsed JSON tree, or {@code null} when the file cannot be read
     *         or is empty
     */
    public static JsonElement readJsonFromFile(String filePath) {
        String jsonString;
        try {
            jsonString = new String(Files.readAllBytes(Paths.get(filePath)), Charset.forName("UTF-8"));
        } catch (IOException e) {
            // Keep the null-on-failure contract callers rely on, but report the
            // cause so a missing/unreadable file is not mistaken for an empty one.
            System.err.println("Could not read JSON file '" + filePath + "': " + e);
            return null;
        }

        if (jsonString.isEmpty()) {
            return null;
        }
        return new JsonParser().parse(jsonString);
    }
}
package one.flak.flinkgeomesa;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.vividsolutions.jts.geom.Geometry;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Options;
import org.geotools.data.*;
import org.geotools.data.simple.SimpleFeatureStore;
import org.geotools.feature.DefaultFeatureCollection;
import org.geotools.feature.FeatureCollection;
import org.geotools.feature.FeatureIterator;
import org.geotools.feature.simple.SimpleFeatureBuilder;
import org.geotools.filter.text.cql2.CQL;
import org.joda.time.DateTime;
import org.locationtech.geomesa.utils.interop.WKTUtils;
import org.opengis.feature.Feature;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.simple.SimpleFeatureType;
import org.opengis.filter.Filter;
import java.io.IOException;
import java.util.Date;
import java.util.Map;
/**
 * Command-line loader that reads trip samples from a local JSON file, writes them
 * as simple features into the data store described by the command-line options,
 * and finally prints every stored feature of that type.
 */
public class JsonToAccumuloConverter {

    /**
     * Runs the load: parse options, open the data store, convert the JSON samples
     * to features, insert them and print the stored features back out.
     *
     * @param args command-line arguments; the optional {@code filePath} option
     *             selects the JSON file to load
     * @throws Exception if option parsing, file loading or any data store
     *                   operation fails
     */
    public static void main(String[] args) throws Exception {
        System.out.println("READ CMDLINE OPTIONS");
        CommandLine cmd = getCmdLineOptions(args);

        System.out.println("ACCESSING DATA STORE");
        DataStore dataStore = getAccumuloDataStore(cmd);
        try {
            String filename = cmd.getOptionValue("filePath", "/Users/sb/Development/diploma-thesis/20160901/75226.json");

            // readJsonFromFile returns null for an unreadable or empty file; fail
            // with a clear message instead of a NullPointerException.
            JsonElement parsed = FileHelpers.readJsonFromFile(filename);
            if (parsed == null) {
                throw new IOException("No JSON content could be read from " + filename);
            }
            JsonArray tripSamples = parsed.getAsJsonArray();

            // establish specifics concerning the SimpleFeatureType to store
            SimpleFeatureType simpleFeatureType = AccumuloConfig.createSimpleFeatureType(AccumuloConfig.SIMPLE_FEATURE_TYPE_NAME);

            // NOTE(review): the trip id is hard-coded and matches only the default file name.
            String tripIdentifier = "75226";
            DefaultFeatureCollection featureCollection = buildFeatures(simpleFeatureType, tripSamples, tripIdentifier);

            System.out.printf("INSERTING %d samples ...", tripSamples.size());
            insertFeatures(AccumuloConfig.SIMPLE_FEATURE_TYPE_NAME, dataStore, featureCollection);
            System.out.println("DONE");

            System.out.println("Reading some values ...");
            printStoredFeatures(dataStore);
        } finally {
            // Release the store's resources even when loading or inserting fails.
            dataStore.dispose();
        }
    }

    /**
     * Converts every JSON sample into a simple feature and logs each one to stdout.
     *
     * @param simpleFeatureType feature type the samples are built for
     * @param tripSamples       JSON objects carrying {@code timestamp}, {@code latitude},
     *                          {@code longitude} and {@code accl_z}
     * @param tripIdentifier    value stored in each feature's {@code TripIdentifier}
     * @return a collection holding one feature per sample
     */
    private static DefaultFeatureCollection buildFeatures(SimpleFeatureType simpleFeatureType,
                                                          JsonArray tripSamples,
                                                          String tripIdentifier) {
        DefaultFeatureCollection featureCollection = new DefaultFeatureCollection();
        Object[] emptyFeature = {};

        for (JsonElement sample : tripSamples) {
            SimpleFeature simpleFeature = SimpleFeatureBuilder.build(simpleFeatureType, emptyFeature, null);
            JsonObject json = sample.getAsJsonObject();

            // The timestamp is multiplied by 1000 and used as epoch milliseconds.
            long rawTimestamp = (long) (json.get("timestamp").getAsDouble() * 1000);
            double latitude = Double.parseDouble(json.get("latitude").getAsString());
            double longitude = Double.parseDouble(json.get("longitude").getAsString());
            double accelerationZ = Double.parseDouble(json.get("accl_z").getAsString());
            Date occurredAt = new Date(rawTimestamp);

            simpleFeature.setAttribute("OccuredAt", occurredAt);
            simpleFeature.setAttribute("TripIdentifier", tripIdentifier);
            simpleFeature.setAttribute("AccelerationZ", accelerationZ);

            // WKT points are written as "POINT(x y)", i.e. longitude first, then latitude.
            // NOTE(review): assumes SamplePosition uses lon/lat axis order (EPSG:4326) - confirm in AccumuloConfig.
            Geometry geometry = WKTUtils.read("POINT(" + longitude + " " + latitude + ")");
            simpleFeature.setAttribute("SamplePosition", geometry);

            featureCollection.add(simpleFeature);

            System.out.println(
                    "TripId " + tripIdentifier + " | " +
                    "OccuredAt " + occurredAt + " | " +
                    "Position " + latitude + " / " + longitude + " | " +
                    "AccelerationZ " + accelerationZ + " | "
            );
        }
        return featureCollection;
    }

    /**
     * Queries every feature of the configured type and prints it to stdout.
     *
     * @param dataStore store to read from
     * @throws Exception if the filter cannot be built or the store cannot be queried
     */
    private static void printStoredFeatures(DataStore dataStore) throws Exception {
        Filter filter = CQL.toFilter("INCLUDE");
        Query query = new Query(AccumuloConfig.SIMPLE_FEATURE_TYPE_NAME, filter);
        FeatureSource source = dataStore.getFeatureSource(AccumuloConfig.SIMPLE_FEATURE_TYPE_NAME);

        FeatureIterator features = source.getFeatures(query).features();
        try {
            while (features.hasNext()) {
                Feature feature = features.next();
                System.out.println(
                        "TripId " + feature.getProperty("TripIdentifier").getValue() + " | " +
                        "OccuredAt " + feature.getProperty("OccuredAt").getValue() + " | " +
                        "Position " + feature.getProperty("SamplePosition").getValue() + " | " +
                        "AccelerationZ " + feature.getProperty("AccelerationZ").getValue() + " | "
                );
            }
        } finally {
            // Close the iterator even if printing a feature fails.
            features.close();
        }
    }

    /**
     * Adds all features of the collection to the named feature type in the store.
     *
     * @param simpleFeatureTypeName name of the feature type to write to
     * @param dataStore             store that holds the feature type
     * @param featureCollection     features to add
     * @throws IOException if the feature source cannot be obtained or the write fails
     */
    static void insertFeatures(String simpleFeatureTypeName,
                               DataStore dataStore,
                               FeatureCollection featureCollection)
            throws IOException {
        // The source must be writable; the cast fails for a read-only source.
        FeatureStore featureStore = (SimpleFeatureStore) dataStore.getFeatureSource(simpleFeatureTypeName);
        featureStore.addFeatures(featureCollection);
    }

    /**
     * Parses the command line against the options supplied by {@code AccumuloConfig}.
     *
     * @param args raw command-line arguments
     * @return the parsed command line
     * @throws Exception if the arguments cannot be parsed
     */
    static CommandLine getCmdLineOptions(String[] args) throws Exception {
        // find out where -- in Accumulo -- the user wants to store data
        CommandLineParser parser = new BasicParser();
        Options options = AccumuloConfig.getCommonRequiredOptions();
        return parser.parse(options, args);
    }

    /**
     * Looks up the data store matching the connection settings on the command line.
     *
     * @param cmd parsed command line carrying the connection options
     * @return the data store; never {@code null}
     * @throws IllegalStateException if no data store matches the configuration
     * @throws Exception             if the lookup itself fails
     */
    static DataStore getAccumuloDataStore(CommandLine cmd) throws Exception {
        // verify that we can see this Accumulo destination in a GeoTools manner
        Map<String, String> dsConf = AccumuloConfig.getAccumuloDataStoreConf(cmd);
        DataStore dataStore = DataStoreFinder.getDataStore(dsConf);
        // An assert is skipped unless the JVM runs with -ea, so check explicitly.
        // The configuration is not echoed because it may contain credentials.
        if (dataStore == null) {
            throw new IllegalStateException("No data store found for the given Accumulo configuration");
        }
        return dataStore;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment