Skip to content

Instantly share code, notes, and snippets.

@saswata-dutta
Forked from jfrazee/FilterBadGzipFiles.scala
Created September 5, 2019 12:28
Show Gist options
  • Save saswata-dutta/a2789d2235acad95a1e4ec93183cc1be to your computer and use it in GitHub Desktop.
Spark job to read gzip files, ignoring corrupted files
import java.io._
import java.util.zip._

import scala.io._
import scala.util.control.NonFatal

// Spark
import org.slf4j.Logger
import org.apache.spark.{ SparkConf, SparkContext, Logging }

// Hadoop
import org.apache.hadoop.io.compress.GzipCodec
object FilterBadGzipFiles extends Logging {

  /**
   * Spark job that reads a mix of plain-text and gzip files, skipping any file
   * that fails to read (e.g. a corrupted gzip archive) instead of failing the job.
   *
   * args(0): input path/glob passed to `SparkContext.binaryFiles`
   * args(1): output path; results are written gzip-compressed
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    val sc = new SparkContext(sparkConf)

    // binaryFiles yields (path, PortableDataStream) pairs; the stream is only
    // opened inside the task, so a bad file can be handled per-record.
    val files = sc.binaryFiles(args(0))

    val lines =
      files.flatMap {
        case (path, stream) =>
          try {
            // Decompress by extension; any other file is read as-is.
            val is =
              if (path.toLowerCase.endsWith(".gz"))
                new GZIPInputStream(stream.open)
              else
                stream.open
            try {
              // Materialize eagerly with toList: the stream is closed in the
              // finally below, so a lazy iterator would read from a closed stream.
              Source.fromInputStream(is).getLines.toList
            } finally {
              // Best-effort close. NonFatal (not Throwable) so fatal JVM errors
              // such as OutOfMemoryError or InterruptedException still propagate.
              try { is.close } catch { case NonFatal(_) => }
            }
          } catch {
            // NonFatal instead of Throwable: only recoverable read/decompress
            // failures (corrupt gzip, IO errors) are skipped; fatal errors
            // are never silently turned into an empty file.
            case NonFatal(e) =>
              log.warn(s"error reading from ${path}: ${e.getMessage}", e)
              List.empty[String]
          }
      }

    lines.saveAsTextFile(args(1), classOf[GzipCodec])
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment