Skip to content

Instantly share code, notes, and snippets.

@c-garcia
Created April 8, 2015 20:59
Show Gist options
  • Save c-garcia/aa23855939e77e0b7212 to your computer and use it in GitHub Desktop.
Save c-garcia/aa23855939e77e0b7212 to your computer and use it in GitHub Desktop.
Use gen-class to create a flume Sink implementation. The process method is not fully finished since the purpose of the gist is to show how to gen-class enables us to do this.
(ns minisink.Sink
(:import
minisink.Sink
(org.apache.flume Context Sink$Status))
(:gen-class
:extends org.apache.flume.sink.AbstractSink
:implements [org.apache.flume.conf.Configurable]
;; state an object will have. Use atoms, refs or agents
;; to introduce mutability. It will be exposed as a method.
:state state
;; constructors define which constructors will be created and
;; how they will call the parent constructor based on the types of the args.
;; In this case, a constructor taking a String will call super with no args
;; A constructor with no args will call super with no args as well
:constructors {[String] []
[] []}
;; init points to a fn that maps the signatures of the constructors above defined to actual
;; calls with values. For every signature in constructors, it needs to return
;; a vector of the values that will be passed to super and the state.
:init miniconstructor
:main false
;; methods defines which *new* methos will be exposed with its
;; signature
:methods [[message [] String]]))
(set! *warn-on-reflection* true)
(defn -miniconstructor
([msg] [[] (atom {:msg msg})]) ;; The constructor receiving a string will call super with
;; no args. It will set an atom as the state.
([] [[] (atom {:msg "default"})]))
(defn -configure [^Sink this ^Context ctx]
(let [^String opt (.getString ctx "message")]
(when opt
(reset! (.state this) {:msg opt}))))
(defn -message [^Sink this]
(-> (.state this) deref :msg))
(defn -toString [^Sink this]
(-> (.state this) deref :msg))
(defn -start [^Sink this]
(println "Starting"))
(defn -stop [^Sink this]
(println "Stopping"))
(defn ^Sink$Status -process [^Sink this]
(let [ch (.getChannel this)
tx (.getTransaction ch)
_ (.begin tx)]
(try
(let [ev (.take ch)]
(println "EVENT:" ev)
(.commit tx)
(if-not (nil? ev)
Sink$Status/READY
Sink$Status/BACKOFF))
(catch Exception e
(println "FUCKED EVENT:" e)
Sink$Status/BACKOFF)
(finally
(.close tx)))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment