Last active
April 20, 2023 18:08
-
-
Save yashk/ed11fc712fece0c259a018e2a26c23b7 to your computer and use it in GitHub Desktop.
Spark: why paths whose names start with an underscore (e.g. `_test.txt`) are silently ignored when reading files
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// https://github.com/apache/spark/blob/01dc1cb491aefd30ffde6b01416175e2ac1b881a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L765 | |
if (checkFilesExist) { | |
val (filteredOut, filteredIn) = allPaths.partition { path => | |
HadoopFSUtils.shouldFilterOutPathName(path.getName) | |
} | |
if (filteredIn.isEmpty) { | |
logWarning( | |
s"All paths were ignored:\n ${filteredOut.mkString("\n ")}") | |
} else { | |
logDebug( | |
s"Some paths were ignored:\n ${filteredOut.mkString("\n ")}") | |
} | |
} | |
allPaths | |
} | |
// https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala#L299 | |
/** Checks if we should filter out this path name. */ | |
/** Decides whether a leaf file/directory name should be excluded from listing. */
def shouldFilterOutPathName(pathName: String): Boolean = {
  // Names filtered out:
  //   1. names starting with "_" or "." (hidden / metadata files), EXCEPT
  //      partition directories like "key=value" and the Parquet summary files
  //      "_common_metadata" / "_metadata", which Parquet must still discover
  //      among the leaf files returned by this listing.
  //   2. names ending with "._COPYING_" — an in-flight copy; reading it now
  //      would risk double-reading the data once the copy completes.
  val isParquetSummary =
    pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata")
  val looksHiddenOrTemporary =
    pathName.endsWith("._COPYING_") ||
      pathName.startsWith(".") ||
      (pathName.startsWith("_") && !pathName.contains("="))
  looksHiddenOrTemporary && !isParquetSummary
}
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
scala> val hadoopConf = spark.sparkContext.hadoopConfiguration | |
hadoopConf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, __spark_hadoop_conf__.xml | |
scala> val className = FileInputFormat.PATHFILTER_CLASS | |
className: String = mapreduce.input.pathFilter.class | |
scala> spark.sparkContext.hadoopConfiguration.setClass(FileInputFormat.PATHFILTER_CLASS, classOf[underscore.TmpFileFilter], classOf[org.apache.hadoop.fs.PathFilter]) | |
scala> val t1 = spark.read.textFile("hdfs://hdfs/tmp/yash/underscore/_test.txt") | |
23/04/20 17:34:25 WARN DataSource: All paths were ignored: | |
hdfs://hdfs/tmp/yash/underscore/_test.txt | |
t1: org.apache.spark.sql.Dataset[String] = [value: string] | |
scala> val test = spark.read.text("hdfs://hdfs/tmp/yash/underscore/_test.txt") | |
23/04/20 17:53:50 WARN DataSource: All paths were ignored: | |
hdfs://hdfs/tmp/yash/underscore/_test.txt | |
test: org.apache.spark.sql.DataFrame = [value: string] | |
scala> test.show(false) | |
+-----+ | |
|value| | |
+-----+ | |
+-----+ | |
scala> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package underscore | |
import org.apache.hadoop.fs.{Path, PathFilter} | |
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat | |
import org.apache.spark.sql.SparkSession | |
/**
 * A Hadoop [[PathFilter]] that accepts every candidate path unconditionally,
 * printing each one so you can observe which paths the file-listing machinery
 * actually passes through the configured filter.
 */
class TmpFileFilter extends PathFilter {
  override def accept(candidate: Path): Boolean = {
    // Side effect only: echo the path; the filter never rejects anything.
    println(candidate)
    true
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment