Skip to content

Instantly share code, notes, and snippets.

@zhangyuan
Created January 26, 2019 07:54
Show Gist options
  • Save zhangyuan/3ec3b76e58549e76113ea95128235f30 to your computer and use it in GitHub Desktop.
OverwriteCSVSink
package example
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
/**
 * A Structured Streaming [[Sink]] that overwrites a single CSV output on every
 * micro-batch (the built-in file sink is append-only).
 *
 * To obtain overwrite semantics, each batch DataFrame is detached from its
 * streaming lineage — collected to the driver and rebuilt as a plain batch
 * DataFrame — so the batch `write` API with `SaveMode.Overwrite` is permitted.
 *
 * NOTE(review): `data.collect()` pulls the entire micro-batch into driver
 * memory; this is only suitable for small batches.
 *
 * @param sqlContext       active SQL context (unused here but required by the
 *                         provider contract)
 * @param parameters       sink options from the query (`path`, `header`, ...)
 * @param partitionColumns partitioning columns (currently ignored)
 * @param outputMode       requested output mode (currently ignored)
 */
class OverwriteCSVSink(sqlContext: SQLContext,
                       parameters: Map[String, String],
                       partitionColumns: Seq[String],
                       outputMode: OutputMode) extends Sink {

  // Option keys forwarded from `parameters` to the batch CSV writer.
  private val forwardedOptions: Seq[String] =
    Seq("header", "truncate", "checkpointLocation", "path")

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val spark = data.sparkSession

    // Rebuild as a batch DataFrame: a streaming DataFrame cannot be written
    // through the batch writer directly.
    val batchDf = spark.createDataFrame(
      spark.sparkContext.parallelize(data.collect()), data.schema)

    val writer = batchDf
      .repartition(1) // emit a single part file
      .write
      .mode(SaveMode.Overwrite)
      .format("csv")

    // BUGFIX: the original used `parameters.get(key).orNull`, setting a null
    // option value for every absent key; DataFrameWriter does not accept null
    // option values. Only forward options that were actually supplied.
    val configured = forwardedOptions.foldLeft(writer) { (w, key) =>
      parameters.get(key).fold(w)(value => w.option(key, value))
    }
    configured.save()
  }
}
/**
 * Registers the overwrite-CSV sink under the short name `"overwriteCSV"`,
 * making it usable as `writeStream.format("overwriteCSV")`.
 */
class SinkProvider extends StreamSinkProvider with DataSourceRegister {

  /** Format name used by `DataStreamWriter.format(...)`. */
  override def shortName(): String = "overwriteCSV"

  /** Builds the sink, forwarding all arguments unchanged. */
  override def createSink(sqlContext: SQLContext,
                          parameters: Map[String, String],
                          partitionColumns: Seq[String],
                          outputMode: OutputMode): Sink =
    new OverwriteCSVSink(sqlContext, parameters, partitionColumns, outputMode)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment