Skip to content

Instantly share code, notes, and snippets.

@josep2
Created December 6, 2016 02:49
Show Gist options
  • Save josep2/fbd21b08d3722012947f9fd08aad7d91 to your computer and use it in GitHub Desktop.
import java.sql.Timestamp
import java.time.{LocalDateTime, ZoneId, ZonedDateTime}
import com.cloudera.sparkts._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions._
/**
* Created by josep2 on 12/5/16.
*/
case class TimeSeries(id: String, week_beginning: String, sales: Double)
object TimeSeriesBlog {

  /**
   * Demo entry point: loads weekly sales observations from ./data.txt,
   * converts the week_beginning string column into a Timestamp, and aligns
   * the observations on a uniform weekly DateTimeIndex via spark-ts.
   *
   * Uses an explicit main method instead of the App trait to avoid the
   * trait's delayed-initialization pitfalls.
   */
  def main(args: Array[String]): Unit = {
    // Local 4-core session; Kryo serialization is recommended for spark-ts RDDs.
    val sparkSession = SparkSession.builder
      .master("local[4]")
      .appName("Panel Engine")
      .config("spark.driver.memory", "1g")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    import sparkSession.implicits._

    // Parses "yyyy-MM-dd" into a Timestamp at midnight in the system default zone.
    // NOTE(review): system-zone dependence means results vary by machine — confirm intended.
    val makeDate = udf((s: String) =>
      Timestamp.from(
        ZonedDateTime.of(LocalDateTime.parse(s + "T00:00:00"), ZoneId.systemDefault()).toInstant))

    // Input rows are "id,week_beginning,sales" — naive split(","), assumes clean input.
    // Immutable pipeline: the withColumn conversion is chained instead of reassigning a var.
    val timeSeriesData = sparkSession.sparkContext.textFile("./data.txt")
      .map(_.split(","))
      .map(row => TimeSeries(row(0), row(1), row(2).toDouble))
      .toDF()
      .withColumn("week_beginning", makeDate(col("week_beginning")))

    // Uniform weekly index (every 7 days) spanning the observation period.
    val zone = ZoneId.systemDefault()
    val dtIndex = DateTimeIndex.uniformFromInterval(
      ZonedDateTime.of(LocalDateTime.parse("2016-01-04T00:00:00"), zone),
      ZonedDateTime.of(LocalDateTime.parse("2016-04-04T00:00:00"), zone),
      new DayFrequency(7))

    // One aligned series per id, keyed on week_beginning; not used further in this demo.
    val dataRdd = TimeSeriesRDD.timeSeriesRDDFromObservations(dtIndex, timeSeriesData,
      "week_beginning", "id", "sales")

    // Stop via the session, which also stops the underlying SparkContext.
    sparkSession.stop()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment