Last active
December 6, 2019 18:54
-
-
Save FabioBatSilva/d6b168a01cf4ba991d9e77881c5ea1e5 to your computer and use it in GitHub Desktop.
Delta Lake SymlinkTextInputFormat manifest generation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.a3k.dw.tracking.driver; | |
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.delta.DeltaLog;
import org.apache.spark.sql.delta.Snapshot;
import org.apache.spark.sql.delta.actions.AddFile;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class SymlinkManifestWriter { | |
private static Path fragmentPath(final AddFile file) { | |
return new Path(file.path()).getParent(); | |
} | |
private static Path createManifestPath(final Path dataPath) { | |
return new Path(dataPath, "_symlink_format_manifest"); | |
} | |
private static Path createFragmentPath(final Path manifestPath, final Path fragment) { | |
return new Path(new Path(manifestPath, fragment), "manifest.txt"); | |
} | |
private static String createManifestContent(final Path dataPath, final List<AddFile> files) { | |
return files.stream() | |
.map(f -> new Path(dataPath, f.path())) | |
.map(Path::toString) | |
.collect(Collectors.joining("\n")); | |
} | |
private static Map<Path, List<AddFile>> groupByPartitions(final Snapshot snapshot) { | |
return snapshot.allFiles() | |
.collectAsList() | |
.stream() | |
.collect(Collectors.groupingBy(SymlinkManifestWriter::fragmentPath)); | |
} | |
private static void writeFile(final FileSystem fs, final Path path, final String content) throws IOException { | |
try (FSDataOutputStream stream = fs.create(path, true)) { | |
if (!fs.exists(path.getParent())) { | |
fs.mkdirs(path.getParent()); | |
} | |
stream.writeBytes(content); | |
} | |
} | |
public static void write(final FileSystem fs, final Snapshot snapshot) throws IOException { | |
final Path dataPath = snapshot.deltaLog().dataPath(); | |
final Path manifestPath = createManifestPath(dataPath); | |
final Map<Path, List<AddFile>> groups = groupByPartitions(snapshot); | |
for (Map.Entry<Path, List<AddFile>> e : groups.entrySet()) { | |
final Path path = createFragmentPath(manifestPath, e.getKey()); | |
final String content = createManifestContent(dataPath, e.getValue()); | |
writeFile(fs, path, content); | |
} | |
} | |
public static void write(final FileSystem fs, final DeltaLog log) throws IOException { | |
write(fs, log.snapshot()); | |
} | |
public static void write(final SparkSession spark, final DeltaLog log) throws IOException { | |
final SparkContext sparkContext = spark.sparkContext(); | |
final Configuration hadoopConf = sparkContext.hadoopConfiguration(); | |
final FileSystem fs = FileSystem.get(log.dataPath().toUri(), hadoopConf); | |
write(fs, log); | |
} | |
public static void write(final SparkSession spark, final String dataPath) throws IOException { | |
write(spark, DeltaLog.forTable(spark, dataPath)); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment