Created
December 30, 2011 17:46
-
-
Save sritchie/1540755 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package forma.tap; | |
| import backtype.hadoop.pail.PailStructure; | |
| import cascading.kryo.KryoFactory; | |
| import cascalog.hadoop.ClojureKryoSerialization; | |
| import com.esotericsoftware.kryo.ObjectBuffer; | |
| import java.util.Collections; | |
| import java.util.List; | |
| public abstract class KryoPailStructure implements PailStructure<Object> { | |
| private transient ObjectBuffer kryoBuf; | |
| private ObjectBuffer getKryoBuffer() { | |
| if(kryoBuf == null) { | |
| ClojureKryoSerialization serialization = new ClojureKryoSerialization(); | |
| kryoBuf = KryoFactory.newBuffer(serialization.makeKryo()); | |
| } | |
| return kryoBuf; | |
| } | |
| public byte[] serialize(Object obj) { | |
| return getKryoBuffer().writeClassAndObject(obj); | |
| } | |
| public Object deserialize(byte[] record) { | |
| return getKryoBuffer().readClassAndObject(record); | |
| } | |
| public boolean isValidTarget(String... dirs) { | |
| return true; | |
| } | |
| public List<String> getTarget(Object o) { | |
| return Collections.EMPTY_LIST; | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; Pail structure and Cascading tap helpers for forma data chunks.
(ns forma.hadoop.pail
  (:use cascalog.api
        [cascalog.io :only [with-fs-tmp]]
        [forma.reproject :only [hv->tilestring]])
  (:import [backtype.cascading.tap PailTap PailTap$PailTapOptions]
           [backtype.hadoop.pail Pail PailStructure]
           [forma.tap KryoPailStructure]))
| ;; ## Pail Data Structures | |
;; Generates (under AOT compilation) a subclass of forma.tap.KryoPailStructure.
;; Each method is backed by the var in this namespace named with the "pail-"
;; prefix, e.g. pail-getTarget implements getTarget. Methods without such a
;; var fall through to the superclass.
(gen-class
 :name    forma.hadoop.pail.DataChunkPailStructure
 :prefix  "pail-"
 :extends forma.tap.KryoPailStructure)
(defn pail-getType
  "Implements PailStructure.getType() for DataChunkPailStructure.

  Returns the IPersistentMap interface rather than one concrete map
  class. Clojure stores small maps as PersistentArrayMap and larger ones
  as PersistentHashMap, so declaring only PersistentHashMap did not
  describe the small-map records this structure actually receives."
  [this]
  clojure.lang.IPersistentMap)
(defn pail-getTarget
  "Implements PailStructure.getTarget(): returns the sub-directory path
  for a data chunk as [dataset resolution tilestring].

  resolution is \"<spatial-res>-<temporal-res>\", with spatial-res taken
  from the chunk's :location. tilestring is hv->tilestring applied to the
  location's :mod-h and :mod-v.
  NOTE(review): Pail joins these into a path, so each element presumably
  needs to be a string; confirm the type of :dataset."
  [this {:keys [location temporal-res dataset]}]
  (let [{h :mod-h, v :mod-v, spatial-res :spatial-res} location]
    [dataset
     (format "%s-%s" spatial-res temporal-res)
     (hv->tilestring h v)]))
(defn pail-isValidTarget
  "Implements PailStructure.isValidTarget(): true exactly when the
  directory path has 3 or 4 components. pail-getTarget produces 3.
  NOTE(review): why 4 is also accepted is not visible here."
  [this dir-seq]
  (contains? #{3 4} (count dir-seq)))
(defn pail-structure
  "Returns a fresh DataChunkPailStructure instance."
  []
  (new forma.hadoop.pail.DataChunkPailStructure))
| ;; ## Pail Taps | |
(defn- pail-tap
  "Builds a PailTap at `path` for the given PailStructure.

  `colls` is converted to a java.util.List[] and passed to PailTapOptions
  along with \"datachunk\" (presumably the tuple field name; confirm).
  NOTE(review): each element of colls is presumably an attribute path
  (a list of directory names) selecting sub-pails; verify against callers."
  [path colls structure]
  (let [tap-spec (PailTap/makeSpec nil structure)
        attrs    (into-array java.util.List colls)
        options  (PailTap$PailTapOptions. tap-spec "datachunk" attrs nil)]
    (PailTap. path options)))
(defn split-chunk-tap
  "Returns a PailTap at `path` backed by a DataChunkPailStructure. Any
  extra arguments are forwarded to pail-tap as its `colls`."
  [path & colls]
  (let [structure (pail-structure)]
    (pail-tap path colls structure)))
(defn ?pail-*
  "Runs `query` into a temporary path, absorbs the result into the
  existing pail at `pail-path`, then consolidates that pail.

  `tap` is a function from a path to a sink tap (e.g. split-chunk-tap).
  The pail at `pail-path` is opened before the query runs."
  [tap pail-path query]
  (let [target (Pail. pail-path)]
    (with-fs-tmp [_ tmp-path]
      (?- (tap tmp-path) query)
      (let [staged (Pail. tmp-path)]
        (.absorb target staged))
      (.consolidate target))))
(defmacro ?pail-
  "Macro form of ?pail-*: (?pail- [tap path] query) expands to
  (?pail-* tap path query). It absorbs the query output into the pail
  at path and then consolidates that pail."
  [[tap path] query]
  `(?pail-* ~tap ~path ~query))
| ;; ## Query Examples | |
| forma.hadoop.pail> (??- [[{:key "val"}]]) | |
| (([{:key "val"}])) | |
| forma.hadoop.pail> (?- (stdout) [[{:key "val"}]]) | |
| RESULTS | |
| ----------------------- | |
| {:key "val"} | |
| ----------------------- | |
;; Sinking a bare map tuple into a split-chunk pail tap. This fails with
;; the FlowException trace that follows: its root cause is a
;; ClassCastException in cascading's Tuple.get (called from PailTap's sink),
;; which casts the map value to java.lang.Comparable.
(let [sink (split-chunk-tap "/tmp/pailpath")]
  (?- sink [[{:key "val"}]]))
| ;; cascading.flow.FlowException: internal error: ['{:key "val"}'] | |
| ;; at cascading.flow.stack.SinkMapperStackElement.operateSink(SinkMapperStackElement.java:117) | |
| ;; at cascading.flow.stack.SinkMapperStackElement.collect(SinkMapperStackElement.java:72) | |
| ;; at cascading.pipe.Each$EachFunctionHandler$1.collect(Each.java:532) | |
| ;; at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71) | |
| ;; at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:55) | |
| ;; at cascading.operation.Identity.operate(Identity.java:99) | |
| ;; at cascading.pipe.Each.applyFunction(Each.java:380) | |
| ;; at cascading.pipe.Each.access$200(Each.java:53) | |
| ;; at cascading.pipe.Each$EachFunctionHandler.handle(Each.java:543) | |
| ;; at cascading.pipe.Each$EachHandler.operate(Each.java:478) | |
| ;; at cascading.flow.stack.EachMapperStackElement.operateEach(EachMapperStackElement.java:94) | |
| ;; at cascading.flow.stack.EachMapperStackElement.collect(EachMapperStackElement.java:82) | |
| ;; at cascading.pipe.Each$EachFunctionHandler$1.collect(Each.java:532) | |
| ;; at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71) | |
| ;; at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:55) | |
| ;; at cascading.operation.Identity.operate(Identity.java:99) | |
| ;; at cascading.pipe.Each.applyFunction(Each.java:380) | |
| ;; at cascading.pipe.Each.access$200(Each.java:53) | |
| ;; at cascading.pipe.Each$EachFunctionHandler.handle(Each.java:543) | |
| ;; at cascading.pipe.Each$EachHandler.operate(Each.java:478) | |
| ;; at cascading.flow.stack.EachMapperStackElement.operateEach(EachMapperStackElement.java:94) | |
| ;; at cascading.flow.stack.EachMapperStackElement.collect(EachMapperStackElement.java:82) | |
| ;; at cascading.flow.stack.FlowMapperStack.map(FlowMapperStack.java:220) | |
| ;; at cascading.flow.FlowMapper.map(FlowMapper.java:75) | |
| ;; at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) | |
| ;; at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358) | |
| ;; at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307) | |
| ;; at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:176) | |
| ;; Caused by: java.lang.ClassCastException: clojure.lang.PersistentArrayMap cannot be cast to java.lang.Comparable | |
| ;; at cascading.tuple.Tuple.get(Tuple.java:221) | |
| ;; at cascading.tuple.TupleEntry.get(TupleEntry.java:274) | |
| ;; at backtype.cascading.tap.PailTap$PailScheme.sink(PailTap.java:106) | |
| ;; at cascading.tap.Tap.sink(Tap.java:280) | |
| ;; at cascading.flow.stack.SinkMapperStackElement.operateSink(SinkMapperStackElement.java:95) | |
| ;; ... 27 more |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment