sc.setLogLevel("ERROR")
//rename columns
val tmpDf = df.toDF(df.columns.map(x => x.toUpperCase): _*)
val dfNew = df.columns.foldLeft(df)((df, col) => df.withColumnRenamed(col, col + "x"))
val newSchema = StructType(df.schema.map(c => StructField(c.name + "xx", c.dataType, c.nullable)))
val dfNew = spark.createDataFrame(df.rdd, newSchema)
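//minimal usage sketch comparing the renaming approaches above; dfSample is a hypothetical example, not from the original gist
import spark.implicits._
val dfSample = Seq((1, "a"), (2, "b")).toDF("id", "name")
dfSample.toDF(dfSample.columns.map(_.toUpperCase): _*).printSchema                           //ID, NAME
dfSample.columns.foldLeft(dfSample)((d, c) => d.withColumnRenamed(c, c + "_x")).printSchema  //id_x, name_x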
//add id to columns
df.toDF(df.columns.zipWithIndex.map(c => c._1 + '_' + c._2) : _*).show
//remove invalid characters and generate unique names for repeated column names
import org.apache.spark.sql.DataFrame
def dfWithCleanAndUniqueColumnNames(df: DataFrame) = {
  import scala.collection.mutable.ArrayBuffer
  val cols = ArrayBuffer[String]()
  for (col <- df.columns) {
    def columnNameWithSequence(name: String, seq: Int): String = {
      val renamedColumn = if (seq == 1) name else (name + "_" + seq)
      if (cols.contains(renamedColumn))
        columnNameWithSequence(name, seq + 1)
      else
        renamedColumn
    }
    cols += columnNameWithSequence(col.replaceAll("[^a-zA-Z0-9]", "_"), 1)
  }
  df.toDF(cols.toSeq : _*)
}
dfWithCleanAndUniqueColumnNames(df)
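//minimal usage sketch with hypothetical dirty/duplicated column names (not part of the original gist)
val dfDirty = Seq((1, 2, 3)).toDF("my col", "my.col", "my_col")
dfWithCleanAndUniqueColumnNames(dfDirty).columns   //Array(my_col, my_col_2, my_col_3)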
//set configs on the fly
spark.conf.set("spark.sql.shuffle.partitions", "100")
//list conf
sc.hadoopConfiguration.get("mapred.output.committer.class")
sc.hadoopConfiguration.get("mapreduce.fileoutputcommitter.algorithm.version")
//list all variables in spark-shell
$intp.unqualifiedIds.foreach(println)
//broadcast threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1000000000)
//local dir conf
--conf spark.local.dir=/tmp/xxx
//simple udf
def mod(id: Long, mod: Long) = {id % mod}
val udfmod = udf(mod _)
//or
val udfmod = udf((a: Int, b: Int) => a % b)
//or
df.select(udf{(a: Long) => a}.apply($"customer")).show()
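//sketch: register the same kind of udf for use in SQL (the name "udf_mod" is arbitrary)
spark.udf.register("udf_mod", (a: Long, b: Long) => a % b)
spark.sql("select udf_mod(10, 3)").show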
//register parquet external table
spark.sqlContext.createExternalTable("test", "file:///xxxx")
sql("alter table test recover partitions")
spark.catalog.listTables.collect.foreach{t =>
  try {
    println(t.name)
    spark.sqlContext.sql("alter table " + t.name + " recover partitions").show
  } catch {
    case e: org.apache.spark.sql.AnalysisException => println("error")
  }
}
//log time
def time[T](block: => T): T = {
  val start = System.currentTimeMillis
  val res = block
  val totalTime = System.currentTimeMillis - start
  println("Elapsed time: %.2f seconds".format(totalTime / 1000.0))
  res
}
time(spark.read.load("your_parquet").count)
//implicits, enhanced dataframe
object ImplicitsDF {
  implicit class ExtendedDataFrame(df: org.apache.spark.sql.DataFrame) {
    def saveS3(path: String) = {
      df.coalesce(1).write.save(path)
    }
  }
}
import ImplicitsDF.ExtendedDataFrame
val dfTest = spark.sparkContext.parallelize(1 to 10).toDF
dfTest.saveS3("xx")
//df operations on multiple paths
val paths = Seq("test1sss", "test2", "test3xxx")
paths.foreach{path =>
  println("processing: " + path)
  try {
    val df = spark.read.load(path)
    //some df operation
  }
  catch {
    case e: Exception => println("error:" + path + " :" + e.toString)
  }
}
//scala number formatting
println(f"value: $x%,d")
printf("value: %,d", x)
printf("value: %,2.2f", x.toDouble)
//spark session from intelliJ
import org.apache.spark.sql.SparkSession
val spark = SparkSession
  .builder()
  .master("local")
  .appName("xxxxxxx")
  .enableHiveSupport()
  .getOrCreate()
import spark.implicits._
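//sketch of matching build.sbt dependencies for running from IntelliJ (versions are assumptions, match your cluster)
//libraryDependencies += "org.apache.spark" %% "spark-sql"  % "2.3.2"
//libraryDependencies += "org.apache.spark" %% "spark-hive" % "2.3.2"  //needed for enableHiveSupport()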
//number of partitions RDD
sc.textFile("file:///Users/guilherme.braccialli/Desktop/x.txt", 222).map(line => (line, 1)).reduceByKey(_ + _, 100).foreach(println)
//number of partitions DataFrame
spark.conf.set("spark.sql.files.maxPartitionBytes", 100)
spark.conf.set("spark.sql.shuffle.partitions", "100")
spark.read.load("xxx").count
//accumulators on udf
import org.apache.spark.util.LongAccumulator
val accumulatorCountFilterOut = sc.longAccumulator("rejected")
val updateAccumulator = udf {s: Boolean =>
  if (!s)
    accumulatorCountFilterOut.add(1)
  s
}
df.filter(updateAccumulator($"id" === lit(1))).show(1000)
println(accumulatorCountFilterOut.value)
//window function
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
df.withColumn("lag", lag($"id", 1).over(Window.partitionBy("country").orderBy(col("date").desc))).show
//schema from a ddl-like string
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
def schemaFromString(schemaString: String) = {
  StructType(
    schemaString.filter(_ >= ' ').split(",").map(_.trim).map{f =>
      val arrayFields = f.split(" ")
      StructField(arrayFields(0), CatalystSqlParser.parseDataType(arrayFields(1)), true)
    }
  )
}
val schema = schemaFromString("a string, b string")
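//usage sketch: apply the parsed schema to a read; the csv path is hypothetical
spark.read.schema(schemaFromString("a string, b int")).csv("/tmp/some.csv").printSchema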
//nested schema
val schema = StructType(Array(
  StructField("Meta Data",
    StructType(Array(
      StructField("1. Information", StringType),
      StructField("2. Symbol", StringType),
      StructField("3. Last Refreshed", StringType),
      StructField("4. Output Size", StringType),
      StructField("5. Time Zone", StringType)
    )),
    true
  ),
  StructField("Weekly Time Series",
    MapType(
      StringType,
      StructType(Array(
        StructField("1. open", StringType),
        StructField("2. high", StringType),
        StructField("3. low", StringType),
        StructField("4. close", StringType),
        StructField("5. volume", StringType)
      ))
    ),
    true
  )
))
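//usage sketch: read a json file matching this layout with the nested schema; the path is hypothetical
val quotes = spark.read.schema(schema).json("/tmp/weekly_time_series.json")
quotes.select(explode($"Weekly Time Series")).show(5, false)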