- JsonInterceptor.java
- flume.conf
- JsonInterceptor$Builder.java
code snippet for: https://geunhokhim.wordpress.com/2016/03/28/apache-flume-implement-custom-interceptor/
code snippet for: https://geunhokhim.wordpress.com/2016/03/28/apache-flume-implement-custom-interceptor/
e.g. com.test.flume.CustomInterceptor$Builder
...
agent.sources.s1.interceptors = ci
agent.sources.s1.interceptors.ci.type = com.test.flume.CustomInterceptor$Builder
agent.sources.s1.interceptors.ci.${parameter_name} = ${value}
e.g. agent.sources.s1.interceptors.ci.isISO8601Format = true
...
public void configure(Context context) { | |
boolean isISO8601Format = context.getBoolean("isISO8601Format", true); | |
} |
… | |
@Override | |
public Event intercept(Event event) { | |
Map<String, String> headers = event.getHeaders(); | |
String payload = new String(event.getBody(), StandardCharsets.UTF_8); | |
Map<String, Object> json = parseJson(payload); | |
if(json.containsKey(timestamp)) { | |
String ts = (String)json.get(timestamp); | |
DateTime dt = ISODateTimeFormat.dateTime().parseDateTime(ts); | |
long tm = dt.getMillis(); | |
headers.put(timestamp, Long.toString(tm)); | |
} else { | |
long now = System.currentTimeMillis(); | |
headers.put(timestamp, Long.toString(now)); | |
} | |
return event; | |
} | |
private Map<String, Object> parseJson(String payload) { | |
return gson.fromJson(payload, Map.class); | |
} | |
… |