Skip to content

Instantly share code, notes, and snippets.

@hakanilter
Last active May 24, 2019 12:18
Show Gist options
  • Save hakanilter/0cc72facaee0f5fa125ae3acaf6199b4 to your computer and use it in GitHub Desktop.
Save hakanilter/0cc72facaee0f5fa125ae3acaf6199b4 to your computer and use it in GitHub Desktop.
Creating DataFrame from weird CSV files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
/**
 * Loads a "weird" CSV file that uses custom field and line terminators,
 * converts it to a DataFrame, and registers it as a Spark SQL temp view.
 *
 * @param fileName        path of the input file (any Hadoop-supported URI)
 * @param tableName       name of the temporary view to create/replace
 * @param columns         column names applied positionally to the parsed fields
 * @param fieldTerminator literal string separating fields within a record
 * @param lineTerminator  literal string separating records
 * @return the resulting DataFrame (also visible as temp view `tableName`)
 */
def wcsv_to_df(
fileName: String,
tableName: String,
columns: Array[String],
fieldTerminator: String,
lineTerminator: String
) = {
// Copy the Hadoop conf so the custom record delimiter does not leak into
// other jobs sharing this SparkContext.
val conf = new Configuration(sc.hadoopConfiguration)
conf.set("textinputformat.record.delimiter", lineTerminator)
// Read raw records, then normalize the field separator to TAB.
// BUGFIX: use replace (literal) instead of replaceAll (regex) — terminators
// such as "|" or "||" would otherwise be interpreted as regular expressions
// ("|" matches the empty string, inserting a TAB between every character).
// NOTE(review): records that already contain literal TABs will be split
// incorrectly — acceptable only if the data cannot contain TABs.
val rdd = sc.newAPIHadoopFile(fileName, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
.map { case (_, text) => text.toString }
.map(_.replace(fieldTerminator, "\t"))
// Parse the normalized records as TSV; column types are inferred by Spark.
val df = spark.read
.option("sep", "\t")
.option("inferSchema", "true")
.csv(rdd.toDS)
.toDF(columns: _*)
df.createOrReplaceTempView(tableName)
df
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment