Created
March 18, 2013 14:13
-
-
Save moaikids/5187421 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import java.io.InputStream; | |
import java.io.InputStreamReader; | |
import java.util.Map; | |
import java.util.concurrent.BlockingQueue; | |
import javax.annotation.Nullable; | |
import org.codehaus.jackson.map.ObjectMapper; | |
import com.twitter.hbc.common.CharacterStreamReader; | |
import com.twitter.hbc.core.Constants; | |
import com.twitter.hbc.core.processor.AbstractProcessor; | |
public class MapDelimitedProcessor extends | |
AbstractProcessor<Map<String, Object>> { | |
private final static int DEFAULT_BUFFER_SIZE = 50000; | |
private final static int MAX_ALLOWABLE_BUFFER_SIZE = 500000; | |
private final static String EMPTY_LINE = ""; | |
private CharacterStreamReader reader; | |
private ObjectMapper json; | |
public MapDelimitedProcessor(BlockingQueue<Map<String, Object>> queue) { | |
super(queue); | |
} | |
public MapDelimitedProcessor(BlockingQueue<Map<String, Object>> queue, | |
long offerTimeoutMillis) { | |
super(queue, offerTimeoutMillis); | |
} | |
@Override | |
public void setup(InputStream input) { | |
reader = new CharacterStreamReader(new InputStreamReader(input, | |
Constants.DEFAULT_CHARSET), DEFAULT_BUFFER_SIZE); | |
json = new ObjectMapper(); | |
} | |
@SuppressWarnings("unchecked") | |
@Override | |
@Nullable | |
protected Map<String, Object> processNextMessage() throws IOException { | |
String line = reader.readline(); | |
if (line == null) { | |
throw new IOException("Unable to read new line from stream"); | |
} else if (line.equals(EMPTY_LINE)) { | |
return null; | |
} | |
int delimitedCount = Integer.parseInt(line); | |
if (delimitedCount > MAX_ALLOWABLE_BUFFER_SIZE) { | |
// this is to protect us from nastiness | |
throw new IOException("Unreasonable message size " + delimitedCount); | |
} | |
String val = reader.read(delimitedCount); | |
return json.readValue(val, Map.class); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment