Skip to content

Instantly share code, notes, and snippets.

@sritchie
Created December 30, 2011 17:46
Show Gist options
  • Select an option

  • Save sritchie/1540755 to your computer and use it in GitHub Desktop.

Select an option

Save sritchie/1540755 to your computer and use it in GitHub Desktop.
package forma.tap;
import backtype.hadoop.pail.PailStructure;
import cascading.kryo.KryoFactory;
import cascalog.hadoop.ClojureKryoSerialization;
import com.esotericsoftware.kryo.ObjectBuffer;
import java.util.Collections;
import java.util.List;
/**
 * Base {@link PailStructure} whose records are arbitrary objects encoded with Kryo.
 *
 * <p>Serialization goes through an {@link ObjectBuffer} built from the Kryo instance
 * that {@link ClojureKryoSerialization#makeKryo()} produces. Subclasses must still
 * supply the remaining {@code PailStructure} methods (this class is abstract) and may
 * override {@link #isValidTarget} and {@link #getTarget} to partition records into
 * subdirectories; the defaults here accept every directory and place every record at
 * the root.
 */
public abstract class KryoPailStructure implements PailStructure<Object> {
    /**
     * Lazily created Kryo buffer. Marked transient so it is never written out with
     * the structure itself; it is rebuilt on first use instead.
     */
    private transient ObjectBuffer kryoBuf;

    /**
     * Returns the shared Kryo buffer, creating it on first access.
     *
     * <p>No synchronization is performed here.
     */
    private ObjectBuffer getKryoBuffer() {
        if (kryoBuf == null) {
            ClojureKryoSerialization serialization = new ClojureKryoSerialization();
            kryoBuf = KryoFactory.newBuffer(serialization.makeKryo());
        }
        return kryoBuf;
    }

    /**
     * Encodes {@code obj} together with its class information.
     *
     * @param obj the record to encode
     * @return the Kryo-encoded bytes
     */
    public byte[] serialize(Object obj) {
        return getKryoBuffer().writeClassAndObject(obj);
    }

    /**
     * Decodes a record previously produced by {@link #serialize(Object)}.
     *
     * @param record the Kryo-encoded bytes
     * @return the decoded object
     */
    public Object deserialize(byte[] record) {
        return getKryoBuffer().readClassAndObject(record);
    }

    /**
     * Accepts every directory path.
     *
     * @param dirs the path components being checked
     * @return always {@code true}
     */
    public boolean isValidTarget(String... dirs) {
        return true;
    }

    /**
     * Places every record at the root (no subdirectory components).
     *
     * @param o the record being routed
     * @return an immutable empty list
     */
    public List<String> getTarget(Object o) {
        // Type-safe replacement for the raw Collections.EMPTY_LIST constant;
        // avoids the unchecked conversion to List<String>.
        return Collections.emptyList();
    }
}
(ns forma.hadoop.pail
(:use cascalog.api
[cascalog.io :only (with-fs-tmp)]
[forma.reproject :only (hv->tilestring)])
(:import [backtype.cascading.tap PailTap PailTap$PailTapOptions]
[backtype.hadoop.pail PailStructure Pail]
[forma.tap KryoPailStructure]))
;; ## Pail Data Structures
;; Generates the Java class forma.hadoop.pail.DataChunkPailStructure as a
;; subclass of forma.tap.KryoPailStructure. Its methods are implemented by the
;; functions in this namespace whose names start with the "pail-" prefix
;; (e.g. pail-getType backs getType).
(gen-class :name forma.hadoop.pail.DataChunkPailStructure
:extends forma.tap.KryoPailStructure
:prefix "pail-")
(defn pail-getType
  "Implements getType for the generated pail structure: reports
  clojure.lang.PersistentHashMap as the record class.

  NOTE(review): small map literals are PersistentArrayMap instances rather
  than PersistentHashMap; confirm how the pail library uses this class."
  [_]
  clojure.lang.PersistentHashMap)
(defn pail-getTarget
  "Implements getTarget for the generated pail structure. Given a chunk map
  with :location, :temporal-res and :dataset keys (where :location holds
  :spatial-res, :mod-h and :mod-v), returns the three path components

      [dataset \"<spatial-res>-<temporal-res>\" tilestring]

  with the tilestring computed by hv->tilestring from mod-h and mod-v."
  [this chunk]
  (let [{:keys [location temporal-res dataset]} chunk
        {:keys [spatial-res mod-h mod-v]} location]
    [dataset
     (format "%s-%s" spatial-res temporal-res)
     (hv->tilestring mod-h mod-v)]))
(defn pail-isValidTarget
  "Implements isValidTarget for the generated pail structure: a directory
  path is accepted only when it has exactly 3 or 4 components."
  [this dir-seq]
  (let [depth (count dir-seq)]
    (contains? #{3 4} depth)))
(defn pail-structure
  "Returns a fresh instance of the generated DataChunkPailStructure class."
  []
  (new forma.hadoop.pail.DataChunkPailStructure))
;; ## Pail Taps
(defn- pail-tap
  "Builds a PailTap rooted at `path`. `colls` is a sequence of lists that is
  converted to a java.util.List array and handed to the tap options, and
  `structure` is the PailStructure used to make the pail spec. The tuple
  field name is fixed to \"datachunk\"."
  [path colls structure]
  (let [list-array (into-array java.util.List colls)
        options    (PailTap$PailTapOptions. (PailTap/makeSpec nil structure)
                                            "datachunk"
                                            list-array
                                            nil)]
    (PailTap. path options)))
(defn split-chunk-tap
  "Returns a pail tap at `path` backed by a new DataChunkPailStructure. Any
  additional arguments are passed along as the tap's list collections."
  [path & colls]
  (let [structure (pail-structure)]
    (pail-tap path colls structure)))
(defn ?pail-*
  "Runs `query` into a temporary location, then merges the result into the
  pail at `pail-path` and consolidates that pail.

  `tap` is a function of one argument: it is called with the temporary path
  and must return the sink tap the query is executed into."
  [tap pail-path query]
  (let [target (Pail. pail-path)]
    (with-fs-tmp [_ staging-path]
      ;; Write the query output to the staging location first...
      (?- (tap staging-path) query)
      ;; ...then open it as a pail and fold it into the target.
      (let [staged (Pail. staging-path)]
        (.absorb target staged))
      (.consolidate target))))
(defmacro ?pail-
  "Macro front-end for `?pail-*`. The first argument is a two-element form
  that is destructured into a tap function and a pail path, so

      (?pail- (tap-fn path) query)

  expands to

      (?pail-* tap-fn path query)

  which executes the query into the pail at `path`, consolidating when
  finished."
  [[tap path] query]
  `(?pail-* ~tap ~path ~query))
;; ## Query Examples
forma.hadoop.pail> (??- [[{:key "val"}]])
(([{:key "val"}]))
forma.hadoop.pail> (?- (stdout) [[{:key "val"}]])
RESULTS
-----------------------
{:key "val"}
-----------------------
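;; Sinking the same map through the pail tap fails: per the trace below,
;; PailTap$PailScheme.sink reads the value via Tuple.get, which casts it to
;; Comparable, and a Clojure PersistentArrayMap is not Comparable.
(?- (split-chunk-tap "/tmp/pailpath") [[{:key "val"}]])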
;; cascading.flow.FlowException: internal error: ['{:key "val"}']
;; at cascading.flow.stack.SinkMapperStackElement.operateSink(SinkMapperStackElement.java:117)
;; at cascading.flow.stack.SinkMapperStackElement.collect(SinkMapperStackElement.java:72)
;; at cascading.pipe.Each$EachFunctionHandler$1.collect(Each.java:532)
;; at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
;; at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:55)
;; at cascading.operation.Identity.operate(Identity.java:99)
;; at cascading.pipe.Each.applyFunction(Each.java:380)
;; at cascading.pipe.Each.access$200(Each.java:53)
;; at cascading.pipe.Each$EachFunctionHandler.handle(Each.java:543)
;; at cascading.pipe.Each$EachHandler.operate(Each.java:478)
;; at cascading.flow.stack.EachMapperStackElement.operateEach(EachMapperStackElement.java:94)
;; at cascading.flow.stack.EachMapperStackElement.collect(EachMapperStackElement.java:82)
;; at cascading.pipe.Each$EachFunctionHandler$1.collect(Each.java:532)
;; at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
;; at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:55)
;; at cascading.operation.Identity.operate(Identity.java:99)
;; at cascading.pipe.Each.applyFunction(Each.java:380)
;; at cascading.pipe.Each.access$200(Each.java:53)
;; at cascading.pipe.Each$EachFunctionHandler.handle(Each.java:543)
;; at cascading.pipe.Each$EachHandler.operate(Each.java:478)
;; at cascading.flow.stack.EachMapperStackElement.operateEach(EachMapperStackElement.java:94)
;; at cascading.flow.stack.EachMapperStackElement.collect(EachMapperStackElement.java:82)
;; at cascading.flow.stack.FlowMapperStack.map(FlowMapperStack.java:220)
;; at cascading.flow.FlowMapper.map(FlowMapper.java:75)
;; at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
;; at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358)
;; at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
;; at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:176)
;; Caused by: java.lang.ClassCastException: clojure.lang.PersistentArrayMap cannot be cast to java.lang.Comparable
;; at cascading.tuple.Tuple.get(Tuple.java:221)
;; at cascading.tuple.TupleEntry.get(TupleEntry.java:274)
;; at backtype.cascading.tap.PailTap$PailScheme.sink(PailTap.java:106)
;; at cascading.tap.Tap.sink(Tap.java:280)
;; at cascading.flow.stack.SinkMapperStackElement.operateSink(SinkMapperStackElement.java:95)
;; ... 27 more
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment