Last active
September 9, 2021 02:39
-
-
Save joemcmahon/fb41b55fd6262f79c49a3354c532924e to your computer and use it in GitHub Desktop.
A syntax-corrected version of the code in https://stackoverflow.com/a/46594963/39791
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Write a dataframe to CSV at the location specified. Assumes that the contents
 * of the dataframe being written are small enough to fit in memory on the Spark
 * master. (I use this for deequ suggested checks, which is usually fewer than 30
 * or so entries.)
 */
import org.apache.spark.sql._ | |
import org.apache.hadoop.fs.{FileSystem, Path} | |
import org.apache.hadoop.conf.Configuration | |
import java.io.{BufferedWriter, OutputStreamWriter} | |
/** Root directory under which CSV output files are written. */
val SPARK_WRITE_LOCATION = "/tmp"

/**
 * Write the contents of a small DataFrame as a CSV file named `filename`
 * under [[SPARK_WRITE_LOCATION]]. The whole DataFrame is collected to the
 * driver, so this is only suitable for results that fit in driver memory
 * (e.g. deequ suggested checks, typically a few dozen rows).
 *
 * Nothing is written when the DataFrame is empty. An existing file with
 * the same name is overwritten.
 *
 * NOTE(review): rows are joined with bare commas — values containing
 * commas, quotes, or newlines are NOT quoted or escaped, so this is not
 * RFC-4180 CSV. Acceptable for the simple deequ output it was built for.
 *
 * @param spark    active SparkSession, used to locate the right FileSystem
 * @param results  DataFrame whose rows are written, one line per row
 * @param filename file name created under SPARK_WRITE_LOCATION
 */
def saveCSV(spark: SparkSession, results: DataFrame, filename: String): Unit = {
  // Running with a local master: write through the local filesystem
  // rather than whatever (e.g. HDFS) the cluster config points at.
  val fs =
    if (spark.conf.get("spark.master").contains("local"))
      FileSystem.getLocal(new Configuration())
    else
      FileSystem.get(spark.sparkContext.hadoopConfiguration)

  // Collect exactly once; checking `results.count` first would launch a
  // second Spark job for no benefit.
  val rows = results.collect()
  if (rows.nonEmpty) {
    val target = new Path(SPARK_WRITE_LOCATION, filename)

    // BUGFIX: only remove the target file itself. The previous version
    // recursively deleted SPARK_WRITE_LOCATION ("/tmp"), destroying every
    // unrelated file under it. It then "copied" the result onto an
    // identical path after deleting it, which could never succeed; that
    // broken delete-and-self-copy step is gone — we simply write the file
    // at its final location.
    if (fs.exists(target)) {
      fs.delete(target, false)
    }

    val writeStream = fs.create(target, true)
    val bw = new BufferedWriter(new OutputStreamWriter(writeStream, "UTF-8"))
    try {
      for (row <- rows) {
        bw.write(row.mkString(start = "", sep = ",", end = "\n"))
      }
    } finally {
      // Closing the writer flushes and closes the underlying FS stream too.
      bw.close()
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment