Skip to content

Instantly share code, notes, and snippets.

;; Buffer operation that folds all of its input tuples into one output tuple.
;;
;; Each tuple is destructured as [id & vals]. The first tuple seeds the fold
;; (reduce is called without an initial value). For every following tuple:
;;   * if its id equals the id currently carried by the accumulator, the tuple
;;     is ignored and the accumulator is kept as is;
;;   * otherwise its vals are added element-wise to the accumulated vals and
;;     its id becomes the accumulator's id.
;; The id is dropped from the final accumulator (rest) and the remaining sums
;; are emitted as a single tuple.
;;
;; NOTE(review): ignoring only *adjacent* repeats of an id presumably relies on
;; the tuples arriving sorted by id -- confirm against the calling query.
(w/defbufferop sum-stats [tuples]
  [(rest (reduce (fn [[id-old & vals-old :as old] [id & vals]]
                   (if (= id-old id)
                     old
                     (cons id (map + vals-old vals))))
                 tuples))])
@nathanmarz
nathanmarz / gist:511938
Created August 6, 2010 20:20
Random number in Cascalog
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.operation.FunctionCall;
import cascading.operation.OperationCall;
import cascading.tuple.Tuple;
import java.util.Random;
import cascalog.CascalogFunction;
public class RandInt extends CascalogFunction {
long _seed;
;; Source:
(defmacro binder [& body]
(let [[tobind lexpr] (split-at (dec (count body)) body)
binded (vec (mapcat (fn [e]
(if (and (list? e) (= 'bind (first e)))
[(second e) (last e)]
['_ e]
))
tobind ))]
`(let ~binded
;; Binds `average` to a `<-` form that takes !val as input and yields !avg.
;; The three predicates must live INSIDE the `<-` form: `def` accepts only a
;; symbol, an optional docstring and one init expression, so closing `<-`
;; right after the var vector left the predicates as stray arguments to `def`.
;;
;; Predicates, as written:
;;   (c/count !count)           -- binds !count
;;   (c/sum !val :> !sum)       -- binds !sum from !val
;;   (div !sum !count :> !avg)  -- binds !avg from !sum and !count
(def average
  (<- [!val :> !avg]
      (c/count !count)
      (c/sum !val :> !sum)
      (div !sum !count :> !avg)))
;; Returns the set of distinct parameter counts found among the constructors
;; that `(.getConstructors klass)` reports for the given class.
(defn constructor-num-args [klass]
  (->> (.getConstructors klass)
       (map (fn [ctor] (count (.getParameterTypes ctor))))
       set))
(defn- mk-fn-inst [klass numargs]
(let [args (vec (for [i (range numargs)] (symbol (str "arg" i))))]
;; Mapcat operation `intensive-op`, taking a single input argument.
;; The body calls `extract-tuple-using-crazy-regex` on that argument and wraps
;; the result in a one-element vector, all inside `c/with-timeout` with the
;; argument vector [10000].
;; The parameter is named `str`, which shadows `clojure.core/str` inside the body.
;; NOTE(review): what happens when the timeout elapses is decided by
;; `c/with-timeout`, which is not visible here -- confirm before relying on it.
(defmapcatop intensive-op [str]
(c/with-timeout [10000] ; 10 second timeout
[(extract-tuple-using-crazy-regex str)]
))
---
local_persistence: elephantdb.persistence.JavaBerkDB
num_shards: 32
// Source tap: sequence file at /tmp/key-value-pairs with fields "key" and "value".
Fields pairFields = new Fields("key", "value");
SequenceFile pairScheme = new SequenceFile(pairFields);
Tap source = new Hfs(pairScheme, "/tmp/key-value-pairs");

// Sink tap: ElephantDB domain at /data/output/my-edb-domain, described by a
// DomainSpec built from a JavaBerkDB instance and the value 32.
// NOTE(review): 32 is presumably the shard count -- confirm against DomainSpec.
DomainSpec spec = new DomainSpec(new JavaBerkDB(), 32);
ElephantDBTap sink = new ElephantDBTap("/data/output/my-edb-domain", spec);

// The pipe named "pipe" is wrapped in an ElephantTailAssembly bound to the sink.
Pipe p = new ElephantTailAssembly(new Pipe("pipe"), sink);

// Connect source -> assembly -> sink and run the flow through complete().
new FlowConnector().connect(source, sink, p).complete();
;; Invokes `?-` with two arguments:
;;   1. an `elephant-tap` on "/data/output/my-edb-domain", configured with
;;      :num-shards 32 and a new JavaBerkDB instance as :persistence-factory,
;;      plus an empty trailing options map;
;;   2. the sequence file at "/tmp/key-value-pairs" with its fields named
;;      "?key" and "?value" via `name-vars`.
;; NOTE(review): presumably the first argument is the sink and the second the
;; generator feeding it -- confirm against the Cascalog `?-` documentation.
(?-
(elephant-tap "/data/output/my-edb-domain" {:num-shards 32 :persistence-factory (JavaBerkDB.)} {})
(name-vars (hfs-seqfile "/tmp/key-value-pairs") ["?key" "?value"]))
// Tap over the ElephantDB domain at /data/output/my-edb-domain, held in a
// variable named `source`; the code that consumes it is elided below.
ElephantDBTap source = new ElephantDBTap("/data/output/my-edb-domain");
Pipe p = new Pipe("pipe");
// Applies ProcessKeyValuePairs to the "key" and "value" fields via an Each pipe.
// NOTE(review): Fields.RESULTS presumably keeps only the operation's output
// fields -- confirm against the Cascading Fields documentation.
p = new Each(p, new Fields("key", "value"), new ProcessKeyValuePairs(), Fields.RESULTS);
...