-
-
Save satishv/2259772 to your computer and use it in GitHub Desktop.
Counters in Cascalog
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;;; The majority of this code is copied from the Cascalog source (1.7.0-SNAPSHOT as of 9/17/2011).
;;; It is fragile because it goes beyond the public API: it reaches Cascalog internals
;;; (hof-prepend and defop-helper) through ns-resolve, one of them a private function,
;;; so it can easily break with new versions of Cascalog.
;;;
;;; Provides a deffilterfpop macro. It defines a Cascalog operation that behaves like a
;;; filter but also receives the Cascading FlowProcess object as its first argument.
;;; FlowProcess can be used to set the Hadoop status and to increment Hadoop counters.
(ns weatherbill.hadoop.cascading | |
(:require | |
[cascalog | |
[workflow :as w] | |
[debug :as debug] | |
[predicate :as p]] | |
(:import | |
[cascading.pipe Each] | |
com.weatherbill.hadoop.ClojureFilterFP)) | |
;; A :filterfp op counts as higher-order exactly when its metadata carries a
;; truthy :hof? flag (presumably attached by defop-helper for parameterized ops).
(defmethod cascalog.predicate/hof-predicate? :filterfp
  [op & _]
  (-> op meta :hof?))
;; Un-annotated variables given to a :filterfp op default to :< (inputs),
;; mirroring how Cascalog treats ordinary filter ops.
(defmethod cascalog.predicate/predicate-default-var :filterfp
  [& _]
  :<)
;; Var for cascalog.predicate/hof-prepend, looked up reflectively because it is
;; not part of Cascalog's public API. Evaluates to nil if Cascalog no longer
;; defines it.
(def hof-prepend
  (ns-resolve (the-ns 'cascalog.predicate) 'hof-prepend))
;; Builds the Cascalog predicate for a :filterfp op.
;; The op is applied (after prepending any hof-args) to produce the pipe
;; assembly. Output fields, if any, are forwarded as :fn> with an ALL
;; selector, so that filterfp can reject them explicitly.
(defmethod cascalog.predicate/build-predicate-specific :filterfp
  [op _ hof-args infields outfields options]
  (let [[func-fields out-selector] (if (not-empty outfields)
                                     ;; Fully qualified: Fields is not imported by this ns.
                                     [outfields cascading.tuple.Fields/ALL]
                                     [nil nil])
        assembly (apply op (hof-prepend hof-args infields
                                        :fn> func-fields
                                        :> out-selector))]
    (p/predicate cascalog.predicate/operation assembly infields outfields false)))
(defn inc-counter
  "Increments the Hadoop counter `name` in counter group `group` by `amount`,
  using the Cascading FlowProcess `fp` (the first argument a deffilterfpop op
  receives)."
  ;; The type hint lets the compiler bind FlowProcess.increment(String, String, long)
  ;; directly instead of reflecting on every call; this runs once per tuple.
  [^cascading.flow.FlowProcess fp group name amount]
  (.increment fp group name amount))
;; Var for cascalog.workflow/defop-helper, looked up reflectively so that
;; deffilterfpop can reuse Cascalog's def-op machinery. Evaluates to nil if
;; Cascalog no longer defines it.
(def defop-helper
  (ns-resolve (the-ns 'cascalog.workflow) 'defop-helper))
;; TODO: only the plain form (deffilterfpop filter-ok ...) is supported, not the parameterized form (deffilterfpop [filter-ok [arg]] ...).
(defmacro deffilterfpop
  "Defines a Cascalog filter op whose function receives the Cascading
  FlowProcess as its first argument, followed by the tuple values. The op is
  built via Cascalog's defop-helper with filterfp as the op type."
  [& args]
  ;; Syntax-quote qualifies the symbol with this namespace:
  ;; weatherbill.hadoop.cascading/filterfp.
  (defop-helper `filterfp args))
(defn filterfp
  "Returns a function that takes the previous pipe and appends an Each that
  filters it with a ClojureFilterFP built from the op spec in `args`.
  Output fields are not supported: if the parsed args contain function
  output fields, the returned function throws UnsupportedOperationException."
  [& args]
  (fn [previous]
    (debug/debug-print "filter" args)
    (let [[in-fields func-fields spec _ stateful] (w/parse-args args)]
      (when func-fields
        (throw (UnsupportedOperationException. "filterfp does not support output args")))
      (Each. previous in-fields (ClojureFilterFP. spec stateful)))))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * The majority of this class is copied from the Cascalog source (1.7.0-SNAPSHOT as of 9/17/2011).
 * It is a filter operation that also exposes the Cascading FlowProcess to the Clojure function.
 */
package com.weatherbill.hadoop; | |
import cascading.flow.FlowProcess;
import cascading.operation.Filter;
import cascading.operation.FilterCall;
import cascalog.ClojureCascadingBase;
import cascalog.Util;
import clojure.lang.ISeq;
import clojure.lang.RT;
public class ClojureFilterFP extends ClojureCascadingBase implements Filter { | |
public ClojureFilterFP(Object[] fn_spec, boolean stateful) { | |
super(fn_spec, stateful); | |
} | |
public boolean isRemove(FlowProcess flow_process, FilterCall filter_call) { | |
ISeq fn_args_seq = Util.coerceFromTuple(filter_call.getArguments().getTuple()); | |
ISeq new_fn_args_seq = fn_args_seq.cons(flow_process); | |
return !Util.truthy(applyFunction(new_fn_args_seq)); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;;; Here is a sample query which uses deffilterfpop to update two counters.
;;; For each source row, it runs some QA function on it and increments either
;;; the "data/ok" counter or the "data/bad" counter.
;; Sample namespace: ?<- comes from cascalog.api, and the counter helpers are
;; referenced below through the cs/ alias.
(ns sample.query
  (:use cascalog.api)
  (:require [weatherbill.hadoop.cascading :as cs]))
... | |
;; Keeps rows whose status is "OK". Every row is tallied under either the
;; "data"/"ok" or the "data"/"bad" Hadoop counter.
(cs/deffilterfpop filter-ok
  [fp status]
  (let [ok? (= "OK" status)]
    (cs/inc-counter fp "data" (if ok? "ok" "bad") 1)
    ok?))
(defn query
  "Sample query: writes [?source ?val] to `histories` for every row of `input`
  whose QA status is \"OK\". filter-ok updates the data/ok and data/bad
  counters along the way."
  [input]
  ;; NOTE(review): `histories` (the output tap) and `some-op` are assumed to be
  ;; defined in the elided part of this namespace. Only variables bound by the
  ;; predicates below may appear in the output vector.
  (?<- histories [?source ?val]
       (input ?source ?val)
       (some-op ?val :> ?status)
       (filter-ok ?status)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment