Created
April 8, 2015 20:59
-
-
Save c-garcia/aa23855939e77e0b7212 to your computer and use it in GitHub Desktop.
Use gen-class to create a Flume Sink implementation. The process method is not fully finished, since the purpose of the gist is to show how gen-class enables us to do this.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns minisink.Sink
  (:import
    ;; The generated class itself, imported so the ^Sink type hints below
    ;; refer to minisink.Sink.
    minisink.Sink
    (org.apache.flume Context Sink$Status))
  (:gen-class
    :extends org.apache.flume.sink.AbstractSink
    :implements [org.apache.flume.conf.Configurable]
    ;; :state names the per-instance state slot. gen-class exposes it as a
    ;; public final field, so store an atom, ref or agent in it to get
    ;; mutability. It is read below with (.state this).
    :state state
    ;; :constructors maps each generated constructor signature to the
    ;; superclass constructor signature it calls.
    ;; Here a constructor taking a String calls super with no args, and a
    ;; constructor taking no args calls super with no args as well.
    :constructors {[String] []
                   []       []}
    ;; :init names the fn (prefixed with "-" in this namespace) that receives
    ;; the constructor arguments. For every signature in :constructors it must
    ;; return a vector of [args-passed-to-super initial-state].
    :init miniconstructor
    ;; No static main method is generated.
    :main false
    ;; :methods declares the *new* public methods the class exposes, each as
    ;; [name [param-types] return-type].
    :methods [[message [] String]]))

;; Report every interop call the compiler cannot resolve statically.
(set! *warn-on-reflection* true)
(defn -miniconstructor
  "Init fn backing the generated constructors.

  Returns a two-element vector: the arguments forwarded to the superclass
  constructor (always empty here) and the initial state, an atom wrapping
  {:msg <message>}. With no argument the message is \"default\"."
  ([]
   ;; The no-arg constructor is the one-arg constructor with the default text.
   (-miniconstructor "default"))
  ([msg]
   (let [super-args    []
         initial-state (atom {:msg msg})]
     [super-args initial-state])))
(defn -configure
  "Implementation of the Configurable interface's configure method.

  Looks up the \"message\" property in the supplied context. When it is
  present, the sink state is replaced with a map holding that message;
  when it is absent, the state is left untouched."
  [^Sink this ^Context ctx]
  (when-let [configured (.getString ctx "message")]
    (reset! (.state this) {:msg configured})))
(defn -message
  "Implementation of the `message` method declared in :methods.
  Returns the :msg entry of the sink's current state."
  [^Sink this]
  (:msg @(.state this)))
(defn -toString
  "Override of toString: returns the :msg entry of the sink's current state."
  [^Sink this]
  (let [{:keys [msg]} @(.state this)]
    msg))
(defn -start
  "Override of the sink's start method. Only prints a marker line to *out*;
  the sink instance itself is not used."
  [^Sink _this]
  (println "Starting"))
(defn -stop
  "Override of the sink's stop method. Only prints a marker line to *out*;
  the sink instance itself is not used."
  [^Sink _this]
  (println "Stopping"))
(defn ^Sink$Status -process
  "Takes at most one event from the sink's channel inside a transaction and
  prints it.

  Returns Sink$Status/READY when an event was taken, and Sink$Status/BACKOFF
  when the channel returned nil or when the take/commit failed.

  On failure the transaction is rolled back before it is closed: Flume
  requires every begun transaction to be either committed or rolled back
  before close(), otherwise close() itself throws and would mask the
  BACKOFF result. Errors (as opposed to Exceptions) are rolled back and then
  rethrown, following the pattern in the Flume developer guide."
  [^Sink this]
  (let [ch (.getChannel this)
        tx (.getTransaction ch)]
    ;; begin stays outside the try, as in the original: if it throws there
    ;; is no open transaction to roll back.
    (.begin tx)
    (try
      (let [ev (.take ch)]
        (println "EVENT:" ev)
        (.commit tx)
        (if (nil? ev)
          Sink$Status/BACKOFF
          Sink$Status/READY))
      (catch Throwable t
        (println "FUCKED EVENT:" t)
        ;; Leave the transaction in a completed state so close() succeeds.
        (.rollback tx)
        (when (instance? Error t)
          (throw t))
        Sink$Status/BACKOFF)
      (finally
        (.close tx)))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment