Created
March 4, 2011 20:34
-
-
Save eslick/855663 to your computer and use it in GitHub Desktop.
An example custom HBase Sink for Flume written in Clojure
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;;
;; Flume HBase User Sink
;;
;; Generates the Java class compass.flume.UserSink, a Flume EventSink
;; implemented by the `sink-`-prefixed functions in this file.
;;
;; - Two constructors are exposed: (table-name) and (table-name, threshold).
;;   Both pass no arguments to the superclass constructor.
;; - `sink-constructor` is the :init function; the per-instance data it
;;   returns is stored in the final `state` field.
;; - Two static methods are exposed: `getSinkBuilders` and `builder`.
;;   `builder` is declared to return SinkFactory$SinkBuilder because that is
;;   what `sink-builder` actually produces (a proxy of that class); declaring
;;   EventSink here would make the generated method fail its return cast.
(gen-class
 :name compass.flume.UserSink
 :extends "com.cloudera.flume.core.EventSink$Base"
 :prefix "sink-"
 :main false
 :constructors {[String] []
                [String String] []}
 :state state
 :init constructor
 :methods [#^{:static true} [getSinkBuilders [] java.util.ArrayList]
           #^{:static true} [builder [] com.cloudera.flume.conf.SinkFactory$SinkBuilder]])
(defn sink-constructor
  "gen-class :init function for UserSink.

  Returns a pair [super-constructor-args state]. The superclass receives no
  arguments; the state is a map with:
    :name      - the table name passed to the constructor
    :threshold - *flush-size* when only the table name is given, otherwise
                 the threshold string parsed as an integer
                 (Integer/parseInt throws NumberFormatException on a
                 non-numeric string)."
  ([#^String table-name]
     (let [state {:name table-name
                  :threshold *flush-size*}]
       [[] state]))
  ([#^String table-name #^String threshold]
     (let [limit (Integer/parseInt threshold)
           state {:name table-name
                  :threshold limit}]
       [[] state])))
(defn sink-table
  "Return the :name entry of the sink's state map (the table name stored by
  sink-constructor)."
  [this]
  (-> this .state :name))
(defn sink-threshold
  "Return the :threshold entry of the sink's state map (the flush threshold
  stored by sink-constructor)."
  [this]
  (-> this .state :threshold))
(defn sink-open
  "Implementation of the sink's open method.

  Issues a client/get for key 0 against the sink's table; the result is
  returned as-is. NOTE(review): the purpose is to force the client to
  connect when the sink is opened -- confirm that client/get connects
  eagerly."
  [#^UserSink sink]
  (let [table (sink-table sink)]
    (client/get table 0)))
(defn sink-append
  "Implementation of the sink's append method.

  Converts the event with event->message and, when that yields a record,
  pushes it onto the queue. Afterwards, if the queue has reached the sink's
  threshold, every queued record is converted with as-row+fmap, written to
  the sink's table with client/put-multi, and the queue is reset."
  [#^UserSink sink event]
  ;; Events for which event->message returns nil/false are not queued.
  (let [parsed (event->message event)]
    (when parsed
      (queue-push parsed)))
  ;; Flush the whole queue in one multi-put once the threshold is reached.
  (when (queue-full? (sink-threshold sink))
    (let [table (sink-table sink)
          rows (map as-row+fmap (queue-records))]
      (client/put-multi table rows))
    (queue-reset)))
(import '[compass.flume UserSink]) | |
(defn sink-close
  "Implementation of the sink's close method.

  If the queue still holds data, converts the queued records with
  as-row+fmap, writes them to the sink's table with client/put-multi, and
  resets the queue. Does nothing when the queue is empty."
  [#^UserSink sink]
  (when (queue-has-data?)
    (let [table (sink-table sink)
          rows (map as-row+fmap (queue-records))]
      (client/put-multi table rows))
    (queue-reset)))
(defn sink-builder
  "Return a SinkFactory$SinkBuilder proxy whose build method creates a
  UserSink from the sink's spec arguments.

  build arguments:
    ctx  - the Flume Context (unused)
    args - the sink arguments; the first is the table name and the
           optional second is the flush threshold as a string

  With one argument the sink uses the default threshold; with two or more
  the second argument is passed to the two-argument UserSink constructor
  (previously it was silently ignored). Any further arguments are ignored.

  Throws IllegalArgumentException when no table name is supplied, rather
  than constructing a sink with a nil table name."
  []
  (proxy [SinkFactory$SinkBuilder] []
    (build [#^Context ctx args]
      ;; Sequential destructuring yields nil for missing positions.
      (let [[table-name threshold] args]
        (when-not table-name
          (throw (IllegalArgumentException.
                  "usage: UserSink(\"table\"[, \"threshold\"])")))
        (if threshold
          (UserSink. table-name threshold)
          (UserSink. table-name))))))
(defn sink-getSinkBuilders
  "Implementation of the static getSinkBuilders method.

  Returns a new ArrayList containing a single Pair that maps the sink name
  \"UserSink\" to the builder produced by sink-builder."
  []
  (let [builders (ArrayList.)
        entry (Pair. "UserSink" (sink-builder))]
    (.add builders entry)
    builders))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment