Skip to content

Instantly share code, notes, and snippets.

@leeyc0
Last active July 24, 2023 14:44
Show Gist options
  • Save leeyc0/2bdab65901fe5754c471832acdc00890 to your computer and use it in GitHub Desktop.
Save leeyc0/2bdab65901fe5754c471832acdc00890 to your computer and use it in GitHub Desktop.
SPARK-44512
import java.io.IOException;
import java.util.List;
import java.util.function.Function;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple3;
// To compile: javac Test.java && jar cvf Test.jar Test.class
// bug: spark-submit --class Test Test.jar
// no bug: spark-submit --class Test Test.jar workaround
public class Test {
public static void main(String args[]) throws IOException {
final var spark = SparkSession
.builder()
.config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
.getOrCreate();
final var hadoopConf = spark.sparkContext().hadoopConfiguration();
final var fs = FileSystem.get(hadoopConf);
fs.setWriteChecksum(false);
// create a minimal dataset that is enough to reproduce the bug
// The three columns are named _1, _2, and _3 (the field names of Tuple3)
var dataset = spark.createDataset(List.of(
new Tuple3<Long, String, String>(3L, "a", "r"),
new Tuple3<Long, String, String>(3L, "b", "r"),
new Tuple3<Long, String, String>(2L, "b", "q"),
new Tuple3<Long, String, String>(2L, "a", "q"),
new Tuple3<Long, String, String>(1L, "a", "p"),
new Tuple3<Long, String, String>(1L, "b", "p")
),
Encoders.tuple(Encoders.LONG(), Encoders.STRING(), Encoders.STRING()))
.sort("_1")
.select("_2", "_3");
// This is an identity mapper, i.e. returns itself
// Enabled by adding an argument "workaround" when executing spark-submit.
// With AQE enabled, .sort() will work as intended only if this identity mapper
// is inserted between .sort() and .select() in the pipeline
if (args.length > 0 && args[0].equals("workaround")) {
dataset = dataset.map((MapFunction<Row, Row>) row -> row, dataset.encoder());
}
// output column _3 to text files, partitioned by column _2
// _1 is only for sorting purpose, not used in output
// output will not be sorted without the identity mapper
dataset.write()
.mode("overwrite")
.partitionBy("_2")
.text("output");
dataset.explain();
spark.close();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment