Created
January 26, 2019 07:54
-
-
Save zhangyuan/3ec3b76e58549e76113ea95128235f30 to your computer and use it in GitHub Desktop.
OverwriteCSVSink
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
example.SinkProvider
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package example | |
import org.apache.spark.sql.execution.streaming.Sink | |
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider} | |
import org.apache.spark.sql.streaming.OutputMode | |
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} | |
/**
 * A streaming [[Sink]] that rewrites one CSV output location on every micro-batch.
 *
 * Each call to [[addBatch]] replaces the previous contents of the output path
 * (`SaveMode.Overwrite`) with the rows of the current batch. The location therefore only ever
 * holds the most recent batch. That is the full result table under `OutputMode.Complete`, but
 * only the latest increment under other output modes.
 *
 * @param sqlContext       not used by this sink; kept for the provider's constructor call
 * @param parameters       options given to `writeStream`. All of them are forwarded to the CSV
 *                         writer. A non-empty `path` is required.
 * @param partitionColumns columns from `writeStream.partitionBy(...)`; applied to the CSV output
 * @param outputMode       not used by this sink (see the note on output modes above)
 * @throws IllegalArgumentException if no non-empty `path` option was supplied
 */
class OverwriteCSVSink(sqlContext: SQLContext,
                       parameters: Map[String, String],
                       partitionColumns: Seq[String],
                       outputMode: OutputMode) extends Sink {

  // Fail when the sink is created, with a clear message, rather than on the first batch.
  // Without this check, a missing path surfaced as a Hadoop "null string" Path error.
  require(parameters.get("path").exists(_.nonEmpty),
    "overwriteCSV sink requires a non-empty 'path' option (writeStream.option(\"path\", ...) or start(path))")

  /**
   * Overwrites the output location with the rows of one micro-batch.
   *
   * @param batchId identifier of the micro-batch (not used)
   * @param data    rows of the micro-batch
   */
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val session = data.sparkSession

    // The rows are collected to the driver and re-wrapped as an ordinary DataFrame before
    // writing. NOTE(review): presumably this is because the micro-batch DataFrame cannot be
    // written with `data.write` directly; confirm for the target Spark version. This assumes each
    // batch fits in driver memory. A single slice gives one write task (previously obtained via
    // `repartition(1)`) without an extra shuffle.
    val rows = session.sparkContext.parallelize(data.collect(), numSlices = 1)

    val writer = session.createDataFrame(rows, data.schema)
      .write
      .mode(SaveMode.Overwrite)
      .format("csv")
      // Forward every user-supplied option (header, sep, quote, path, ...). Options that were
      // not supplied are left unset instead of being passed as null.
      .options(parameters)

    // Honour writeStream.partitionBy(...), which was previously ignored.
    val partitioned =
      if (partitionColumns.isEmpty) writer else writer.partitionBy(partitionColumns: _*)

    partitioned.save()
  }
}
/**
 * Provider for the overwriting CSV streaming sink.
 *
 * Through `DataSourceRegister`, the sink can be selected with `writeStream.format("overwriteCSV")`
 * as well as by this class's fully qualified name.
 */
class SinkProvider extends StreamSinkProvider with DataSourceRegister {

  /** Creates a new [[OverwriteCSVSink]] from the streaming writer's configuration. */
  override def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink =
    new OverwriteCSVSink(
      sqlContext = sqlContext,
      parameters = parameters,
      partitionColumns = partitionColumns,
      outputMode = outputMode)

  /** Short alias under which this provider is registered. */
  override def shortName(): String = "overwriteCSV"
}
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.