package com.basho;

import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLEncoder;

import org.apache.wink.common.internal.providers.multipart.MultiPartParser;
import org.apache.wink.common.model.multipart.InMultiPart;
import org.apache.wink.common.model.multipart.InPart;
import org.json.JSONArray;
import org.json.JSONObject;

public class Streaming2iReader {

	String host, port, bucketType, bucket, index, queryValue = "";
	URL url = null;
	URLConnection connection = null;
	InputStream response = null;
	
	public Streaming2iReader(String host, String port, String bucketType, String bucket, String index, String queryValue) throws IOException {
		this.host = host;
		this.port = port;
		this.bucketType = bucketType;
		this.bucket = bucket;
		this.index = index;
		this.queryValue = queryValue;		
		String url = "http://"+host+":"+port+"/types/"+bucketType+"/buckets/"+bucket+"/index/"+index+"/"+queryValue;
		String charset = "UTF-8";  // Or in Java 7 and later, use the constant: java.nio.charset.StandardCharsets.UTF_8.name()
		String stream = "true";

		String query="";
		try {
			query = String.format("stream=%s", 
			URLEncoder.encode(stream, charset));
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		}
		
		if (! query.equals("")) {
			this.url = new URL(url + "?" + query);
		} else {
			this.url = new URL(url);
		}
		
		this.connection = this.url.openConnection();
		
		connection.setRequestProperty("Accept-Charset", charset);
		this.response = connection.getInputStream();

	}
	
	String getContentType() {
		return this.connection.getContentType();
	}
	
	InputStream getResponse() {
		return this.response;
	}
	
	boolean isMultiPart() {
		return connection.getContentType().startsWith("multipart/mixed");
	}
	
	String getBoundary() {
		if (isMultiPart()) {
			return getContentType().substring(25);
		} else {
			return null;
		}
	}

	public static void main(String[] args) throws Exception {

		String host = "172.28.128.7";
		String port = "8098";
		String bucketType = "default";
		String bucket = "postings";
		String index = "user_bin";
		String queryValue = "charlie";

		Streaming2iReader reader = new Streaming2iReader(host, port, bucketType, bucket, index, queryValue);
			
        if (reader.isMultiPart()) {
        	InMultiPart imp = new InMultiPart(new MultiPartParser(reader.getResponse(), reader.getBoundary()));
        	reader.processMessage(imp);
        }
	    
	}
	
	public void processMessage(InMultiPart inMultiPart) throws Exception  {
	    while(inMultiPart.hasNext()) {
	        InPart part = inMultiPart.next();
	        if (part.getContentType().startsWith("application/json")) {
		        String jsonString = convertStreamToString(part.getInputStream());
		        JSONObject jo = new JSONObject(jsonString);
		        JSONArray keys = (JSONArray) jo.get("keys");
		        for (Object key : keys) {
		        	String keyString = (String) key;
		        	processKey(keyString);
		        }
	        } else {
	        	throw new Exception("Unexpected content type: "+ part.getContentType());
	        }
	    }
	}
	
	public static void processKey(String keyString) {
		System.out.println(keyString);
	}
	
	public static void debug(String message) {
		System.out.println("DEBUG: " + message);
	}
	
	static String convertStreamToString(java.io.InputStream is) {
	    java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A");
	    return s.hasNext() ? s.next() : "";
	}
}