Skip to content

Instantly share code, notes, and snippets.

View buildlackey's full-sized avatar

Chris Bedford buildlackey

  • Data Lackey Labs
View GitHub Profile
@buildlackey
buildlackey / convertAndShow
Created September 6, 2019 22:47
convertAndShow
/** Points both the JVM default time zone and the Spark SQL session time
 *  zone at the given zone IDs.
 *
 *  @param tz        zone ID applied JVM-wide (both the `user.timezone`
 *                   system property and the java.util.TimeZone default)
 *  @param sessionTz zone ID written to `spark.sql.session.timeZone`
 */
def updateTimeZone(tz: String, sessionTz: String): Unit = {
  import java.util.TimeZone
  // Keep the system property and the runtime default in agreement: the
  // property is only consulted at JVM startup, so setDefault is what
  // actually changes the running JVM's zone.
  System.setProperty("user.timezone", tz)
  TimeZone.setDefault(TimeZone.getTimeZone(tz))
  spark.conf.set("spark.sql.session.timeZone", sessionTz)
}
@buildlackey
buildlackey / savedTz
Created September 6, 2019 22:47
savedTz
// Show how Spark renders a timestamp string carrying an explicit -01:00
// offset when the session time zone is GMT, then restore the caller's
// original session setting so later queries are unaffected.
val savedTz = spark.conf.get("spark.sql.session.timeZone")
spark.conf.set("spark.sql.session.timeZone", "GMT")
// Cast the ISO-8601 string to a timestamp, surface it as an integer cast
// of that timestamp, and format it back out with a zone designator (X).
List("1970-01-01T00:00:00-01:00").toDF("timestr").
withColumn("ts", col("timestr").cast("timestamp")).
withColumn("tsAsInt", col("ts").cast("integer")).
withColumn("asUtc", date_format($"ts", "yyyy-MM-dd'T'HH:mm:ssX")).
show(false)
// Restore the session time zone saved above.
spark.conf.set("spark.sql.session.timeZone", savedTz )
// RESULT (table body truncated in this capture; only the header rule survives):
// +-------------------------+-------------------+-------+--------------------+
@buildlackey
buildlackey / json.csv.timestamp
Last active September 7, 2019 00:19
json.csv.timestamp
// Will work on MacOS and Linux,
// but needs slight modification on Windows where noted
import java.io.{FileOutputStream, PrintWriter}
import org.apache.spark.sql.types._
import sys.process._
// Force the JVM into Pacific time before any timestamp work below.
// NOTE(review): `TimeZone` is used here with no visible
// `import java.util.TimeZone` — presumably dropped when this gist was
// captured; confirm before running. "PST" is a legacy three-letter zone
// ID — TODO confirm an IANA ID like "America/Los_Angeles" isn't intended.
System.setProperty("user.timezone", "PST");
TimeZone.setDefault(TimeZone.getTimeZone("PST"))
@buildlackey
buildlackey / badtimesInPst
Last active September 5, 2019 22:28
badtimesInPst
import java.util.TimeZone

/** Repoints the running JVM at the time zone named by `zone`: records it
 *  in the `user.timezone` system property and installs it as the
 *  java.util.TimeZone default. */
def updateTimeZone(zone: String) = {
  val resolved = TimeZone.getTimeZone(zone)
  System.setProperty("user.timezone", zone)
  TimeZone.setDefault(resolved)
}
// Set timezone to Pacific Standard, where -- at least for now --
// daylight savings is in effect
@buildlackey
buildlackey / streaming-inner-join-test
Last active August 24, 2019 03:37
Streaming inner joins unit test
// Verifies that a stream-stream inner join emits a joined row as soon as
// matching rows exist on both sides, with no watermark declared on either
// input.
it should "output the result as soon as it arrives without watermark" in {
  // Two in-memory sources feed the two sides of the join.
  val mainEventsStream = new MemoryStream[MainEvent](1, sparkSession.sqlContext)
  val joinedEventsStream = new MemoryStream[JoinedEvent](2, sparkSession.sqlContext)
 
  // Inner equi-join on mainKey == joinedKey; no watermark on either side.
  val stream = mainEventsStream.toDS().join(joinedEventsStream.toDS(), $"mainKey" === $"joinedKey")
 
  val query = stream.writeStream.foreach(RowProcessor).start()
 
  // Spin-wait until the streaming query reports active before feeding data.
  while (!query.isActive) {}
  new Thread(new Runnable() {
  // NOTE(review): the gist is cut off here — the Runnable body and the
  // rest of the test are missing from this capture.
[root@RESCUE-master /]# fdisk -l
Disk /dev/xvdc: 2147 MB, 2147483648 bytes
22 heads, 16 sectors/track, 11915 cylinders
Units = cylinders of 352 * 512 = 180224 bytes
Sector size (logical/physical): 512 bytes / 512 bytes
I/O size (minimum/optimal): 512 bytes / 512 bytes
Disk identifier: 0x0006cf9d
Device Boot Start End Blocks Id System