Skip to content

Instantly share code, notes, and snippets.

@moaikids
Created March 18, 2013 14:13
Show Gist options
  • Save moaikids/5187421 to your computer and use it in GitHub Desktop.
Save moaikids/5187421 to your computer and use it in GitHub Desktop.
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import javax.annotation.Nullable;
import org.codehaus.jackson.map.ObjectMapper;
import com.twitter.hbc.common.CharacterStreamReader;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.processor.AbstractProcessor;
public class MapDelimitedProcessor extends
AbstractProcessor<Map<String, Object>> {
private final static int DEFAULT_BUFFER_SIZE = 50000;
private final static int MAX_ALLOWABLE_BUFFER_SIZE = 500000;
private final static String EMPTY_LINE = "";
private CharacterStreamReader reader;
private ObjectMapper json;
public MapDelimitedProcessor(BlockingQueue<Map<String, Object>> queue) {
super(queue);
}
public MapDelimitedProcessor(BlockingQueue<Map<String, Object>> queue,
long offerTimeoutMillis) {
super(queue, offerTimeoutMillis);
}
@Override
public void setup(InputStream input) {
reader = new CharacterStreamReader(new InputStreamReader(input,
Constants.DEFAULT_CHARSET), DEFAULT_BUFFER_SIZE);
json = new ObjectMapper();
}
@SuppressWarnings("unchecked")
@Override
@Nullable
protected Map<String, Object> processNextMessage() throws IOException {
String line = reader.readline();
if (line == null) {
throw new IOException("Unable to read new line from stream");
} else if (line.equals(EMPTY_LINE)) {
return null;
}
int delimitedCount = Integer.parseInt(line);
if (delimitedCount > MAX_ALLOWABLE_BUFFER_SIZE) {
// this is to protect us from nastiness
throw new IOException("Unreasonable message size " + delimitedCount);
}
String val = reader.read(delimitedCount);
return json.readValue(val, Map.class);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment