When using MultipleOutputs with DeprecatedParquetOutputFormat, you see this error:
java.lang.NullPointerException
    at org.apache.hadoop.fs.Path.<init>(Path.java:105)
    at org.apache.hadoop.fs.Path.<init>(Path.java:94)
    at org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat.getDefaultWorkFile(DeprecatedParquetOutputFormat.java:69)
    at org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat.access$100(DeprecatedParquetOutputFormat.java:36)
    at org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat$RecordWriterWrapper.<init>(DeprecatedParquetOutputFormat.java:89)
    at org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat.getRecordWriter(DeprecatedParquetOutputFormat.java:77)
    at com.bloomberg.bdip.ParquetAvroMultipleOutputs$InternalFileOutputFormat.getRecordWriter(ParquetAvroMultipleOutputs.java:537)
    at com.bloomberg.bdip.ParquetAvroMultipleOutputs.getRecordWriter(ParquetAvroMultipleOutputs.java:326)
    at com.bloomberg.bdip.ParquetAvroMultipleOutputs.getCollector(ParquetAvroMultipleOutputs.java:483)
    at com.bloomberg.bdip.ParquetAvroMultipleOutputs.collect(ParquetAvroMultipleOutputs.java:402)
    at com.bloomberg.bdip.ParquetAvroMultipleOutputFormat$$anon$2.write(PartitionedOutputFormats.scala:124)
    at com.bloomberg.bdip.ParquetAvroMultipleOutputFormat$$anon$2.write(PartitionedOutputFormats.scala:104)
    at org.apache.spark.SparkHadoopWriter.write(SparkHadoopWriter.scala:96)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1199)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1197)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1197)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1205)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1185)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
The issue is that Spark does not set the task's work output path in the JobConf, a key the Hadoop MR framework normally sets before a task runs. DeprecatedParquetOutputFormat.getDefaultWorkFile then constructs a Path from the missing (null) value, which produces the NullPointerException above. Hadoop MR sets the key in org.apache.hadoop.mapred.Task#initialize(...):
Path outputPath = FileOutputFormat.getOutputPath(conf);
if (outputPath != null) {
  if ((committer instanceof FileOutputCommitter)) {
    FileOutputFormat.setWorkOutputPath(conf,
        ((FileOutputCommitter)committer).getTaskAttemptPath(taskContext));
  } else {
    FileOutputFormat.setWorkOutputPath(conf, outputPath);
  }
}
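
Since Spark never runs Task#initialize, one possible workaround is to replicate its fallback (else) branch yourself before handing the JobConf to saveAsHadoopDataset. The helper below is a minimal sketch under that assumption; the class and method names are ours, not part of Hadoop, Spark, or parquet-mr:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

// Hypothetical helper (not a library API): replicate the fallback branch
// of Task#initialize so that FileOutputFormat.getWorkOutputPath(conf) no
// longer returns null when DeprecatedParquetOutputFormat asks for it.
public final class WorkOutputPathFix {
  private WorkOutputPathFix() {}

  public static void ensureWorkOutputPath(JobConf conf) {
    Path outputPath = FileOutputFormat.getOutputPath(conf);
    if (outputPath != null && FileOutputFormat.getWorkOutputPath(conf) == null) {
      // Same effect as the else branch above: tasks write directly under
      // the job's final output path instead of a task attempt directory.
      FileOutputFormat.setWorkOutputPath(conf, outputPath);
    }
  }
}

Call WorkOutputPathFix.ensureWorkOutputPath(jobConf) on the driver before saveAsHadoopDataset. Note the trade-off this inherits from the else branch it copies: with no task attempt directory there is no rename-on-commit step, so failed or speculatively re-run tasks may leave partial files under the output path.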