Last active
May 24, 2019 12:18
-
-
Save hakanilter/0cc72facaee0f5fa125ae3acaf6199b4 to your computer and use it in GitHub Desktop.
Creating DataFrame from weird CSV files
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.hadoop.conf.Configuration | |
import org.apache.hadoop.mapreduce.Job | |
import org.apache.hadoop.io.{LongWritable, Text} | |
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat | |
/**
 * Loads a "weird" CSV file — one that uses non-standard field and line
 * terminators — into a Spark DataFrame and registers it as a temp view.
 *
 * Strategy: let Hadoop's TextInputFormat split records on the custom
 * line terminator, normalise the custom field terminator to a tab, then
 * hand the cleaned records to Spark's regular CSV reader.
 *
 * @param fileName        path of the input file (any Hadoop-supported URI)
 * @param tableName       name of the temp view to create/replace
 * @param columns         column names applied to the resulting DataFrame;
 *                        must match the number of parsed fields
 * @param fieldTerminator literal string separating fields within a record
 * @param lineTerminator  literal string separating records
 * @return the resulting DataFrame (also registered as `tableName`)
 */
def wcsv_to_df(
  fileName: String,
  tableName: String,
  columns: Array[String],
  fieldTerminator: String,
  lineTerminator: String
) = {
  // Copy the shell's Hadoop conf and override the record delimiter so
  // input splits break on the custom terminator instead of '\n'.
  val conf = new Configuration(sc.hadoopConfiguration)
  conf.set("textinputformat.record.delimiter", lineTerminator)

  // Read raw records and normalise the field separator to '\t'.
  // NOTE: String.replace performs a LITERAL substitution. The original
  // used replaceAll, which compiles fieldTerminator as a regex — common
  // terminators such as "|" or "." are regex metacharacters and would
  // silently corrupt every record (e.g. "|" matches the empty string,
  // inserting a tab between every character).
  val rdd = sc
    .newAPIHadoopFile(fileName, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
    .map { case (_, text) => text.toString }
    .map(_.replace(fieldTerminator, "\t"))

  // Parse the tab-normalised records with Spark's CSV reader and apply
  // the caller-supplied column names.
  val df = spark.read
    .option("sep", "\t")
    .option("inferSchema", "true")
    .csv(rdd.toDS)
    .toDF(columns: _*)

  // Expose the result for SQL queries as well.
  df.createOrReplaceTempView(tableName)
  df
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment