@FabioBatSilva
Last active December 6, 2019 18:54
Delta lake SymlinkTextInputFormat Manifest Generation
package com.a3k.dw.tracking.driver;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.delta.DeltaLog;
import org.apache.spark.sql.delta.Snapshot;
import org.apache.spark.sql.delta.actions.AddFile;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SymlinkManifestWriter {
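
    // Partition directory of an AddFile (AddFile#path is given relative to the Delta table root).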
    private static Path fragmentPath(final AddFile file) {
        return new Path(file.path()).getParent();
    }
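
    // Root directory for the generated manifests: <table path>/_symlink_format_manifest.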
    private static Path createManifestPath(final Path dataPath) {
        return new Path(dataPath, "_symlink_format_manifest");
    }
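
    // Manifest file location for a single partition: <manifest root>/<partition>/manifest.txt.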
    private static Path createFragmentPath(final Path manifestPath, final Path fragment) {
        return new Path(new Path(manifestPath, fragment), "manifest.txt");
    }
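
    // Manifest body: one absolute data file path per line, the format SymlinkTextInputFormat reads.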
    private static String createManifestContent(final Path dataPath, final List<AddFile> files) {
        return files.stream()
                .map(f -> new Path(dataPath, f.path()))
                .map(Path::toString)
                .collect(Collectors.joining("\n"));
    }
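
    // Groups all AddFile entries of the snapshot by partition directory (collected on the driver).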
    private static Map<Path, List<AddFile>> groupByPartitions(final Snapshot snapshot) {
        return snapshot.allFiles()
                .collectAsList()
                .stream()
                .collect(Collectors.groupingBy(SymlinkManifestWriter::fragmentPath));
    }
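
    // Writes a single partition manifest, creating parent directories and overwriting any previous version.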
    private static void writeFile(final FileSystem fs, final Path path, final String content) throws IOException {
        // Create the partition directory under the manifest root before writing the file.
        if (!fs.exists(path.getParent())) {
            fs.mkdirs(path.getParent());
        }

        // Overwrite any previous manifest for this partition and write the file list as UTF-8 text.
        try (FSDataOutputStream stream = fs.create(path, true)) {
            stream.write(content.getBytes(StandardCharsets.UTF_8));
        }
    }
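
    // Generates one manifest file per partition for the given snapshot.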
    public static void write(final FileSystem fs, final Snapshot snapshot) throws IOException {
        final Path dataPath = snapshot.deltaLog().dataPath();
        final Path manifestPath = createManifestPath(dataPath);
        final Map<Path, List<AddFile>> groups = groupByPartitions(snapshot);

        for (Map.Entry<Path, List<AddFile>> e : groups.entrySet()) {
            final Path path = createFragmentPath(manifestPath, e.getKey());
            final String content = createManifestContent(dataPath, e.getValue());

            writeFile(fs, path, content);
        }
    }
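
    // Generates manifests for the current snapshot of the given DeltaLog.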
    public static void write(final FileSystem fs, final DeltaLog log) throws IOException {
        write(fs, log.snapshot());
    }
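
    // Resolves the table's FileSystem from Spark's Hadoop configuration, then generates the manifests.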
    public static void write(final SparkSession spark, final DeltaLog log) throws IOException {
        final SparkContext sparkContext = spark.sparkContext();
        final Configuration hadoopConf = sparkContext.hadoopConfiguration();
        final FileSystem fs = FileSystem.get(log.dataPath().toUri(), hadoopConf);

        write(fs, log);
    }
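
    // Convenience overload: loads the DeltaLog for the table at dataPath and generates its manifests.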
    public static void write(final SparkSession spark, final String dataPath) throws IOException {
        write(spark, DeltaLog.forTable(spark, dataPath));
    }
}
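
A minimal sketch of how the writer might be invoked from a Spark job is shown below. The application name and table location are placeholders, not part of the original gist, and the job is assumed to run with Delta Lake on the classpath.

import org.apache.spark.sql.SparkSession;

public class GenerateManifests {
    public static void main(String[] args) throws Exception {
        final SparkSession spark = SparkSession.builder()
                .appName("delta-symlink-manifest")   // placeholder app name
                .getOrCreate();

        // Regenerate the manifests after the Delta table has been written, so engines that read
        // through SymlinkTextInputFormat (e.g. Presto, Athena, Hive) pick up the latest data files.
        SymlinkManifestWriter.write(spark, "s3://my-bucket/warehouse/events");  // placeholder table path

        spark.stop();
    }
}

Because each manifest reflects only the snapshot it was generated from, it has to be regenerated after every write to the table for readers to see the new files.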