Skip to content

Instantly share code, notes, and snippets.

@rmetzger
Last active August 29, 2015 14:18
Show Gist options
  • Save rmetzger/a218beca4b0442f3c1f3 to your computer and use it in GitHub Desktop.
diff --git a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
index 1f39694..cadb938 100644
--- a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
+++ b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
@@ -20,10 +20,12 @@ package org.apache.flink.addons.hbase.example;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
@@ -63,10 +65,16 @@ public class HBaseWriteExample {
Job job = Job.getInstance();
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
job.getConfiguration().set("mapred.output.dir","/tmp/test");
- counts.map(new MapFunction<Tuple2<String,Integer>, Tuple2<Text,Mutation>>() {
+ counts.map(new RichMapFunction<Tuple2<String,Integer>, Tuple2<Text,Mutation>>() {
private final byte[] CF_SOME = Bytes.toBytes("test-column");
private final byte[] Q_SOME = Bytes.toBytes("value");
- private Tuple2<Text, Mutation> reuse = new Tuple2<Text, Mutation>();
+ private transient Tuple2<Text, Mutation> reuse = new Tuple2<Text, Mutation>();
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ reuse = new Tuple2<Text, Mutation>();
+ }
@Override
public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment