Save Spark dataframe to a single CSV file
// Code for the blog post:
// https://fullstackml.com/2015/12/21/how-to-export-data-frame-from-apache-spark/
import java.io.File

import org.apache.spark.sql.DataFrame

def saveDfToCsv(df: DataFrame, csvOutput: String,
                sep: String = ",", header: Boolean = false): Unit = {
  val tmpCsvDir = "Posts.tmp.csv"
  // Collapse to a single partition so the writer emits exactly one part file.
  df.repartition(1).write.
    format("com.databricks.spark.csv").
    option("header", header.toString).
    option("delimiter", sep).
    save(tmpCsvDir)
  // The spark-csv package names its single output file "part-00000";
  // move it to the requested path, then delete the temporary directory.
  val dir = new File(tmpCsvDir)
  val tmpCsvFile = tmpCsvDir + File.separatorChar + "part-00000"
  (new File(tmpCsvFile)).renameTo(new File(csvOutput))
  dir.listFiles.foreach(f => f.delete)
  dir.delete
}
Could I kindly request a Python equivalent? I have tried several times to save a PySpark DataFrame to CSV without success.
I run Spark on my local machine.
Thanks very much!!!
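Below is a hedged PySpark port of the Scala function above, not the gist author's code: a sketch that assumes Spark 2.x or later (built-in CSV writer), a local filesystem, uncompressed output, and a frame small enough to collapse into one partition. The helper name save_df_to_csv and the temporary-directory scheme are illustrative. Note that on Windows this still goes through Hadoop's file committer, so it needs a working winutils setup (see the traceback below).

import glob
import os
import shutil

from pyspark.sql import DataFrame

def save_df_to_csv(df: DataFrame, csv_output: str,
                   sep: str = ",", header: bool = False) -> None:
    # Spark writes a directory of part files; stage into a temp dir first.
    tmp_dir = csv_output + ".tmp"
    # One partition -> one part file.
    (df.repartition(1)
       .write
       .option("header", str(header).lower())
       .option("delimiter", sep)
       .csv(tmp_dir))
    # Move the single part file to the requested path, then clean up.
    part_file = glob.glob(os.path.join(tmp_dir, "part-*.csv"))[0]
    shutil.move(part_file, csv_output)
    shutil.rmtree(tmp_dir)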
Py4JJavaError Traceback (most recent call last)
&lt;ipython-input&gt; in &lt;module&gt;()
1 out_path = "C:/Users/NG005454/OneDrive - CCHellenic/Documents/Python_Exercise/new_territory"
2 customer_ps1_terr = customer_data.select(customer_data.demand_area_name,customer_data.sub_demand_area_name,customer_data.ps1_territory_id).filter(customer_data.sub_demand_area_name.isin(sub_demand_area)).distinct()
----> 3 customer_ps1_terr.write.save(out_path, format="csv", header=True)
~\AppData\Local\Continuum\anaconda3\lib\site-packages\pyspark\sql\readwriter.py in save(self, path, format, mode, partitionBy, **options)
736 self._jwrite.save()
737 else:
--> 738 self._jwrite.save(path)
739
740 @since(1.4)
~\AppData\Local\Continuum\anaconda3\lib\site-packages\py4j\java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
~\AppData\Local\Continuum\anaconda3\lib\site-packages\pyspark\sql\utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
~\AppData\Local\Continuum\anaconda3\lib\site-packages\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o716.save.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 80.0 failed 1 times, most recent failure: Lost task 0.0 in stage 80.0 (TID 1193, localhost, executor driver): java.io.IOException: (null) entry in command string: null chmod 0644 C:\Users\NG005454\OneDrive - CCHellenic\Documents\Python_Exercise\new_territory\_temporary\0\_temporary\attempt_20200116102706_0080_m_000000_1193\part-00000-a8e4807b-ad43-4e16-9a00-5c7218423c6e-c000.csv
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:770)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:866)
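For what it's worth, the root cause is visible in the Caused by line: the "(null) entry in command string: null chmod 0644" failure is the usual symptom of running Spark on Windows without Hadoop's winutils.exe (HADOOP_HOME unset or pointing nowhere), so the output committer cannot shell out to chmod the part file. Installing winutils and setting HADOOP_HOME is the standard fix. A hedged local-machine workaround that bypasses the Hadoop writer entirely is to collect the result to pandas; this sketch assumes the filtered frame fits in driver memory, and the output path is illustrative.

# Workaround sketch: write through pandas instead of Hadoop's committer.
# Assumes customer_ps1_terr (from the snippet above) fits in driver memory.
out_file = "C:/Users/NG005454/new_territory.csv"  # illustrative path
customer_ps1_terr.toPandas().to_csv(out_file, index=False, header=True)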