Last active
January 23, 2018 08:10
-
-
Save basiszwo/d84612129ee8d62a2c085b5eda6204c4 to your computer and use it in GitHub Desktop.
Load a local JSON file and feed it to an Accumulo instance
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package one.flak.flinkgeomesa; | |
import com.google.gson.JsonElement; | |
import com.google.gson.JsonParser; | |
import java.io.IOException; | |
import java.nio.charset.Charset; | |
import java.nio.file.Files; | |
import java.nio.file.Paths; | |
/** | |
* Created by sb on 16.05.17. | |
*/ | |
public class FileHelpers { | |
public static JsonElement readJsonFromFile(String filePath) { | |
String jsonString; | |
try { | |
jsonString = new String(Files.readAllBytes(Paths.get(filePath)), Charset.forName("UTF-8")); | |
} catch(IOException e) { | |
jsonString = ""; | |
} | |
if(jsonString.isEmpty()) { | |
return null; | |
} else { | |
return new JsonParser().parse(jsonString); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package one.flak.flinkgeomesa; | |
import com.google.gson.JsonArray; | |
import com.google.gson.JsonElement; | |
import com.google.gson.JsonObject; | |
import com.vividsolutions.jts.geom.Geometry; | |
import org.apache.commons.cli.BasicParser; | |
import org.apache.commons.cli.CommandLine; | |
import org.apache.commons.cli.CommandLineParser; | |
import org.apache.commons.cli.Options; | |
import org.geotools.data.*; | |
import org.geotools.data.simple.SimpleFeatureStore; | |
import org.geotools.feature.DefaultFeatureCollection; | |
import org.geotools.feature.FeatureCollection; | |
import org.geotools.feature.FeatureIterator; | |
import org.geotools.feature.simple.SimpleFeatureBuilder; | |
import org.geotools.filter.text.cql2.CQL; | |
import org.joda.time.DateTime; | |
import org.locationtech.geomesa.utils.interop.WKTUtils; | |
import org.opengis.feature.Feature; | |
import org.opengis.feature.simple.SimpleFeature; | |
import org.opengis.feature.simple.SimpleFeatureType; | |
import org.opengis.filter.Filter; | |
import java.io.IOException; | |
import java.util.Date; | |
import java.util.Map; | |
public class JsonToAccumuloConverter { | |
public static void main(String[] args) throws Exception { | |
System.out.println("READ CMDLINE OPTIONS"); | |
CommandLine cmd = getCmdLineOptions(args); | |
System.out.println("ACCESSING DATA STORE"); | |
DataStore dataStore = getAccumuloDataStore(cmd); | |
String filename = cmd.getOptionValue("filePath", "/Users/sb/Development/diploma-thesis/20160901/75226.json"); | |
JsonArray tripSamples = FileHelpers.readJsonFromFile(filename).getAsJsonArray(); | |
// establish specifics concerning the SimpleFeatureType to store | |
SimpleFeatureType simpleFeatureType = AccumuloConfig.createSimpleFeatureType(AccumuloConfig.SIMPLE_FEATURE_TYPE_NAME); | |
DefaultFeatureCollection featureCollection = new DefaultFeatureCollection(); | |
Object[] emptyFeature = {}; | |
String tripIdentifier = "75226"; | |
for(JsonElement sample : tripSamples) { | |
SimpleFeature simpleFeature = SimpleFeatureBuilder.build(simpleFeatureType, emptyFeature, null); | |
JsonObject json = sample.getAsJsonObject(); | |
long rawTimestamp = (long)(json.get("timestamp").getAsDouble() * 1000); | |
double latitude = Double.parseDouble(json.get("latitude").getAsString()); | |
double longitude = Double.parseDouble(json.get("longitude").getAsString()); | |
double accelerationZ = Double.parseDouble(json.get("accl_z").getAsString()); | |
simpleFeature.setAttribute("OccuredAt", new DateTime().withMillis(rawTimestamp).toDate()); | |
simpleFeature.setAttribute("TripIdentifier", tripIdentifier); | |
simpleFeature.setAttribute("AccelerationZ", accelerationZ); | |
Geometry geometry = WKTUtils.read("POINT(" + latitude + " " + longitude + ")"); | |
simpleFeature.setAttribute("SamplePosition", geometry); | |
featureCollection.add(simpleFeature); | |
System.out.println( | |
"TripId " + tripIdentifier + " | " + | |
"OccuredAt " + new Date(rawTimestamp) + " | " + | |
"Position " + latitude + " / " + longitude + " | " + | |
"AccelerationZ " + accelerationZ + " | " | |
); | |
} | |
System.out.printf("INSERTING %d samples ...", tripSamples.size()); | |
insertFeatures(AccumuloConfig.SIMPLE_FEATURE_TYPE_NAME, dataStore, featureCollection); | |
System.out.println("DONE"); | |
System.out.println("Reading some values ..."); | |
Filter filter = CQL.toFilter("INCLUDE"); | |
Query query = new Query(AccumuloConfig.SIMPLE_FEATURE_TYPE_NAME, filter); | |
FeatureSource source = dataStore.getFeatureSource(AccumuloConfig.SIMPLE_FEATURE_TYPE_NAME); | |
FeatureIterator features = source.getFeatures(query).features(); | |
while(features.hasNext()) { | |
Feature feature = features.next(); | |
System.out.println( | |
"TripId " + feature.getProperty("TripIdentifier").getValue() + " | " + | |
"OccuredAt " + feature.getProperty("OccuredAt").getValue() + " | " + | |
"Position " + feature.getProperty("SamplePosition").getValue() + " | " + | |
"AccelerationZ " + feature.getProperty("AccelerationZ").getValue() + " | " | |
); | |
} | |
features.close(); | |
dataStore.dispose(); | |
} | |
static void insertFeatures(String simpleFeatureTypeName, | |
DataStore dataStore, | |
FeatureCollection featureCollection) | |
throws IOException { | |
FeatureStore featureStore = (SimpleFeatureStore) dataStore.getFeatureSource(simpleFeatureTypeName); | |
featureStore.addFeatures(featureCollection); | |
} | |
static CommandLine getCmdLineOptions(String[] args) throws Exception { | |
// find out where -- in Accumulo -- the user wants to store data | |
CommandLineParser parser = new BasicParser(); | |
Options options = AccumuloConfig.getCommonRequiredOptions(); | |
CommandLine cmd = parser.parse( options, args); | |
return cmd; | |
} | |
static DataStore getAccumuloDataStore(CommandLine cmd) throws Exception { | |
// verify that we can see this Accumulo destination in a GeoTools manner | |
Map<String, String> dsConf = AccumuloConfig.getAccumuloDataStoreConf(cmd); | |
DataStore dataStore = DataStoreFinder.getDataStore(dsConf); | |
assert dataStore != null; | |
return dataStore; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment