@mannharleen
Last active September 20, 2017 15:07
When case classes cannot be created in advance, e.g. when the schema is being read from a JSON file at runtime, we cannot use the reflection method, i.e. we cannot use rdd.map(x => case_class(x._1, x._2)).toDF(). Instead, the schema must be specified programmatically.
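For contrast, a minimal sketch of the reflection method that the note says is unavailable here; the Person case class and sample data are illustrative, and it assumes the same sc and hiveContext initialized in the snippet below:

//reflection method (for contrast): only works when the schema is known up front as a case class
//Person and the sample tuples are hypothetical; assumes sc and hiveContext as initialized below
import hiveContext.implicits._
case class Person(name: String, age: Int)
val peopleDF = sc.parallelize(Seq(("Alice", 30), ("Bob", 25))).map(x => Person(x._1, x._2)).toDF()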
//Programmatically Specifying the Schema
import org.apache.spark.{SparkContext,SparkConf}
import org.apache.spark.sql.hive.HiveContext
//initializations
val conf = new SparkConf().setAppName("xx").setMaster("local[2]")
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)
//
//create DF from RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, IntegerType}
val schema = StructType(StructField("column", IntegerType, false) :: Nil) //a single non-nullable Int column
//pyspark equivalent:
//from pyspark.sql.types import IntegerType, StructType, StructField
//sqlContext.createDataFrame(rdd, StructType([StructField("col1", IntegerType(), False), StructField("col2", IntegerType(), False)]))
val rdd = sc.parallelize(Seq(1, 2, 3)) //example data; any RDD[Int] matching the schema works
val df = hiveContext.createDataFrame(rdd.map(x => Row(x)), schema)
df.show
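
Since the whole point is a schema known only at runtime, here is a minimal sketch of building the StructType dynamically from a list of column names, e.g. parsed out of a JSON schema file; fieldNames is a hypothetical stand-in for whatever that file yields, and the imports above are reused:

//build the schema at runtime from column names (all IntegerType here for simplicity)
//fieldNames is a hypothetical stand-in for names read from a JSON schema file
val fieldNames = Seq("col1", "col2")
val dynSchema = StructType(fieldNames.map(name => StructField(name, IntegerType, false)))
val rowRdd = sc.parallelize(Seq((1, 2), (3, 4))).map(x => Row(x._1, x._2))
val dynDf = hiveContext.createDataFrame(rowRdd, dynSchema)
dynDf.show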