Skip to content

Instantly share code, notes, and snippets.

@sritchie
Last active December 17, 2015 07:09
Show Gist options
  • Save sritchie/5571012 to your computer and use it in GitHub Desktop.
Save sritchie/5571012 to your computer and use it in GitHub Desktop.
(defn cascalog-map
[op-var output-fields & {:keys [stateful?]}]
(let [ser (KryoService/serialize (ops/fn-spec op-var))]
(proxy [BaseOperation Function] [^Fields output-fields]
(prepare [^FlowProcess flow-process ^OperationCall op-call]
(let [op (Util/bootFn (KryoService/deserialize ser))]
(-> op-call
(.setContext [op (if stateful? (op))]))))
(operate [^FlowProcess flow-process ^FunctionCall fn-call]
(let [[op] (.getContext fn-call)
collector (-> fn-call .getOutputCollector)
^Tuple tuple (-> fn-call .getArguments .getTuple)]
(->> (Util/coerceFromTuple tuple)
(apply op)
(Util/coerceToTuple)
(.add collector))))
(cleanup [flow-process ^OperationCall op-call]
(if stateful?
(let [[op state] (.getContext op-call)]
(op state)))))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment