Skip to content

Instantly share code, notes, and snippets.

@suyash
Created March 20, 2016 18:47
Show Gist options
  • Save suyash/d32045b5fae161b443b9 to your computer and use it in GitHub Desktop.
Save suyash/d32045b5fae161b443b9 to your computer and use it in GitHub Desktop.
Cassandra Indexing using Spark Cassandra Connector
// Gradle build for the Cassandra sample: a plain Java application whose
// sources live next to this script and whose entry point is the Main class.
group = 'in.suyash.tests'
version = '1.0-SNAPSHOT'

apply(plugin: 'java')
apply(plugin: 'application')

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    // Cassandra server artifacts and the DataStax Java driver used by Main.
    compile(group: 'org.apache.cassandra', name: 'cassandra-all', version: '2.1.6')
    compile(group: 'com.datastax.cassandra', name: 'cassandra-driver-core', version: '2.1.6')

    testCompile(group: 'junit', name: 'junit', version: '4.11')
}

sourceSets {
    main {
        java {
            // Java sources sit in the project directory itself rather than src/main/java.
            srcDir('./')
        }
    }
}

// Class launched by the application plugin.
mainClassName = "Main"
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Populates a Cassandra table with a few sample text lines.
 *
 * <p>The program connects to a cluster, logs the hosts it sees, makes sure the
 * {@code wordcount} keyspace and its {@code input} table exist, and then inserts
 * the sample lines repeatedly, each under its own integer id.
 */
public class Main {
    private static final Logger logger = LoggerFactory.getLogger(Main.class);

    /** Contact point used when no addresses are given on the command line. */
    private static final String HOST = "107.108.214.154";
    private static final String KEYSPACE = "wordcount";
    private static final String COLUMN_FAMILY = "input";

    /**
     * Builds a {@link Cluster} that uses the given addresses as contact points.
     *
     * @param nodes addresses of the nodes to register as contact points
     * @return the built cluster
     */
    private static Cluster createConnection(String... nodes) {
        Cluster.Builder clusterBuilder = Cluster.builder();
        for (String node : nodes) {
            clusterBuilder.addContactPoint(node);
        }
        return clusterBuilder.build();
    }

    /**
     * Logs the cluster name followed by datacenter, address and rack of every
     * host listed in the cluster metadata.
     *
     * @param cluster cluster whose metadata is reported
     */
    private static void logClusterInformation(Cluster cluster) {
        Metadata metadata = cluster.getMetadata();
        logger.info("Connected To Cluster: {}", metadata.getClusterName());
        logger.info("Cluster Details");
        for (Host host : metadata.getAllHosts()) {
            logger.info("Datacenter: {}, Host: {}, Rack: {}",
                    host.getDatacenter(),
                    host.getAddress(),
                    host.getRack()
            );
        }
    }

    /**
     * Creates the keyspace if it is missing, using SimpleStrategy with a
     * replication factor of 2.
     *
     * @param session session used to run the statement
     */
    private static void ensureKeySpaceExists(Session session) {
        String query = "CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE
                + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2 }";
        session.execute(query);
        logger.info("Ran query \"{}\"", query);
    }

    /**
     * Creates the table {@code (id int PRIMARY KEY, line text)} if it is missing.
     *
     * @param session session used to run the statement
     */
    private static void createColumnFamily(Session session) {
        // IF NOT EXISTS keeps a second run from failing on the already-created
        // table, mirroring how the keyspace is created.
        String query = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + "." + COLUMN_FAMILY
                + " (id int PRIMARY KEY, line text)";
        session.execute(query);
        logger.info("Ran query \"{}\"", query);
    }

    /**
     * Inserts every sample line {@code times} times through one prepared
     * statement, giving each insert its own id.
     *
     * @param session session used to prepare and run the inserts
     */
    private static void insertData(Session session) {
        String[] lines = {
            "Suyash is a good boy",
            "Suyash is a good programmer",
            "Suyash is a good developer"
        };
        PreparedStatement statement = session.prepare("INSERT INTO " + KEYSPACE
                + "." + COLUMN_FAMILY
                + " (id, line) VALUES (?, ?)");
        int times = 100;
        for (int t = 0; t < times; t++) {
            for (int i = 0; i < lines.length; i++) {
                // Ids run 1..(times * lines.length) without repeats. The former
                // "i * t + 1" produced the same key for many (t, i) pairs, so
                // later inserts overwrote earlier rows.
                int id = t * lines.length + i + 1;
                session.execute(statement.bind(id, lines[i]));
            }
        }
        logger.info("inserted {} statements", lines.length * times);
    }

    /**
     * Entry point: connects, prepares the schema and inserts the sample data.
     *
     * <p>The process exits with status 0 on success and 1 when any step fails.
     *
     * @param args optional contact point addresses; {@link #HOST} is used when none are given
     */
    public static void main(String... args) {
        String[] nodes = args.length > 0 ? args : new String[] {HOST};
        int exitCode = 0;
        // try-with-resources closes the session first and then the cluster,
        // including when connecting or any statement throws.
        try (Cluster cluster = createConnection(nodes)) {
            logClusterInformation(cluster);
            try (Session session = cluster.connect()) {
                ensureKeySpaceExists(session);
                createColumnFamily(session);
                insertData(session);
            }
        } catch (Exception e) {
            logger.error("Failed to populate " + KEYSPACE + "." + COLUMN_FAMILY, e);
            exitCode = 1;
        }
        // Explicit exit is kept from the original; presumably it guards against
        // driver threads keeping the JVM alive (TODO confirm). It runs only
        // after the resources above have been closed.
        System.exit(exitCode);
    }
}
// Sets the name of the root Gradle project (presumably the gist's settings.gradle).
rootProject.name = 'CassandraInput'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment