Example of extracting information from HDFS paths in a Spark transformation
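The trick hinges on one Hadoop detail: each partition's InputSplit is a FileSplit whose path still carries directory metadata, so the name of the enclosing directory can be recovered and treated as, in this case, a project name. A minimal standalone sketch of that extraction, using a hypothetical path layout (one directory per project):

import org.apache.hadoop.fs.Path;

public class PathInfoExample {

  public static void main(String[] args) {
    // Hypothetical layout: /data/<projectName>/<file>. In the transformation below,
    // this path comes from ((FileSplit) split).getPath()
    Path path = new Path("hdfs:///data/PROJ-A/donor.txt");
    String projectName = path.getParent().getName();
    System.out.println(projectName); // PROJ-A
  }

}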
// Input
val hadoopRDD = javaTextFile(context, projectPaths);

// Transform: mapPartitionsWithInputSplit exposes each partition's InputSplit,
// which carries the HDFS path information that PreProcessLine extracts
val transformed = hadoopRDD.mapPartitionsWithInputSplit(new PreProcessLine(), false);

// ...

private static JavaHadoopRDD<LongWritable, Text> javaTextFile(JavaSparkContext context, String paths) {
  // hadoopFile is declared to return JavaPairRDD, but the instance it creates is a
  // JavaHadoopRDD (Spark 1.1+), so the cast is safe and unlocks mapPartitionsWithInputSplit
  return (JavaHadoopRDD<LongWritable, Text>) context.hadoopFile(paths, TextInputFormat.class, LongWritable.class,
      Text.class);
}
package org.icgc.dcc.etl.staging.function;

import static com.google.common.base.Stopwatch.createStarted;
import static com.google.common.collect.Iterables.toArray;
import static org.icgc.dcc.common.core.util.FormatUtils.formatCount;
import static org.icgc.dcc.common.core.util.FormatUtils.formatPercent;
import static org.icgc.dcc.common.core.util.Splitters.TAB;

import java.io.Serializable;
import java.util.Iterator;
import java.util.NoSuchElementException;

import lombok.val;
import lombok.extern.slf4j.Slf4j;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.spark.api.java.function.Function2;

import scala.Tuple2;

/**
 * Prepends the name of each input file's parent directory (the project name) to every
 * non-header line and splits the result into its tab-separated values.
 */
@Slf4j
public class PreProcessLine implements
    Function2<InputSplit, Iterator<Tuple2<LongWritable, Text>>, Iterator<String[]>>, Serializable {

  /**
   * Constants.
   */
  private static final int LINE_STATUS_COUNT = 10 * 1000 * 1000;

  @Override
  public Iterator<String[]> call(final InputSplit split, final Iterator<Tuple2<LongWritable, Text>> it)
      throws Exception {
    val watch = createStarted();
    val fileSplit = (FileSplit) split;
    val length = fileSplit.getLength();
    val projectName = getProjectName(fileSplit);

    // Lazy iterator
    return new Iterator<String[]>() {

      private long lineCount = 0;
      private Tuple2<LongWritable, Text> record;
      private boolean buffered = false; // Prevents repeated hasNext() calls from dropping records

      @Override
      public boolean hasNext() {
        if (buffered) {
          return true;
        }
        if (!it.hasNext()) {
          return false;
        }

        // Peek
        record = it.next();
        lineCount++;
        if (lineCount % LINE_STATUS_COUNT == 0) {
          // The key is the byte offset within the file, so measure progress from the split's start
          val offset = getOffset(record) - fileSplit.getStart();
          val percent = offset * 1.0 / length;
          log.info("{}: Processed {} lines ({} %) in {}",
              fileSplit, formatCount(lineCount), formatPercent(percent), watch);
        }

        // Skip headers
        if (isHeader(record)) {
          return hasNext();
        }

        buffered = true;
        return true;
      }

      @Override
      public String[] next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        buffered = false;

        val line = getLine(record);
        val prependedLine = projectName + "\t" + line;

        return toArray(TAB.split(prependedLine), String.class);
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException("Cannot remove from a transformed iterator");
      }

    };
  }

  private static boolean isHeader(Tuple2<LongWritable, Text> record) {
    return getOffset(record) == 0;
  }

  private static long getOffset(Tuple2<LongWritable, Text> record) {
    return record._1.get();
  }

  private static String getLine(Tuple2<LongWritable, Text> record) {
    return record._2.toString();
  }

  private static String getProjectName(FileSplit fileSplit) {
    return fileSplit.getPath().getParent().getName();
  }

}
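For context, a hedged sketch of a driver that wires the two pieces together; the master setting, application name, and input glob are illustrative assumptions, not taken from the original project:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaHadoopRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class PreProcessDriver {

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("pre-process").setMaster("local[*]");
    try (JavaSparkContext context = new JavaSparkContext(conf)) {
      // Hypothetical glob: one sub-directory per project
      String projectPaths = "/data/projects/*/*.txt";

      @SuppressWarnings("unchecked")
      JavaHadoopRDD<LongWritable, Text> hadoopRDD = (JavaHadoopRDD<LongWritable, Text>)
          context.hadoopFile(projectPaths, TextInputFormat.class, LongWritable.class, Text.class);

      // Each output row is the project name followed by the line's tab-separated values
      JavaRDD<String[]> rows = hadoopRDD.mapPartitionsWithInputSplit(new PreProcessLine(), false);
      rows.take(5).forEach(row -> System.out.println(String.join("\t", row)));
    }
  }

}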