Created
August 31, 2017 21:31
-
-
Save amarjitdhillon/51ab6fba68452b301658786459e7d171 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class writeToCassandra { | |
private static Session session; | |
private static Cluster cluster; | |
private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE data WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};"; | |
private static final String createTable = "CREATE TABLE test.patient(id int, heart_rate int, PRIMARY KEY(id));" ; | |
public static void main(String[] args) throws Exception { | |
//setting the env variable to local | |
StreamExecutionEnvironment envrionment = StreamExecutionEnvironment.createLocalEnvironment(1); | |
DataStream<cassandraData> hrEventDataStream = envrionment.addSource(new sensorData()); | |
// hrEventDataStream.print(); | |
CassandraSink.addSink(hrEventDataStream) | |
.setQuery("INSERT INTO test.patient(id,heart_rate) values (?,?);") | |
.setClusterBuilder(new ClusterBuilder() { | |
@Override | |
protected Cluster buildCluster(Cluster.Builder builder) { | |
return builder.addContactPoint("127.0.0.1").build(); | |
} | |
}) | |
.build(); | |
envrionment.execute(); | |
} //main | |
} //writeToCassandra |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment