Created
June 21, 2021 03:33
-
-
Save anjijava16/c4792d7e240e04b712e73ce1fba77f1c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Programmatically specify a DataFrame schema: build a StructType from a
// string of column names, apply it to an RDD[Row], register the result as a
// temporary view, and query it with Spark SQL.
// NOTE(review): assumes an active SparkSession named `spark` (as in
// spark-shell); `people.txt` holds "name, age" lines.
import org.apache.spark.sql.Row // Row is constructed below; types._ alone does not provide it
import org.apache.spark.sql.types._

// Create an RDD of raw text lines.
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string.
val schemaString = "name age"

// Generate the schema based on the string of schema: every column is a
// nullable StringType.
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows; trim the age field because the
// input is comma-space separated.
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD.
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame.
val _unused = peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames.
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD
// operations. The columns of a row in the result can be accessed by field
// index or by field name. (Outside spark-shell this map needs the encoders
// from `import spark.implicits._` — TODO confirm in your environment.)
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+
# Programmatically specify a DataFrame schema in PySpark: build a StructType
# from a string of column names, apply it to an RDD of tuples, register the
# result as a temporary view, and query it with Spark SQL.
# NOTE(review): assumes an active SparkSession named `spark` (as in the
# pyspark shell); `people.txt` holds "name, age" lines.

# Import only the data types actually used (avoid wildcard import).
from pyspark.sql.types import StructField, StructType, StringType

sc = spark.sparkContext

# Load a text file and convert each line to a tuple of (name, age-string);
# strip() removes the space after the comma in "name, age".
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string; every column is a nullable StringType.
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD of tuples.
schemaPeople = spark.createDataFrame(people, schema)

# Creates a temporary view using the DataFrame.
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment