tmp-space
spark.local.dir SOME/DIR/WHERE/YOU/HAVE/SPACE
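If you would rather set it from code than in spark-defaults.conf, a minimal pyspark sketch (the path is only a placeholder; cluster managers like YARN may override this with their own local-dir settings):
from pyspark import SparkConf, SparkContext

conf = SparkConf() \
    .setAppName("tmp-space-example") \
    .set("spark.local.dir", "/data/tmp/spark")   # placeholder: a directory with enough free space
sc = SparkContext(conf=conf)                     # must be set before the context starts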
Scala: spark-shell --executor-memory 4G --driver-memory 4G
Python: pyspark --executor-memory 16G --driver-memory 16G --num-executors 8
To make $ work in selects (Scala)
import sqlContext.implicits._
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
parquetFile = sqlContext.read.parquet("people.parquet")
df.write.parquet("/user/spark/data/parquet1000g/"+tablename)
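Self-contained round-trip sketch, assuming the sqlContext from above; the data and paths are made up:
df = sqlContext.createDataFrame([("Ann", 34), ("Bob", 28)], ["name", "age"])
df.write.parquet("/tmp/people.parquet")               # writes a directory of parquet part files
people = sqlContext.read.parquet("/tmp/people.parquet")
people.filter(people.age > 30).show()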
from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
bank = hive_context.table("default.bank")
bank.show()
bank.registerTempTable("bank_temp")
hive_context.sql("select * from bank_temp").show()
pyspark DataFrame basics
table1.printSchema()
table1.head(5)
table1.show(5, truncate=True) # head() has no truncate option; use show()
# count
table1.count()
# columns
len(table1.columns)
table1.describe('Product_ID').show()
# stats
table1.describe().show()
# select columns
table1.select('User_ID','Age').show(5)
table1.select('Product_ID').distinct().count() # distinct
# Condition
table1.filter(table1.Purchase > 15000).count() # filter
df.filter("ANALYSIS_ID = 21940523").show()
outputPartition['partition'] # select a column named 'partition' (e.g. on DataFrames read from partitioned Hive tables)
# GroupBy
table1.groupby('Age').agg({'Purchase': 'mean'}).show() # groupby
table1.groupby('Age').count().show()
# isin (Scala)
import sqlContext.implicits._
df.where($"type".isin("type1", "type2") && $"status".isin("completed", "inprogress"))
# Join
df.join(df2, 'name').select(df.name, df2.height).collect()
# alias
from pyspark.sql.functions import col
df1.alias('a').join(df2.alias('b'), col('b.id') == col('a.id')) \
    .select([col('a.' + c) for c in df1.columns] + [col('b.other1'), col('b.other2')])
# Sample
df.sample(False, 0.5, 42).count() # subset
table1.select('Age','Gender').dropDuplicates().show() # drop duplicates
table1.dropna().count() # drop with null values
table1.fillna(-1).show(2) # fill na values
df_t1.rdd.getNumPartitions()
from pyspark.sql.functions import broadcast
large_df.join(broadcast(small_df), ["foo"])
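Self-contained sketch with toy DataFrames (names and data are made up), assuming the sqlContext from above:
small_df = sqlContext.createDataFrame([(1, "one"), (2, "two")], ["foo", "label"])
large_df = sqlContext.createDataFrame([(i % 3, "x") for i in range(1000)], ["foo", "payload"])
large_df.join(broadcast(small_df), ["foo"]).show(5)   # small_df is copied to every executor, so large_df is not shuffled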
# scala: load a script inside a running spark-shell
:load PATH_TO_FILE
# scala: run a script when launching spark-shell
spark-shell -i file.scala
/usr/bin/spark-submit --master yarn-client --queue default \
--num-executors 20 --executor-memory 1G --executor-cores 2 \
--driver-memory 1G \
go_to_sleep.py
get the Spark UI URL from the pyspark shell
sc._jsc.sc().uiWebUrl().get()
application template python
## Imports
from pyspark import SparkConf, SparkContext

## CONSTANTS
APP_NAME = "My Spark Application"

## OTHER FUNCTIONS/CLASSES

## Main functionality
def main(sc):
    rdd = sc.parallelize(range(1000), 10)
    print(rdd.mean())

if __name__ == "__main__":
    # Configure OPTIONS
    conf = SparkConf().setAppName(APP_NAME)
    conf = conf.setMaster("local[*]")
    # in a cluster this would be something like
    # "spark://ec2-0-17-03-078.compute-1.amazonaws.com:7077"
    sc = SparkContext(conf=conf)
    # Execute Main functionality
    main(sc)
run with
spark-submit hello.py abctext.txt
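The template above ignores the abctext.txt argument; a hypothetical variant that actually consumes the file passed on the command line:
## hypothetical hello.py that reads the file given to spark-submit
import sys
from pyspark import SparkConf, SparkContext

if __name__ == "__main__":
    conf = SparkConf().setAppName("hello").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    lines = sc.textFile(sys.argv[1])   # abctext.txt from the spark-submit call
    print(lines.count())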