@geunho
Last active July 20, 2016 01:53
apache-flume-implement-custom-interceptor
Set the interceptor type to the fully qualified name of the nested Builder class, e.g. com.test.flume.CustomInterceptor$Builder
...
agent.sources.s1.interceptors = ci
agent.sources.s1.interceptors.ci.type = com.test.flume.CustomInterceptor$Builder
agent.sources.s1.interceptors.ci.${parameter_name} = ${value}
e.g. agent.sources.s1.interceptors.ci.isISO8601Format = true
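The `$` in the type value is the binary-name separator Java uses for nested classes, which is why the configuration says `CustomInterceptor$Builder` rather than `CustomInterceptor.Builder`. The sketch below illustrates this with plain reflection, on the assumption that Flume instantiates the configured class by name in a similar way. `BuilderNameSketch` and its nested classes are hypothetical stand-ins, not Flume types.

```java
public class BuilderNameSketch {
    // Hypothetical stand-in for an interceptor class with a nested Builder.
    public static class CustomInterceptor {
        public static class Builder {
            public CustomInterceptor build() {
                return new CustomInterceptor();
            }
        }
    }

    // Load a class by its binary name and call its public no-arg constructor.
    public static Object instantiate(String binaryName) {
        try {
            return Class.forName(binaryName).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + binaryName, e);
        }
    }

    public static void main(String[] args) {
        // getName() returns the binary name, with '$' before each nested class.
        String name = CustomInterceptor.Builder.class.getName();
        System.out.println(name);

        Object builder = instantiate(name);
        System.out.println(builder instanceof CustomInterceptor.Builder);
    }
}
```

Passing the dotted form (`...CustomInterceptor.Builder`) to `Class.forName` would fail with a `ClassNotFoundException`, so the `$` has to be kept in the properties file.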
...
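private boolean isISO8601Format;
public void configure(Context context) {
    // Keep the value in a field so it is still available after configure() returns.
    isISO8601Format = context.getBoolean("isISO8601Format", true);
}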
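A minimal sketch of how a configured value might travel from the properties file into the interceptor: the builder reads it in `configure`, keeps it, and hands it over in `build`. To stay runnable without Flume on the classpath, a plain `Map<String, String>` stands in for Flume's `Context`, and `SketchBuilder` and `SketchInterceptor` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

public class BuilderFlowSketch {
    // Hypothetical stand-in for the interceptor; it only remembers its setting.
    public static final class SketchInterceptor {
        public final boolean isISO8601Format;

        SketchInterceptor(boolean isISO8601Format) {
            this.isISO8601Format = isISO8601Format;
        }
    }

    // Hypothetical stand-in for the Builder; a Map replaces Flume's Context.
    public static final class SketchBuilder {
        private boolean isISO8601Format = true;

        public void configure(Map<String, String> context) {
            // Default to true when the key is absent, as in the configure() snippet.
            String raw = context.get("isISO8601Format");
            isISO8601Format = (raw == null) || Boolean.parseBoolean(raw.trim());
        }

        public SketchInterceptor build() {
            // Pass the stored setting on to the interceptor instance.
            return new SketchInterceptor(isISO8601Format);
        }
    }

    public static void main(String[] args) {
        Map<String, String> context = new HashMap<>();
        context.put("isISO8601Format", "false");

        SketchBuilder builder = new SketchBuilder();
        builder.configure(context);
        System.out.println(builder.build().isISO8601Format); // prints "false"
    }
}
```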
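@Override
public Event intercept(Event event) {
    Map<String, String> headers = event.getHeaders();
    String payload = new String(event.getBody(), StandardCharsets.UTF_8);
    Map<String, Object> json = parseJson(payload);
    // timestamp is assumed to be a String constant defined elsewhere in the class (not shown here).
    // Gson returns null for an empty payload, so guard before looking up the key.
    if (json != null && json.containsKey(timestamp)) {
        // Convert the ISO 8601 timestamp from the body to epoch milliseconds.
        String ts = (String) json.get(timestamp);
        DateTime dt = ISODateTimeFormat.dateTime().parseDateTime(ts);
        headers.put(timestamp, Long.toString(dt.getMillis()));
    } else {
        // No timestamp in the body: fall back to the current time.
        headers.put(timestamp, Long.toString(System.currentTimeMillis()));
    }
    return event;
}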
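private Map<String, Object> parseJson(String payload) {
    // A TypeToken (com.google.gson.reflect.TypeToken) keeps the generic type and avoids an unchecked conversion from a raw Map.
    return gson.fromJson(payload, new TypeToken<Map<String, Object>>() {}.getType());
}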
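The header logic in `intercept` can be tried out on its own. The sketch below swaps in `java.time` for Joda-Time and takes an already-parsed map instead of calling Gson, so it runs with the JDK alone. `TimestampHeaderSketch`, `resolveMillis`, and the `"timestamp"` key value are assumptions made for illustration. Unlike the snippet above, it falls back to the supplied default when the value is not a parsable string instead of throwing.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeParseException;
import java.util.HashMap;
import java.util.Map;

public class TimestampHeaderSketch {
    // Assumed name of both the JSON field and the event header.
    static final String TIMESTAMP = "timestamp";

    // Returns epoch milliseconds for the record's ISO 8601 timestamp,
    // or fallbackMillis when the field is missing or cannot be parsed.
    public static long resolveMillis(Map<String, Object> json, long fallbackMillis) {
        Object raw = (json == null) ? null : json.get(TIMESTAMP);
        if (raw instanceof String) {
            try {
                // Expects an offset or 'Z', e.g. 2016-07-20T01:53:00.000+09:00.
                return OffsetDateTime.parse((String) raw).toInstant().toEpochMilli();
            } catch (DateTimeParseException e) {
                // Unparsable value: use the fallback below.
            }
        }
        return fallbackMillis;
    }

    public static void main(String[] args) {
        Map<String, Object> json = new HashMap<>();
        json.put(TIMESTAMP, "1970-01-01T00:00:01.000Z");

        Map<String, String> headers = new HashMap<>();
        long millis = resolveMillis(json, System.currentTimeMillis());
        headers.put(TIMESTAMP, Long.toString(millis));
        System.out.println(headers); // prints "{timestamp=1000}"
    }
}
```

Whether to fall back silently or let the parse failure propagate depends on how malformed events should be handled downstream; the fallback here is only one option.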