<?xml version="1.0" encoding="UTF-8"?>
<classpath>
  <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/J2SE-1.5">
    <attributes>
      <attribute name="maven.pomderived" value="true"/>
    </attributes>
  </classpathentry>
  <classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
    <attributes>
      <attribute name="maven.pomderived" value="true"/>
    </attributes>
  </classpathentry>
</classpath>
Hungsiro506 / bigdata-radius_.gitignore
Created May 8, 2017 00:51
Move from payTV-kafka-connector to radius
*.~
.*
!.gitignore
target/
Hungsiro506 / HyperLogLogStoreUDAF.scala
Created June 13, 2017 05:33 — forked from MLnick/HyperLogLogStoreUDAF.scala
Experimenting with Spark SQL UDAF - HyperLogLog UDAF for distinct counts, that stores the actual HLL for each row to allow further aggregation
// Preview excerpt: the remaining UDAF methods (bufferSchema, merge, evaluate, ...) are cut off here.
// The HLL implementation is stream-lib's HyperLogLogPlus, per the forked gist.
class HyperLogLogStoreUDAF extends UserDefinedAggregateFunction {
  override def inputSchema = new StructType().add("stringInput", BinaryType)

  override def update(buffer: MutableAggregationBuffer, input: Row) = {
    // This input Row only has a single column storing the input value (String or other binary data).
    // We only update the buffer when the input value is not null.
    if (!input.isNullAt(0)) {
      // Create a fresh HLL on first use, otherwise deserialize the sketch stored in the buffer,
      // then offer the new value and write the updated sketch back.
      val hll =
        if (buffer.isNullAt(0)) new HyperLogLogPlus(5, 0)
        else HyperLogLogPlus.Builder.build(buffer.getAs[Array[Byte]](0))
      hll.offer(input.get(0))
      buffer.update(0, hll.getBytes)
    }
  }
}
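The point of storing the serialized HLL per group is that the sketches can be aggregated again later. A minimal usage sketch, assuming a DataFrame `events` with a `date` column and a binary `user_id` column (both names are illustrative):

```
import org.apache.spark.sql.functions.col

// Apply the UDAF; each output row carries a serialized HLL that a
// second-level aggregation can merge later for rolled-up distinct counts.
val hllStore = new HyperLogLogStoreUDAF
val perDay = events.groupBy(col("date")).agg(hllStore(col("user_id")).as("users_hll"))
```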
*.~
.*
!.gitignore
target/
*.class
*.iml
*.ipr
*.iws
.idea
out
Hungsiro506 / gist:f5a61100e8a103fcb98e6c537db023ae
Created July 16, 2017 16:51
I've just started looking around for a solution for stateful computation with Spark Streaming when I came across the updateStateByKey() function. The problem I'm trying to solve: 10,000 sensors produce a binary value every minute. If consecutive values a sensor reports are different from each other, I would like to flag that and send it down Kaf…
val sensorData: DStream[(String, Int)] = ???

// updateStateByKey runs once per key and batch: `newValues` holds this batch's
// readings for the sensor, `state` is the value stored from earlier batches.
// We keep (latestValue, changed) so state changes can be filtered out downstream.
def updateFunction(newValues: Seq[Int], state: Option[(Int, Boolean)]): Option[(Int, Boolean)] = {
  val previous = state.map(_._1)
  val latest = newValues.lastOption.orElse(previous)
  latest.map(v => (v, previous.exists(_ != v)))
}

val changes: DStream[(String, Int)] = sensorData
  .updateStateByKey(updateFunction _)
  .filter { case (_, (_, changed)) => changed }
  .mapValues(_._1)
I've just started looking around for a solution for stateful computation with Spark Streaming when I came across the updateStateByKey() function. The problem I'm trying to solve: 10,000 sensors produce a binary value every minute. If consecutive values a sensor reports are different from each other, I would like to flag that and send it down Kafka as a state change event. My assumption is that updateStateByKey() can be used here, but I'm not entirely sure of the recommended way to implement it.

I am assuming that you will get a stream of (String, Int) pairs from the sensors, where the String is the ID of the sensor and the Int is the binary value it returns. With that assumption you could try something like the updateFunction shown above.
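To actually push the flagged changes to Kafka, one common pattern is a producer per partition inside foreachRDD. A sketch under that assumption, reusing the `changes` DStream from the snippet above (the broker address and topic name are placeholders):

```
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

changes.foreachRDD { rdd =>
  rdd.foreachPartition { events =>
    // One producer per partition, so it is created on the executor
    // rather than serialized with the closure.
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // placeholder broker
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    events.foreach { case (sensorId, value) =>
      producer.send(new ProducerRecord("sensor-state-changes", sensorId, value.toString))
    }
    producer.close()
  }
}
```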
### Lookup cache using mapWithState:
import org.apache.spark.streaming.{ StreamingContext, Seconds }

val ssc = new StreamingContext(sc, batchDuration = Seconds(5))
// checkpointing is mandatory for stateful streaming
ssc.checkpoint("_checkpoints")
val rdd = sc.parallelize(0 to 9).map(n => (n, (n % 2).toString))
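The preview cuts off before the snippet reaches mapWithState itself. A minimal continuation, assuming a queue-backed DStream built from `rdd` (the mapping function and names below are illustrative):

```
import org.apache.spark.streaming.{ State, StateSpec }
import scala.collection.mutable

// Feed the sample RDD through a queue-backed DStream so the example runs standalone.
val stream = ssc.queueStream(mutable.Queue(rdd))

// mapWithState keeps one State cell per key: cache the last value seen and
// emit the key only when the value changed since the previous batch.
val mappingFunc = (key: Int, value: Option[String], state: State[String]) => {
  val previous = state.getOption()
  value.foreach(state.update) // refresh the lookup cache with the new value
  if (value != previous) Some(key) else None
}

val changedKeys = stream.mapWithState(StateSpec.function(mappingFunc)).flatMap(_.toSeq)
changedKeys.print()
ssc.start()
```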
import org.apache.spark.sql.SparkSession
import java.util.concurrent.Executors
import scala.concurrent._
import scala.concurrent.duration._

object FancyApp {
  def appMain(args: Array[String]) = {
    // configure spark
    val spark = SparkSession
      .builder
      .appName("FancyApp") // app name assumed; the gist preview cuts off at .builder
      .getOrCreate()
  }
}
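Given the Executors and scala.concurrent imports, the gist is presumably heading toward running several Spark actions in parallel. A minimal sketch of that technique, assuming it continues inside appMain (the pool size and the two jobs are illustrative):

```
// A dedicated thread pool drives concurrent job submission; Spark's scheduler
// then runs the two actions in parallel on the cluster.
implicit val ec: ExecutionContextExecutorService =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))

val countA = Future { spark.range(1000000L).count() }
val countB = Future { spark.range(2000000L).count() }

// Block until both actions complete (the timeout is illustrative).
val (a, b) = Await.result(countA.zip(countB), 10.minutes)
println(s"countA = $a, countB = $b")
```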
//////////////////// Upsert //////////////////////////////////////////////////////////////////
import java.sql._

// numWorkers and rowsPerBatch are tuning placeholders (coalesce and grouped need Ints).
dataframe.coalesce(numWorkers).foreachPartition { partition =>
  // One connection and prepared statement per partition, reused across batches.
  val dbc: Connection = DriverManager.getConnection("JDBCURL")
  val st: PreparedStatement = dbc.prepareStatement("YOUR PREPARED STATEMENT")
  partition.grouped(rowsPerBatch).foreach { batch =>
    batch.foreach { x =>
      st.setDouble(1, x.getDouble(1))
      st.addBatch()
    }
    st.executeBatch() // flush the accumulated rows in one round trip
  }
  dbc.close()
}
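The per-partition connection plus explicit batching is the main throughput lever here: each executor task opens exactly one connection, and executeBatch() turns rowsPerBatch individual writes into a single round trip to the database.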
// Pull the single aggregated row out of brasSumCounting and read its columns.
val head: Row = brasSumCounting.head()
val signInTotalCount = head.getAs[Long]("total_signin")
val logOffTotalCount = head.getAs[Long]("total_logoff")
val signInDistinctTotalCount = head.getAs[Long]("total_signin_distinct")
val logOffDistinctTotalCount = head.getAs[Long]("total_logoff_distinct")
val timeBrasCount = head.getAs[java.sql.Timestamp]("time")
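For context, a single row with those columns would come out of an aggregation along these lines. This is a sketch only: the real brasSumCounting query is not shown in the gist, and a `logs` DataFrame with `time`, `name`, and a signin/logoff `typeLog` column is assumed:

```
import org.apache.spark.sql.functions._

// Hypothetical shape of the aggregation behind brasSumCounting.
val brasSumCounting = logs
  .groupBy(col("time"))
  .agg(
    count(when(col("typeLog") === "SignIn", true)).as("total_signin"),
    count(when(col("typeLog") === "LogOff", true)).as("total_logoff"),
    countDistinct(when(col("typeLog") === "SignIn", col("name"))).as("total_signin_distinct"),
    countDistinct(when(col("typeLog") === "LogOff", col("name"))).as("total_logoff_distinct")
  )
```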