package com.basho; import java.io.IOException; import java.io.InputStream; import java.io.UnsupportedEncodingException; import java.net.URL; import java.net.URLConnection; import java.net.URLEncoder; import org.apache.wink.common.internal.providers.multipart.MultiPartParser; import org.apache.wink.common.model.multipart.InMultiPart; import org.apache.wink.common.model.multipart.InPart; import org.json.JSONArray; import org.json.JSONObject; public class Streaming2iReader { String host, port, bucketType, bucket, index, queryValue = ""; URL url = null; URLConnection connection = null; InputStream response = null; public Streaming2iReader(String host, String port, String bucketType, String bucket, String index, String queryValue) throws IOException { this.host = host; this.port = port; this.bucketType = bucketType; this.bucket = bucket; this.index = index; this.queryValue = queryValue; String url = "http://"+host+":"+port+"/types/"+bucketType+"/buckets/"+bucket+"/index/"+index+"/"+queryValue; String charset = "UTF-8"; // Or in Java 7 and later, use the constant: java.nio.charset.StandardCharsets.UTF_8.name() String stream = "true"; String query=""; try { query = String.format("stream=%s", URLEncoder.encode(stream, charset)); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } if (! query.equals("")) { this.url = new URL(url + "?" + query); } else { this.url = new URL(url); } this.connection = this.url.openConnection(); connection.setRequestProperty("Accept-Charset", charset); this.response = connection.getInputStream(); } String getContentType() { return this.connection.getContentType(); } InputStream getResponse() { return this.response; } boolean isMultiPart() { return connection.getContentType().startsWith("multipart/mixed"); } String getBoundary() { if (isMultiPart()) { return getContentType().substring(25); } else { return null; } } public static void main(String[] args) throws Exception { String host = "172.28.128.7"; String port = "8098"; String bucketType = "default"; String bucket = "postings"; String index = "user_bin"; String queryValue = "charlie"; Streaming2iReader reader = new Streaming2iReader(host, port, bucketType, bucket, index, queryValue); if (reader.isMultiPart()) { InMultiPart imp = new InMultiPart(new MultiPartParser(reader.getResponse(), reader.getBoundary())); reader.processMessage(imp); } } public void processMessage(InMultiPart inMultiPart) throws Exception { while(inMultiPart.hasNext()) { InPart part = inMultiPart.next(); if (part.getContentType().startsWith("application/json")) { String jsonString = convertStreamToString(part.getInputStream()); JSONObject jo = new JSONObject(jsonString); JSONArray keys = (JSONArray) jo.get("keys"); for (Object key : keys) { String keyString = (String) key; processKey(keyString); } } else { throw new Exception("Unexpected content type: "+ part.getContentType()); } } } public static void processKey(String keyString) { System.out.println(keyString); } public static void debug(String message) { System.out.println("DEBUG: " + message); } static String convertStreamToString(java.io.InputStream is) { java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A"); return s.hasNext() ? s.next() : ""; } }