Last active
January 4, 2016 16:29
-
-
Save yashk/8648115 to your computer and use it in GitHub Desktop.
cascading.avro - supply types via Fields class
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
SLF4J: Class path contains multiple SLF4J bindings. | |
SLF4J: Found binding in [jar:file:/home/yash/.m2/repository/org/slf4j/slf4j-simple/1.6.1/slf4j-simple-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] | |
SLF4J: Found binding in [jar:file:/home/yash/.m2/repository/org/slf4j/slf4j-log4j12/1.6.1/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] | |
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. | |
174 [main] INFO cascading.flow.hadoop.util.HadoopUtil - resolving application jar from found main method on: com.aegis.adl.etl.tmp.WordCountAvroWrite | |
175 [main] INFO cascading.flow.hadoop.planner.HadoopPlanner - using application jar: null | |
192 [main] INFO cascading.property.AppProps - using app.id: 762BA42C90064C4984800D8960EFEA0E | |
330 [main] WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable | |
575 [flow wc] INFO cascading.util.Version - Concurrent, Inc - Cascading 2.2.0 | |
577 [flow wc] INFO cascading.flow.Flow - [wc] starting | |
577 [flow wc] INFO cascading.flow.Flow - [wc] source: Hfs["TextDelimited[['Time', 'User_ID']]"]["sample/data/WC/input.txt"] | |
578 [flow wc] INFO cascading.flow.Flow - [wc] sink: Hfs["AvroScheme{schema={"type":"record","name":"wordcount","namespace":"cascading.avro.examples","fields":[{"name":"Time","type":["string","null"]},{"name":"User_ID","type":"long"}]}}"]["sample/data/WC/out"] | |
578 [flow wc] INFO cascading.flow.Flow - [wc] parallel execution is enabled: false | |
578 [flow wc] INFO cascading.flow.Flow - [wc] starting jobs: 1 | |
578 [flow wc] INFO cascading.flow.Flow - [wc] allocating threads: 1 | |
581 [pool-1-thread-1] INFO cascading.flow.FlowStep - [wc] starting step: (1/1) sample/data/WC/out | |
628 [pool-1-thread-1] WARN org.apache.hadoop.conf.Configuration - session.id is deprecated. Instead, use dfs.metrics.session-id | |
629 [pool-1-thread-1] INFO org.apache.hadoop.metrics.jvm.JvmMetrics - Initializing JVM Metrics with processName=JobTracker, sessionId= | |
653 [pool-1-thread-1] WARN org.apache.hadoop.mapred.JobClient - No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String). | |
671 [pool-1-thread-1] INFO org.apache.hadoop.mapred.FileInputFormat - Total input paths to process : 1 | |
959 [pool-1-thread-1] INFO cascading.flow.FlowStep - [wc] submitted hadoop job: job_local613149809_0001 | |
959 [Thread-11] INFO org.apache.hadoop.mapred.LocalJobRunner - OutputCommitter set in config null | |
960 [Thread-11] INFO org.apache.hadoop.mapred.LocalJobRunner - OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter | |
965 [Thread-11] INFO org.apache.hadoop.mapred.LocalJobRunner - Waiting for map tasks | |
967 [pool-2-thread-1] INFO org.apache.hadoop.mapred.LocalJobRunner - Starting task: attempt_local613149809_0001_m_000000_0 | |
979 [pool-2-thread-1] WARN mapreduce.Counters - Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead | |
1001 [pool-2-thread-1] INFO org.apache.hadoop.util.ProcessTree - setsid exited with exit code 0 | |
1004 [pool-2-thread-1] INFO org.apache.hadoop.mapred.Task - Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@b901239 | |
1012 [pool-2-thread-1] INFO cascading.tap.hadoop.io.MultiInputSplit - current split input path: file:/home/yash/prof/projects/aegis/code/adl/etl/sample/data/WC/input.txt | |
1012 [pool-2-thread-1] INFO org.apache.hadoop.mapred.MapTask - Processing split: cascading.tap.hadoop.io.MultiInputSplit@42d4a1cc | |
1015 [pool-2-thread-1] WARN mapreduce.Counters - Counter name MAP_INPUT_BYTES is deprecated. Use FileInputFormatCounters as group name and BYTES_READ as counter name instead | |
1017 [pool-2-thread-1] INFO org.apache.hadoop.mapred.MapTask - numReduceTasks: 0 | |
1043 [pool-2-thread-1] INFO cascading.flow.hadoop.FlowMapper - cascading version: 2.2.0 | |
1043 [pool-2-thread-1] INFO cascading.flow.hadoop.FlowMapper - child jvm opts: -Xmx200m | |
1063 [pool-2-thread-1] INFO cascading.flow.hadoop.FlowMapper - sourcing from: Hfs["TextDelimited[['Time', 'User_ID']]"]["sample/data/WC/input.txt"] | |
1064 [pool-2-thread-1] INFO cascading.flow.hadoop.FlowMapper - sinking to: Hfs["AvroScheme{schema={"type":"record","name":"wordcount","namespace":"cascading.avro.examples","fields":[{"name":"Time","type":["string","null"]},{"name":"User_ID","type":"long"}]}}"]["sample/data/WC/out"] | |
1080 [pool-2-thread-1] ERROR cascading.flow.stream.TrapHandler - caught Throwable, no trap available, rethrowing | |
cascading.tuple.TupleException: unable to sink into output identifier: sample/data/WC/out | |
at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:160) | |
at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:119) | |
at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71) | |
at cascading.tuple.TupleEntrySchemeCollector.add(TupleEntrySchemeCollector.java:134) | |
at cascading.flow.stream.SinkStage.receive(SinkStage.java:90) | |
at cascading.flow.stream.SinkStage.receive(SinkStage.java:37) | |
at cascading.flow.stream.SourceStage.map(SourceStage.java:102) | |
at cascading.flow.stream.SourceStage.run(SourceStage.java:58) | |
at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:127) | |
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:417) | |
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332) | |
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:268) | |
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) | |
at java.util.concurrent.FutureTask.run(FutureTask.java:262) | |
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) | |
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) | |
at java.lang.Thread.run(Thread.java:744) | |
Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long | |
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:263) | |
at org.apache.avro.mapred.AvroOutputFormat$1.write(AvroOutputFormat.java:161) | |
at org.apache.avro.mapred.AvroOutputFormat$1.write(AvroOutputFormat.java:158) | |
at org.apache.hadoop.mapred.MapTask$DirectMapOutputCollector.collect(MapTask.java:716) | |
at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:526) | |
at cascading.tap.hadoop.util.MeasuredOutputCollector.collect(MeasuredOutputCollector.java:69) | |
at cascading.avro.AvroScheme.sink(AvroScheme.java:153) | |
at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:153) | |
... 16 more | |
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long | |
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:79) | |
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143) | |
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114) | |
at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175) | |
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104) | |
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66) | |
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143) | |
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58) | |
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:257) | |
... 23 more | |
1081 [pool-2-thread-1] ERROR cascading.flow.stream.SourceStage - caught throwable | |
cascading.tuple.TupleException: unable to sink into output identifier: sample/data/WC/out | |
at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:160) | |
at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:119) | |
at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71) | |
at cascading.tuple.TupleEntrySchemeCollector.add(TupleEntrySchemeCollector.java:134) | |
at cascading.flow.stream.SinkStage.receive(SinkStage.java:90) | |
at cascading.flow.stream.SinkStage.receive(SinkStage.java:37) | |
at cascading.flow.stream.SourceStage.map(SourceStage.java:102) | |
at cascading.flow.stream.SourceStage.run(SourceStage.java:58) | |
at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:127) | |
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:417) | |
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332) | |
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:268) | |
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) | |
at java.util.concurrent.FutureTask.run(FutureTask.java:262) | |
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) | |
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) | |
at java.lang.Thread.run(Thread.java:744) | |
Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long | |
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:263) | |
at org.apache.avro.mapred.AvroOutputFormat$1.write(AvroOutputFormat.java:161) | |
at org.apache.avro.mapred.AvroOutputFormat$1.write(AvroOutputFormat.java:158) | |
at org.apache.hadoop.mapred.MapTask$DirectMapOutputCollector.collect(MapTask.java:716) | |
at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:526) | |
at cascading.tap.hadoop.util.MeasuredOutputCollector.collect(MeasuredOutputCollector.java:69) | |
at cascading.avro.AvroScheme.sink(AvroScheme.java:153) | |
at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:153) | |
... 16 more | |
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long | |
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:79) | |
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143) | |
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114) | |
at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175) | |
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104) | |
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66) | |
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143) | |
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58) | |
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:257) | |
... 23 more | |
1082 [Thread-11] INFO org.apache.hadoop.mapred.LocalJobRunner - Map task executor complete. | |
1083 [Thread-11] WARN org.apache.hadoop.mapred.LocalJobRunner - job_local613149809_0001 | |
java.lang.Exception: cascading.tuple.TupleException: unable to sink into output identifier: sample/data/WC/out | |
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:406) | |
Caused by: cascading.tuple.TupleException: unable to sink into output identifier: sample/data/WC/out | |
at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:160) | |
at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:119) | |
at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71) | |
at cascading.tuple.TupleEntrySchemeCollector.add(TupleEntrySchemeCollector.java:134) | |
at cascading.flow.stream.SinkStage.receive(SinkStage.java:90) | |
at cascading.flow.stream.SinkStage.receive(SinkStage.java:37) | |
at cascading.flow.stream.SourceStage.map(SourceStage.java:102) | |
at cascading.flow.stream.SourceStage.run(SourceStage.java:58) | |
at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:127) | |
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:417) | |
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332) | |
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:268) | |
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) | |
at java.util.concurrent.FutureTask.run(FutureTask.java:262) | |
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) | |
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) | |
at java.lang.Thread.run(Thread.java:744) | |
Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long | |
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:263) | |
at org.apache.avro.mapred.AvroOutputFormat$1.write(AvroOutputFormat.java:161) | |
at org.apache.avro.mapred.AvroOutputFormat$1.write(AvroOutputFormat.java:158) | |
at org.apache.hadoop.mapred.MapTask$DirectMapOutputCollector.collect(MapTask.java:716) | |
at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:526) | |
at cascading.tap.hadoop.util.MeasuredOutputCollector.collect(MeasuredOutputCollector.java:69) | |
at cascading.avro.AvroScheme.sink(AvroScheme.java:153) | |
at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:153) | |
... 16 more | |
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long | |
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:79) | |
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143) | |
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114) | |
at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175) | |
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104) | |
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66) | |
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143) | |
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58) | |
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:257) | |
... 23 more | |
5960 [pool-1-thread-1] WARN cascading.flow.FlowStep - [wc] task completion events identify failed tasks | |
5960 [pool-1-thread-1] WARN cascading.flow.FlowStep - [wc] task completion events count: 0 | |
5964 [flow wc] INFO cascading.flow.Flow - [wc] stopping all jobs | |
5964 [flow wc] INFO cascading.flow.FlowStep - [wc] stopping: (1/1) sample/data/WC/out | |
5964 [flow wc] INFO cascading.flow.Flow - [wc] stopped all jobs | |
5965 [flow wc] INFO cascading.tap.hadoop.util.Hadoop18TapUtil - deleting temp path sample/data/WC/out/_temporary | |
Exception in thread "main" cascading.flow.FlowException: local step failed | |
at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:212) | |
at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:145) | |
at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:120) | |
at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:42) | |
at java.util.concurrent.FutureTask.run(FutureTask.java:262) | |
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) | |
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) | |
at java.lang.Thread.run(Thread.java:744) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Chris, | |
thanks for the suggestion. | |
I tried supplying the type information via the Fields class, but it did not resolve the issue — I get the same ClassCastException. I suspect AvroScheme is not using the type information supplied to it? | |
Ken, | |
could you please comment on this? |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class | |
WordCountAvroWrite | |
{ | |
public static void | |
main( String[] args ) throws Exception | |
{ | |
String docPath = "sample/data/WC/input.txt"; | |
String wcPath = "sample/data/WC/out"; | |
String schemaFile = "sample/data/WC/input.avsc"; | |
Schema schema = new Schema.Parser().parse(new File(schemaFile)); | |
Properties properties = new Properties(); | |
AppProps.setApplicationJarClass( properties, WordCountAvroWrite.class ); | |
HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties ); | |
Fields timeField = new Fields("Time"); | |
Fields userField = new Fields("User_ID").applyTypes(long.class); | |
Fields outputFields = timeField.append(userField); | |
Tap docTap = new Hfs( new TextDelimited(outputFields,true,","),docPath ); | |
Tap wcTap = new Hfs( new AvroScheme( schema), wcPath ,SinkMode.REPLACE); | |
Pipe copyPipe = new Pipe("wc"); | |
// copyPipe = new Coerce(copyPipe,userField); | |
FlowDef flowDef = FlowDef.flowDef() | |
.setName( "wc" ) | |
.addSource( copyPipe, docTap ) | |
.addTailSink( copyPipe, wcTap ); | |
Flow wcFlow = flowConnector.connect( flowDef ); | |
wcFlow.complete(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment