- JsonInterceptor.java
- flume.conf
- JsonInterceptor$Builder.java
code snippet for: https://geunhokhim.wordpress.com/2016/03/28/apache-flume-implement-custom-interceptor/
code snippet for: https://geunhokhim.wordpress.com/2016/03/28/apache-flume-implement-custom-interceptor/
| e.g. com.test.flume.CustomInterceptor$Builder | |
| ... | |
| agent.sources.s1.interceptors = ci | |
| agent.sources.s1.interceptors.ci.type = com.test.flume.CustomInterceptor$Builder | |
| agent.sources.s1.interceptors.ci.${parameter_name} = ${value} |
| e.g. agent.sources.s1.interceptors.ci.isISO8601Format = true | |
| ... | |
| public void configure(Context context) { | |
| boolean isISO8601Format = context.getBoolean("isISO8601Format", true); | |
| } |
| … | |
| @Override | |
| public Event intercept(Event event) { | |
| Map<String, String> headers = event.getHeaders(); | |
| String payload = new String(event.getBody(), StandardCharsets.UTF_8); | |
| Map<String, Object> json = parseJson(payload); | |
| if(json.containsKey(timestamp)) { | |
| String ts = (String)json.get(timestamp); | |
| DateTime dt = ISODateTimeFormat.dateTime().parseDateTime(ts); | |
| long tm = dt.getMillis(); | |
| headers.put(timestamp, Long.toString(tm)); | |
| } else { | |
| long now = System.currentTimeMillis(); | |
| headers.put(timestamp, Long.toString(now)); | |
| } | |
| return event; | |
| } | |
| private Map<String, Object> parseJson(String payload) { | |
| return gson.fromJson(payload, Map.class); | |
| } | |
| … |