@robinkraft
Created July 8, 2011 18:44
BytesWritable error storing data using elephant-tap
(ns eleph.start
  (:use [cascalog api])
  (:require [elephantdb.cascalog.core :as e])
  (:import [elephantdb.persistence JavaBerkDB]))

(def data
  [[1 "dan"]
   [2 "sam"]
   [3 "robin"]
   [4 "david"]
   [5 "darius"]])

(defn test-eleph
  []
  (?- (e/elephant-tap "/Users/robin/data/elephantdb/out/domain"
                      {:num-shards 32 :persistence-factory (JavaBerkDB.)}
                      {})
      (name-vars data ["?id" "?name"])))
11/07/08 14:33:50 INFO flow.Flow: [] starting
11/07/08 14:33:50 INFO flow.Flow: [] source: cascalog.MemorySourceTap@3e0
11/07/08 14:33:50 INFO flow.Flow: [] sink: elephantdb.cascading.ElephantDBTap@d
11/07/08 14:33:50 INFO flow.Flow: [] parallel execution is enabled: false
11/07/08 14:33:50 INFO flow.Flow: [] starting jobs: 1
11/07/08 14:33:50 INFO flow.Flow: [] allocating threads: 1
11/07/08 14:33:50 INFO flow.FlowStep: [] starting step: (1/1) elephantdb.cascading.ElephantDBTap@d
11/07/08 14:33:50 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
11/07/08 14:33:50 INFO mapred.MapTask: numReduceTasks: 1
11/07/08 14:33:50 INFO mapred.MapTask: io.sort.mb = 100
11/07/08 14:33:51 INFO mapred.MapTask: data buffer = 79691776/99614720
11/07/08 14:33:51 INFO mapred.MapTask: record buffer = 262144/327680
11/07/08 14:33:51 INFO mapred.MapTask: Starting flush of map output
11/07/08 14:33:51 INFO mapred.MapTask: Finished spill 0
11/07/08 14:33:51 INFO mapred.TaskRunner: Task:attempt_local_0014_m_000000_0 is done. And is in the process of commiting
11/07/08 14:33:51 INFO mapred.LocalJobRunner:
11/07/08 14:33:51 INFO mapred.TaskRunner: Task 'attempt_local_0014_m_000000_0' done.
11/07/08 14:33:51 INFO mapred.LocalJobRunner:
11/07/08 14:33:51 INFO mapred.Merger: Merging 1 sorted segments
11/07/08 14:33:51 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 189 bytes
11/07/08 14:33:51 INFO mapred.LocalJobRunner:
11/07/08 14:33:51 WARN mapred.LocalJobRunner: job_local_0014
cascading.flow.FlowException: internal error: ['3', '4', 'david']
at cascading.flow.stack.SinkReducerStackElement.operateSink(SinkReducerStackElement.java:146)
at cascading.flow.stack.SinkReducerStackElement.collect(SinkReducerStackElement.java:79)
at cascading.pipe.Each$EachFunctionHandler$1.collect(Each.java:532)
at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:55)
at cascading.operation.Identity.operate(Identity.java:99)
at cascading.pipe.Each.applyFunction(Each.java:380)
at cascading.pipe.Each.access$200(Each.java:53)
at cascading.pipe.Each$EachFunctionHandler.handle(Each.java:543)
at cascading.pipe.Each$EachHandler.operate(Each.java:478)
at cascading.flow.stack.EachReducerStackElement.operateEach(EachReducerStackElement.java:97)
at cascading.flow.stack.EachReducerStackElement.collect(EachReducerStackElement.java:84)
at cascading.pipe.Each$EachFunctionHandler$1.collect(Each.java:532)
at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:55)
at cascading.operation.Identity.operate(Identity.java:99)
at cascading.pipe.Each.applyFunction(Each.java:380)
at cascading.pipe.Each.access$200(Each.java:53)
at cascading.pipe.Each$EachFunctionHandler.handle(Each.java:543)
at cascading.pipe.Each$EachHandler.operate(Each.java:478)
at cascading.flow.stack.EachReducerStackElement.operateEach(EachReducerStackElement.java:97)
at cascading.flow.stack.EachReducerStackElement.collect(EachReducerStackElement.java:84)
at cascading.pipe.Every$EveryBufferHandler$1.collect(Every.java:482)
at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:55)
at cascalog.ops.IdentityBuffer.operate(IdentityBuffer.java:31)
at cascalog.CascalogBufferExecutor.operate(CascalogBufferExecutor.java:41)
at cascading.pipe.Every$EveryBufferHandler.operate(Every.java:534)
at cascading.flow.stack.EveryBufferReducerStackElement.collect(EveryBufferReducerStackElement.java:89)
at cascading.flow.stack.GroupReducerStackElement.operateGroup(GroupReducerStackElement.java:74)
at cascading.flow.stack.GroupReducerStackElement.collect(GroupReducerStackElement.java:58)
at cascading.flow.stack.FlowReducerStack.reduce(FlowReducerStack.java:169)
at cascading.flow.FlowReducer.reduce(FlowReducer.java:75)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:463)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:411)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:215)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.hadoop.io.BytesWritable
at elephantdb.cascading.ElephantDBTap.sink(Unknown Source)
at cascading.flow.stack.SinkReducerStackElement.operateSink(SinkReducerStackElement.java:124)
... 35 more
11/07/08 14:33:51 WARN flow.FlowStep: [] task completion events identify failed tasks
11/07/08 14:33:51 WARN flow.FlowStep: [] task completion events count: 0
11/07/08 14:33:51 WARN flow.Flow: stopping jobs
11/07/08 14:33:51 INFO flow.FlowStep: [] stopping: (1/1) elephantdb.cascading.ElephantDBTap@d
11/07/08 14:33:51 WARN flow.Flow: stopped jobs
11/07/08 14:33:51 WARN flow.Flow: shutting down job executor
11/07/08 14:33:51 WARN flow.Flow: shutdown complete
11/07/08 14:33:51 INFO hadoop.Hadoop18TapUtil: deleting temp path /Users/robin/data/elephantdb/out/domain/_temporary
11/07/08 14:33:57 INFO mapred.LocalJobRunner: reduce > reduce