@yashk
Last active January 4, 2016 16:29
cascading.avro - supply types via Fields class
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/yash/.m2/repository/org/slf4j/slf4j-simple/1.6.1/slf4j-simple-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/yash/.m2/repository/org/slf4j/slf4j-log4j12/1.6.1/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
174 [main] INFO cascading.flow.hadoop.util.HadoopUtil - resolving application jar from found main method on: com.aegis.adl.etl.tmp.WordCountAvroWrite
175 [main] INFO cascading.flow.hadoop.planner.HadoopPlanner - using application jar: null
192 [main] INFO cascading.property.AppProps - using app.id: 762BA42C90064C4984800D8960EFEA0E
330 [main] WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
575 [flow wc] INFO cascading.util.Version - Concurrent, Inc - Cascading 2.2.0
577 [flow wc] INFO cascading.flow.Flow - [wc] starting
577 [flow wc] INFO cascading.flow.Flow - [wc] source: Hfs["TextDelimited[['Time', 'User_ID']]"]["sample/data/WC/input.txt"]
578 [flow wc] INFO cascading.flow.Flow - [wc] sink: Hfs["AvroScheme{schema={"type":"record","name":"wordcount","namespace":"cascading.avro.examples","fields":[{"name":"Time","type":["string","null"]},{"name":"User_ID","type":"long"}]}}"]["sample/data/WC/out"]
578 [flow wc] INFO cascading.flow.Flow - [wc] parallel execution is enabled: false
578 [flow wc] INFO cascading.flow.Flow - [wc] starting jobs: 1
578 [flow wc] INFO cascading.flow.Flow - [wc] allocating threads: 1
581 [pool-1-thread-1] INFO cascading.flow.FlowStep - [wc] starting step: (1/1) sample/data/WC/out
628 [pool-1-thread-1] WARN org.apache.hadoop.conf.Configuration - session.id is deprecated. Instead, use dfs.metrics.session-id
629 [pool-1-thread-1] INFO org.apache.hadoop.metrics.jvm.JvmMetrics - Initializing JVM Metrics with processName=JobTracker, sessionId=
653 [pool-1-thread-1] WARN org.apache.hadoop.mapred.JobClient - No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
671 [pool-1-thread-1] INFO org.apache.hadoop.mapred.FileInputFormat - Total input paths to process : 1
959 [pool-1-thread-1] INFO cascading.flow.FlowStep - [wc] submitted hadoop job: job_local613149809_0001
959 [Thread-11] INFO org.apache.hadoop.mapred.LocalJobRunner - OutputCommitter set in config null
960 [Thread-11] INFO org.apache.hadoop.mapred.LocalJobRunner - OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter
965 [Thread-11] INFO org.apache.hadoop.mapred.LocalJobRunner - Waiting for map tasks
967 [pool-2-thread-1] INFO org.apache.hadoop.mapred.LocalJobRunner - Starting task: attempt_local613149809_0001_m_000000_0
979 [pool-2-thread-1] WARN mapreduce.Counters - Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead
1001 [pool-2-thread-1] INFO org.apache.hadoop.util.ProcessTree - setsid exited with exit code 0
1004 [pool-2-thread-1] INFO org.apache.hadoop.mapred.Task - Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@b901239
1012 [pool-2-thread-1] INFO cascading.tap.hadoop.io.MultiInputSplit - current split input path: file:/home/yash/prof/projects/aegis/code/adl/etl/sample/data/WC/input.txt
1012 [pool-2-thread-1] INFO org.apache.hadoop.mapred.MapTask - Processing split: cascading.tap.hadoop.io.MultiInputSplit@42d4a1cc
1015 [pool-2-thread-1] WARN mapreduce.Counters - Counter name MAP_INPUT_BYTES is deprecated. Use FileInputFormatCounters as group name and BYTES_READ as counter name instead
1017 [pool-2-thread-1] INFO org.apache.hadoop.mapred.MapTask - numReduceTasks: 0
1043 [pool-2-thread-1] INFO cascading.flow.hadoop.FlowMapper - cascading version: 2.2.0
1043 [pool-2-thread-1] INFO cascading.flow.hadoop.FlowMapper - child jvm opts: -Xmx200m
1063 [pool-2-thread-1] INFO cascading.flow.hadoop.FlowMapper - sourcing from: Hfs["TextDelimited[['Time', 'User_ID']]"]["sample/data/WC/input.txt"]
1064 [pool-2-thread-1] INFO cascading.flow.hadoop.FlowMapper - sinking to: Hfs["AvroScheme{schema={"type":"record","name":"wordcount","namespace":"cascading.avro.examples","fields":[{"name":"Time","type":["string","null"]},{"name":"User_ID","type":"long"}]}}"]["sample/data/WC/out"]
1080 [pool-2-thread-1] ERROR cascading.flow.stream.TrapHandler - caught Throwable, no trap available, rethrowing
cascading.tuple.TupleException: unable to sink into output identifier: sample/data/WC/out
at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:160)
at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:119)
at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
at cascading.tuple.TupleEntrySchemeCollector.add(TupleEntrySchemeCollector.java:134)
at cascading.flow.stream.SinkStage.receive(SinkStage.java:90)
at cascading.flow.stream.SinkStage.receive(SinkStage.java:37)
at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
at cascading.flow.stream.SourceStage.run(SourceStage.java:58)
at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:127)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:417)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:268)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:263)
at org.apache.avro.mapred.AvroOutputFormat$1.write(AvroOutputFormat.java:161)
at org.apache.avro.mapred.AvroOutputFormat$1.write(AvroOutputFormat.java:158)
at org.apache.hadoop.mapred.MapTask$DirectMapOutputCollector.collect(MapTask.java:716)
at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:526)
at cascading.tap.hadoop.util.MeasuredOutputCollector.collect(MeasuredOutputCollector.java:69)
at cascading.avro.AvroScheme.sink(AvroScheme.java:153)
at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:153)
... 16 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:79)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:257)
... 23 more
1081 [pool-2-thread-1] ERROR cascading.flow.stream.SourceStage - caught throwable
cascading.tuple.TupleException: unable to sink into output identifier: sample/data/WC/out
at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:160)
at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:119)
at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
at cascading.tuple.TupleEntrySchemeCollector.add(TupleEntrySchemeCollector.java:134)
at cascading.flow.stream.SinkStage.receive(SinkStage.java:90)
at cascading.flow.stream.SinkStage.receive(SinkStage.java:37)
at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
at cascading.flow.stream.SourceStage.run(SourceStage.java:58)
at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:127)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:417)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:268)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:263)
at org.apache.avro.mapred.AvroOutputFormat$1.write(AvroOutputFormat.java:161)
at org.apache.avro.mapred.AvroOutputFormat$1.write(AvroOutputFormat.java:158)
at org.apache.hadoop.mapred.MapTask$DirectMapOutputCollector.collect(MapTask.java:716)
at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:526)
at cascading.tap.hadoop.util.MeasuredOutputCollector.collect(MeasuredOutputCollector.java:69)
at cascading.avro.AvroScheme.sink(AvroScheme.java:153)
at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:153)
... 16 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:79)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:257)
... 23 more
1082 [Thread-11] INFO org.apache.hadoop.mapred.LocalJobRunner - Map task executor complete.
1083 [Thread-11] WARN org.apache.hadoop.mapred.LocalJobRunner - job_local613149809_0001
java.lang.Exception: cascading.tuple.TupleException: unable to sink into output identifier: sample/data/WC/out
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:406)
Caused by: cascading.tuple.TupleException: unable to sink into output identifier: sample/data/WC/out
at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:160)
at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:119)
at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
at cascading.tuple.TupleEntrySchemeCollector.add(TupleEntrySchemeCollector.java:134)
at cascading.flow.stream.SinkStage.receive(SinkStage.java:90)
at cascading.flow.stream.SinkStage.receive(SinkStage.java:37)
at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
at cascading.flow.stream.SourceStage.run(SourceStage.java:58)
at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:127)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:417)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:268)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:263)
at org.apache.avro.mapred.AvroOutputFormat$1.write(AvroOutputFormat.java:161)
at org.apache.avro.mapred.AvroOutputFormat$1.write(AvroOutputFormat.java:158)
at org.apache.hadoop.mapred.MapTask$DirectMapOutputCollector.collect(MapTask.java:716)
at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:526)
at cascading.tap.hadoop.util.MeasuredOutputCollector.collect(MeasuredOutputCollector.java:69)
at cascading.avro.AvroScheme.sink(AvroScheme.java:153)
at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:153)
... 16 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:79)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:257)
... 23 more
5960 [pool-1-thread-1] WARN cascading.flow.FlowStep - [wc] task completion events identify failed tasks
5960 [pool-1-thread-1] WARN cascading.flow.FlowStep - [wc] task completion events count: 0
5964 [flow wc] INFO cascading.flow.Flow - [wc] stopping all jobs
5964 [flow wc] INFO cascading.flow.FlowStep - [wc] stopping: (1/1) sample/data/WC/out
5964 [flow wc] INFO cascading.flow.Flow - [wc] stopped all jobs
5965 [flow wc] INFO cascading.tap.hadoop.util.Hadoop18TapUtil - deleting temp path sample/data/WC/out/_temporary
Exception in thread "main" cascading.flow.FlowException: local step failed
at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:212)
at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:145)
at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:120)
at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:42)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Chris,
thanks for the suggestion.
I tried supplying the type info via the Fields class, but it did not resolve the issue: same ClassCastException. I guess AvroScheme is not using the type info supplied to it?
Ken,
can you please comment on this?
// Imports assumed for Cascading 2.2 / avro 1.7-era cascading.avro:
import java.io.File;
import java.util.Properties;

import org.apache.avro.Schema;

import cascading.avro.AvroScheme;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class WordCountAvroWrite
  {
  public static void main( String[] args ) throws Exception
    {
    String docPath = "sample/data/WC/input.txt";
    String wcPath = "sample/data/WC/out";
    String schemaFile = "sample/data/WC/input.avsc";

    Schema schema = new Schema.Parser().parse( new File( schemaFile ) );

    Properties properties = new Properties();
    AppProps.setApplicationJarClass( properties, WordCountAvroWrite.class );
    HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

    // declare User_ID as long via Fields.applyTypes(); Time stays untyped (String)
    Fields timeField = new Fields( "Time" );
    Fields userField = new Fields( "User_ID" ).applyTypes( long.class );
    Fields outputFields = timeField.append( userField );

    Tap docTap = new Hfs( new TextDelimited( outputFields, true, "," ), docPath );
    Tap wcTap = new Hfs( new AvroScheme( schema ), wcPath, SinkMode.REPLACE );

    Pipe copyPipe = new Pipe( "wc" );
    // copyPipe = new Coerce( copyPipe, userField );

    FlowDef flowDef = FlowDef.flowDef()
      .setName( "wc" )
      .addSource( copyPipe, docTap )
      .addTailSink( copyPipe, wcTap );

    Flow wcFlow = flowConnector.connect( flowDef );
    wcFlow.complete();
    }
  }
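The root cause in the stack trace is that Avro's GenericDatumWriter receives a java.lang.String for a field whose schema type is "long": TextDelimited hands the sink String values, and the AvroScheme apparently does not coerce them. A minimal plain-Java sketch (no Cascading or Avro dependencies, names are illustrative) of the failing cast and the explicit coercion that a Coerce step is meant to perform:

```java
public class AvroCastDemo
  {
  public static void main( String[] args )
    {
    // A delimited-text source with no type info yields String values for every field.
    Object userId = "1234";

    // What the datum writer effectively does for a "long" schema field:
    try
      {
      Long bad = (Long) userId; // same failure as in the log above
      throw new AssertionError( "unreachable" );
      }
    catch( ClassCastException expected )
      {
      System.out.println( "cast failed: " + expected.getMessage() );
      }

    // Explicit coercion, the kind of conversion a Coerce pipe would apply upstream:
    long good = Long.parseLong( (String) userId );
    System.out.println( "coerced value: " + good ); // prints "coerced value: 1234"
    }
  }
```

If this reading is right, re-enabling the commented-out Coerce line in the flow (so User_ID is converted to a long before reaching the Avro sink) may be a workaround, though that is an assumption about where the coercion is dropped, not a confirmed fix.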