//reduce log verbosity in spark-shell
sc.setLogLevel("ERROR")
//rename columns (three approaches)
import org.apache.spark.sql.types.{StructField, StructType}
//1. toDF with a transformed list of names
val tmpDf = df.toDF(df.columns.map(x => x.toUpperCase): _*)
//2. foldLeft + withColumnRenamed
val dfNew = df.columns.foldLeft(df)((df, col) => df.withColumnRenamed(col, col + "x"))
//3. rebuild the schema and recreate the DataFrame from the underlying RDD
val newSchema = StructType(df.schema.map(c => StructField(c.name + "xx", c.dataType, c.nullable)))
val dfNew = spark.createDataFrame(df.rdd, newSchema)
//append the column index to each column name
df.toDF(df.columns.zipWithIndex.map(c => c._1 + '_' + c._2) : _*).show
//remove invalid characters and generate unique names for repeated column names
import org.apache.spark.sql.DataFrame
def dfWithCleanAndUniqueColumnNames(df: DataFrame) = {
  import scala.collection.mutable.ArrayBuffer
  val cols = ArrayBuffer[String]()
  for (col <- df.columns) {
    //append _2, _3, ... until the sanitized name is unique
    def columnNameWithSequence(name: String, seq: Int): String = {
      val renamedColumn = if (seq == 1) name else name + "_" + seq
      if (cols.contains(renamedColumn))
        columnNameWithSequence(name, seq + 1)
      else
        renamedColumn
    }
    cols += columnNameWithSequence(col.replaceAll("[^a-zA-Z0-9]", "_"), 1)
  }
  df.toDF(cols.toSeq: _*)
}
dfWithCleanAndUniqueColumnNames(df)
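//usage sketch (hypothetical df with messy/duplicate column names, assumes spark-shell implicits for toDF)
val dirty = Seq((1, 2, 3)).toDF("my col", "my-col", "my_col")
dfWithCleanAndUniqueColumnNames(dirty).columns
//-> Array(my_col, my_col_2, my_col_3)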
//set configs on the fly
spark.conf.set("spark.sql.shuffle.partitions","100")
//read hadoop configuration values
sc.hadoopConfiguration.get("mapred.output.committer.class")
sc.hadoopConfiguration.get("mapreduce.fileoutputcommitter.algorithm.version")
//list all variables in spark-shell
$intp.unqualifiedIds.foreach(println)
//broadcast threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1000000000)
//local dir conf
--conf spark.local.dir=/tmp/xxx
//simple udf
import org.apache.spark.sql.functions.udf
def mod(id: Long, divisor: Long) = id % divisor
val udfmod = udf(mod _)
//or
val udfmod = udf((a: Int, b: Int) => a % b)
//or inline, without a named val
df.select(udf{(a: Long) => a}.apply($"customer")).show()
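//usage sketch for udfmod (assumes spark-shell implicits; the sample dfIds is made up here)
import org.apache.spark.sql.functions.lit
val dfIds = (1 to 20).toDF("id")
dfIds.withColumn("id_mod_3", udfmod($"id", lit(3))).show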
//register parquet external table
spark.sqlContext.createExternalTable("test", "file:///xxxx")
sql("alter table test9 recover partitions")
spark.catalog.listTables.collect.foreach { t =>
  try {
    println(t.name)
    spark.sqlContext.sql("alter table " + t.name + " recover partitions").show
  } catch {
    case e: org.apache.spark.sql.AnalysisException => println("error")
  }
}
//log time
def time[T](block: => T): T = {
  val start = System.currentTimeMillis
  val res = block
  val totalTime = System.currentTimeMillis - start
  println("Elapsed time: %.2f seconds".format(totalTime / 1000.0))
  res
}
time(spark.read.load("your_parquet").count)
//implicits: enhance DataFrame with extra methods via an implicit class
object ImplicitsDF {
  implicit class ExtendedDataFrame(df: org.apache.spark.sql.DataFrame) {
    def saveS3(path: String) = {
      df.coalesce(1).write.save(path)
    }
  }
}
import ImplicitsDF.ExtendedDataFrame
val dfTest = spark.sparkContext.parallelize(1 to 10).toDF
dfTest.saveS3("xx")
//df operations on multiple paths
val paths = Seq("test1sss", "test2", "test3xxx")
paths.foreach { path =>
  println("processing: " + path)
  try {
    val df = spark.read.load(path)
    //some df operation
  } catch {
    case e: Exception => println("error: " + path + ": " + e.toString)
  }
}
//scala number formatting
println(f"asdfasdfasd $x%,d")
printf("asdfasdfasd %,d", x)
printf("asdfasdfasd %,2.2f", x.toDouble)
//spark session from IntelliJ
val spark = SparkSession
.builder()
.master("local")
.appName("xxxxxxx")
.enableHiveSupport()
.getOrCreate()
import spark.sqlContext.implicits._
//number of partitions RDD
sc.textFile("file:///Users/guilherme.braccialli/Desktop/x.txt",222).map(line => (line ,1)).reduceByKey(_ + _, 100).foreach(println)
//number of partitions DataFrame
spark.conf.set("spark.sql.files.maxPartitionBytes", 100)
spark.conf.set("spark.sql.shuffle.partitions","100")
spark.read.load("xxx").count
//accumulators on udf
import org.apache.spark.util.LongAccumulator
val accumulatorCountFilterOut = sc.longAccumulator("rejected")
val updateAccumulator = udf { s: Boolean =>
  if (!s)
    accumulatorCountFilterOut.add(1)
  s
}
df.filter(updateAccumulator($"id" === lit(1))).show(1000)
println(accumulatorCountFilterOut.value)
//window function
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
df.withColumn("lag", lag($"id", 1).over(Window.partitionBy("country").orderBy(col("date").desc))).show
//build a StructType from a "name type, name type, ..." string
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
def schemaFromString(schemaString: String) = {
  StructType(
    schemaString.filter(_ >= ' ').split(",").map(_.trim).map { f =>
      val arrayFields = f.split(" ")
      StructField(arrayFields(0), CatalystSqlParser.parseDataType(arrayFields(1)), true)
    }
  )
}
val schema = schemaFromString("a string, b string")
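//usage sketch: apply the generated schema when reading a headerless csv (path is hypothetical)
val dfCsv = spark.read.schema(schema).csv("file:///tmp/some.csv")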
//nested schema
val schema = StructType(Array(
  StructField("Meta Data",
    StructType(Array(
      StructField("1. Information", StringType),
      StructField("2. Symbol", StringType),
      StructField("3. Last Refreshed", StringType),
      StructField("4. Output Size", StringType),
      StructField("5. Time Zone", StringType)
    )),
    true
  ),
  StructField("Weekly Time Series",
    MapType(
      StringType,
      StructType(Array(
        StructField("1. open", StringType),
        StructField("2. high", StringType),
        StructField("3. low", StringType),
        StructField("4. close", StringType),
        StructField("5. volume", StringType)
      ))
    ),
    true
  )
))
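//usage sketch: read json with the nested schema and extract a nested field (path is hypothetical)
val dfJson = spark.read.schema(schema).json("file:///tmp/weekly.json")
dfJson.select(col("Meta Data").getField("2. Symbol").alias("symbol")).show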