Spark snippets
# Hive
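A minimal PySpark sketch, assuming an existing SparkContext `sc` and the `default.bank` table used further below; the target table name is an assumption:
```
from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
bank = hive_context.table("default.bank")                   # read an existing Hive table
bank.write.mode("append").saveAsTable("default.bank_copy")  # write back to Hive (assumed table name)
```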
# Oracle
```
// Spark 1.x API: read an Oracle table over JDBC (connection string and table are placeholders)
val oracle_db = sqlContext.load("jdbc", Map(
  "url" -> "jdbc:oracle:thin:user/passwd@//server.com:1526/serviceID.com",
  "dbtable" -> "table"))
```
# Elasticsearch
// Write a DataFrame to Elasticsearch (elasticIndex and elasticMapping are placeholders)
val elasticOptions = Map(
  "es.mapping.id" -> "id",
  "es.nodes" -> "localhost",
  "es.port" -> "9200",
  "es.index.auto.create" -> "yes"
)
dataFrame.write.format("org.elasticsearch.spark.sql")
  .mode(SaveMode.Append)
  .options(elasticOptions)
  .save(s"$elasticIndex/$elasticMapping")
# Cassandra
// Write a DataFrame to a Cassandra table (cassandraTable and cassandraKeyspace are placeholders)
val cassandraOptions = Map("table" -> cassandraTable, "keyspace" -> cassandraKeyspace)
dataFrame.write
  .format("org.apache.spark.sql.cassandra")
  .mode(SaveMode.Append)
  .options(cassandraOptions)
  .save()

Config

tmp-space

spark.local.dir                     SOME/DIR/WHERE/YOU/HAVE/SPACE
Shell memory

Scala: spark-shell --executor-memory 4G --driver-memory 4G
Python: pyspark --executor-memory 16G --driver-memory 16G --num-executors 8
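A hedged sketch of setting the same options programmatically (paths and sizes are assumed values):

from pyspark import SparkConf, SparkContext

# Assumed values; spark.local.dir should point at a disk with enough free space
conf = (SparkConf()
        .setAppName("config-example")
        .set("spark.local.dir", "/mnt/big-disk/spark-tmp")
        .set("spark.executor.memory", "4g"))
sc = SparkContext(conf=conf)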

Imports

To make the $ column syntax work in selects (Scala), e.g. df.select($"name")
import sqlContext.implicits._

Input/Output

External

Read Parquet

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
parquetFile = sqlContext.read.parquet("people.parquet")

Save as Parquet

df.write.parquet("/user/spark/data/parquet1000g/"+tablename)
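
A hedged variant, assuming an 'Age' column, that partitions the Parquet output by that column:

df.write.partitionBy("Age").parquet("/user/spark/data/parquet_partitioned/" + tablename)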

Hive

from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
bank = hive_context.table("default.bank")
bank.show()
bank.registerTempTable("bank_temp")
hive_context.sql("select * from bank_temp").show()

SQL

pyspark: link
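
A minimal sketch, assuming table1 is an existing DataFrame with an Age column, of querying it with SQL:

table1.registerTempTable("table1_temp")
sqlContext.sql("SELECT Age, COUNT(*) AS n FROM table1_temp GROUP BY Age").show()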

Table

table1.printSchema()
table1.head(5)
table1.show(5, truncate=True)   # show first 5 rows, truncating long values

# count
table1.count()

# columns
len(table1.columns)
table1.describe('Product_ID').show()

# stats
table1.describe().show()

Selecting

#Column
table1.select('User_ID','Age').show(5)
table1.select('Product_ID').distinct().count()     # distinct

# Condition
table1.filter(table1.Purchase > 15000).count()     # filter
df.filter("ANALYSIS_ID = 21940523").show()



outputPartition['partition']       # bracket syntax to reference a column, e.g. for DFs read from Hive tables
# GroupBy
table1.groupby('Age').agg({'Purchase': 'mean'}).show()   # groupby
table1.groupby('Age').count().show()

# is in (Scala)
import sqlContext.implicits._
df.where($"type".isin("type1","type2") and $"status".isin("completed","inprogress"))

# Join
df.join(df2, 'name').select(df.name, df2.height).collect()
# alias: join on id, keep all columns from df1 plus two from df2
from pyspark.sql.functions import col
df1.alias('a').join(df2.alias('b'), col('b.id') == col('a.id')) \
    .select([col('a.' + c) for c in df1.columns] + [col('b.other1'), col('b.other2')])


# Sample
df.sample(False, 0.5, 42).count()   # sample without replacement, fraction 0.5, seed 42


Manipulating

table1.select('Age','Gender').dropDuplicates().show()   # drop duplicates
table1.dropna().count()  # drop with null values
table1.fillna(-1).show(2)   # fill na values

Optimization

Partition

df_t1.rdd.getNumPartitions()
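
To change the partition count afterwards (target counts are assumed values):

df_t1 = df_t1.repartition(200)   # full shuffle to 200 partitions
df_t1 = df_t1.coalesce(50)       # reduce partitions without a full shuffle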

Broadcasting

from pyspark.sql.functions import broadcast
large_df.join(broadcast(small_df), ["foo"])
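
Spark SQL can also broadcast small tables automatically below a size threshold; a hedged sketch of raising it (value assumed, in bytes):

sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024))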

Spark-shell

Within spark-shell

# scala
:load PATH_TO_FILE

External script

# scala
spark-shell -i file.scala

# python
/usr/bin/spark-submit --master yarn-client --queue default \
    --num-executors 20 --executor-memory 1G --executor-cores 2 \
    --driver-memory 1G \
    go_to_sleep.py

Web UI

via the pyspark shell
sc._jsc.sc().uiWebUrl().get()

Standalone app

application template python

## Imports
from pyspark import SparkConf, SparkContext
## CONSTANTS
APP_NAME = "My Spark Application"
## OTHER FUNCTIONS/CLASSES

## Main functionality
def main(sc):
    rdd = sc.parallelize(range(1000), 10)
    print(rdd.mean())

if __name__ == "__main__":
    # Configure OPTIONS
    conf = SparkConf().setAppName(APP_NAME)
    conf = conf.setMaster("local[*]")
    # in cluster mode this would be something like
    # "spark://ec2-0-17-03-078.compute-1.amazonaws.com:7077"
    sc   = SparkContext(conf=conf)
    # Execute Main functionality
    main(sc)

run with

spark-submit hello.py abctext.txt