Skip to content

Instantly share code, notes, and snippets.

@jeffkole
Created May 23, 2013 18:05
Show Gist options
  • Save jeffkole/5638106 to your computer and use it in GitHub Desktop.
Save jeffkole/5638106 to your computer and use it in GitHub Desktop.
Storm bolt to write data to a Kiji table.
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import org.kiji.schema.Kiji;
import org.kiji.schema.KijiURI;
import org.kiji.schema.KijiTable;
import org.kiji.schema.KijiTableWriter;
import org.kiji.schema.util.ResourceUtils;
import java.io.IOException;
import java.util.Map;
public class KijiBolt extends BaseRichBolt {
private transient OutputCollector collector;
private transient Kiji kiji;
private transient KijiTable table;
private transient KijiTableWriter writer;
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
KijiURI kijiUri = KijiURI.newBuilder().withInstanceName("default").withZookeeperClientPort(2181).build();
System.err.println("Opening Kiji: " + kijiUri);
try {
this.kiji = Kiji.Factory.open(kijiUri);
this.table = this.kiji.openTable("table_name");
this.writer = this.table.openTableWriter();
}
catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
@Override
public void execute(Tuple tuple) {
try {
for (Object value : tuple.getValues()) {
EventMessage message = (EventMessage)value;
this.writer.put(this.table.getEntityId(message.getId(), System.currentTimeMillis()),
"data", "event", message);
}
this.writer.flush();
this.collector.ack(tuple);
}
catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
@Override
public void cleanup() {
ResourceUtils.closeOrLog(this.writer);
ResourceUtils.releaseOrLog(this.table);
ResourceUtils.releaseOrLog(this.kiji);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment