Skip to content

Instantly share code, notes, and snippets.

@tsuna
Forked from AlexYangYu/gist:5476160
Last active December 16, 2015 18:49
Show Gist options
  • Save tsuna/5480390 to your computer and use it in GitHub Desktop.
Save tsuna/5480390 to your computer and use it in GitHub Desktop.
Recursive asynchronous HBase scanning
package me.alexyang.scanner;
import java.nio.charset.Charset;
import java.util.ArrayList;
import org.jboss.netty.util.CharsetUtil;
import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
import org.hbase.async.HBaseClient;
import org.hbase.async.KeyValue;
import org.hbase.async.Scanner;
/**
 * Small command-line demo of recursive asynchronous scanning with asynchbase.
 *
 * <p>The program opens a scanner on the {@code query} table restricted to
 * family {@code dims} / qualifier {@code val}, decodes every cell value as
 * ISO-8859-1 text, and prints the collected values once the scan is over.
 *
 * @author Alex Yang <[email protected]>
 */
public class ScannerTest {

  /**
   * Runs the scan end to end and prints one line per cell value.
   *
   * @param args command-line arguments (not used)
   * @throws Exception if waiting on any of the asynchronous operations fails
   */
  public static void main(String[] args) throws Exception {
    final HBaseClient client = new HBaseClient("centos-pd.alexyang.me");
    try {
      client.ensureTableExists("query").joinUninterruptibly();
      final Scanner rowScanner = client.newScanner("query");
      try {
        rowScanner.setFamily("dims");
        rowScanner.setQualifier("val");

        final ArrayList<String> values = new ArrayList<String>();

        // Callback that consumes one batch of rows and then re-registers
        // itself on the next batch, so the scan "recurses" through Deferreds
        // rather than looping on the calling thread.
        final Callback<Object, ArrayList<ArrayList<KeyValue>>> collector =
            new Callback<Object, ArrayList<ArrayList<KeyValue>>>() {
              public Object call(final ArrayList<ArrayList<KeyValue>> batch) throws Exception {
                if (batch != null) {
                  for (final ArrayList<KeyValue> row : batch) {
                    for (final KeyValue cell : row) {
                      values.add(new String(cell.value(), CharsetUtil.ISO_8859_1));
                    }
                  }
                  // Returning the next Deferred chains it onto the current
                  // one, which keeps the outer join() waiting for it too.
                  return rowScanner.nextRows().addCallback(this);
                }
                // A null batch stops the chain (presumably the scanner's
                // end-of-scan signal -- confirm against the asynchbase docs).
                return null;
              }
            };

        // Start the chain and wait for it to finish before printing.
        rowScanner.nextRows().addCallback(collector).join();

        for (final String value : values) {
          System.out.println(value);
        }
      } finally {
        rowScanner.close().joinUninterruptibly();
      }
    } finally {
      client.shutdown().joinUninterruptibly();
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment