<?xml version="1.0" encoding="UTF-8"?>
<classpath>
  <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/J2SE-1.5">
    <attributes>
      <attribute name="maven.pomderived" value="true"/>
    </attributes>
  </classpathentry>
  <classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
    <attributes>
      <attribute name="maven.pomderived" value="true"/>
*.~
.*
!.gitignore
target/
class HyperLogLogStoreUDAF extends UserDefinedAggregateFunction {
  override def inputSchema = new StructType()
    .add("stringInput", BinaryType)

  override def update(buffer: MutableAggregationBuffer, input: Row) = {
    // The input Row has a single column holding the value as binary data
    // (e.g. a UTF-8 encoded String). Only update the buffer for non-null input.
    if (!input.isNullAt(0)) {
      if (buffer.isNullAt(0)) {
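The gist preview cuts off mid-method. For context, here is a minimal sketch of how the full UDAF contract could be filled in. It assumes Twitter's Algebird as the HLL implementation; `HyperLogLogMonoid`, `HyperLogLog.toBytes`/`fromBytes`, and the 12-bit precision are part of that assumption, not of the original gist:

```scala
import com.twitter.algebird.{HLL, HyperLogLog, HyperLogLogMonoid}
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class HyperLogLogStoreUDAF extends UserDefinedAggregateFunction {
  private val monoid = new HyperLogLogMonoid(bits = 12) // precision is an arbitrary example

  override def inputSchema = new StructType().add("stringInput", BinaryType)
  override def bufferSchema = new StructType().add("hllStore", BinaryType)
  override def dataType: DataType = BinaryType
  override def deterministic = true

  // No sketch until the first non-null input arrives.
  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0, null)

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) {
      val item = monoid.create(input.getAs[Array[Byte]](0))
      val merged =
        if (buffer.isNullAt(0)) item
        else monoid.plus(HyperLogLog.fromBytes(buffer.getAs[Array[Byte]](0)), item)
      buffer.update(0, HyperLogLog.toBytes(merged))
    }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    if (!buffer2.isNullAt(0)) {
      val other = HyperLogLog.fromBytes(buffer2.getAs[Array[Byte]](0))
      val merged =
        if (buffer1.isNullAt(0)) other
        else monoid.plus(HyperLogLog.fromBytes(buffer1.getAs[Array[Byte]](0)), other)
      buffer1.update(0, HyperLogLog.toBytes(merged))
    }

  // Returns the serialized sketch; HyperLogLog.fromBytes(...).approximateSize
  // turns it into a cardinality estimate downstream.
  override def evaluate(buffer: Row): Any = buffer.getAs[Array[Byte]](0)
}
```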
*.~
.*
!.gitignore
target/
*.class
*.iml
*.ipr
*.iws
.idea
out
val sensorData: DStream[(String, Int)] = ???
val state = sensorData.updateStateByKey[Int](updateFunction _)

// updateStateByKey groups each batch's values by sensor ID: the update function
// receives one sensor's new readings plus that sensor's previous state.
def updateFunction(newValues: Seq[Int], lastValue: Option[Int]): Option[Int] =
  newValues.lastOption match {
    case None           => lastValue // no reading this batch; keep the old state
    case Some(newValue) =>
      if (lastValue.exists(_ != newValue)) {
        // consecutive values differ: this is the state change to flag
      }
      Some(newValue)
  }
```
I've just started looking for a way to do stateful computation with Spark Streaming when
I came across the updateStateByKey() function. The problem I'm trying to solve: 10,000 sensors
produce a binary value every minute. If consecutive values a sensor reports differ from each
other, I would like to flag that and send it down Kafka as a state-change event. My assumption
is that updateStateByKey() can be used here, but I'm not sure of the recommended way to implement it.
```

I am assuming that you will get a stream of (String, Int) pairs from the sensors, where the String is the sensor ID and the Int is the binary value it returned. With that assumption you could try something like this:
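A sketch under those assumptions; the `SensorState` case class and the names below are illustrative, not a fixed recipe:

```scala
import org.apache.spark.streaming.dstream.DStream

val sensorData: DStream[(String, Int)] = ??? // (sensorId, binaryValue) pairs

// State per sensor: the last value seen and whether the latest batch changed it.
case class SensorState(lastValue: Int, changed: Boolean)

def updateFunction(newValues: Seq[Int], state: Option[SensorState]): Option[SensorState] =
  newValues.lastOption match {
    case None        => state.map(_.copy(changed = false)) // no new reading this batch
    case Some(value) => Some(SensorState(value, changed = state.exists(_.lastValue != value)))
  }

val states = sensorData.updateStateByKey(updateFunction _)

// Keep only the state-change events; these are what would be forwarded to Kafka.
val changes = states.filter { case (_, s) => s.changed }
```

Note that a sensor's state is retained across batches, so a sensor that goes quiet keeps its last value and simply stops producing change events.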
### Lookup cache using mapWithState:

```scala
import org.apache.spark.streaming.{ StreamingContext, Seconds }

val ssc = new StreamingContext(sc, batchDuration = Seconds(5))

// checkpointing is mandatory for stateful operations such as mapWithState
ssc.checkpoint("_checkpoints")

val rdd = sc.parallelize(0 to 9).map(n => (n, (n % 2).toString))
```
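The snippet stops before the state spec itself. A possible continuation, assuming the goal is to cache the latest label per key and seed the state from the `rdd` built above (`mappingFunc` and the fallback value are illustrative):

```scala
import org.apache.spark.streaming.{State, StateSpec}

// Cache the most recent label for each key; fall back to the cached value
// (or "unknown") when a batch carries no new value for that key.
def mappingFunc(key: Int, value: Option[String], state: State[String]): (Int, String) = {
  val label = value.orElse(state.getOption).getOrElse("unknown")
  state.update(label)
  (key, label)
}

// Seed the state with the lookup RDD, then apply the spec to a DStream of
// (Int, String) pairs (the input stream itself is not shown in the gist).
val spec = StateSpec.function(mappingFunc _).initialState(rdd)
// val cached = inputStream.mapWithState(spec)
```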
import org.apache.spark.sql.SparkSession
import java.util.concurrent.Executors
import scala.concurrent._
import scala.concurrent.duration._

object FancyApp {
  def appMain(args: Array[String]) = {
    // configure spark
    val spark = SparkSession
      .builder
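The gist breaks off while the SparkSession is being configured, but the `Executors` and `scala.concurrent` imports suggest the point is to run several Spark actions concurrently from the driver. A minimal sketch of that pattern, under that assumption (`doWork`, the table names, and the pool size are illustrative; this would run inside `appMain`, where `spark` is in scope):

```scala
// A dedicated pool so concurrent Spark actions don't contend for the global context.
implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))

// Illustrative stand-in for any code that triggers a Spark action.
def doWork(spark: SparkSession, table: String): Long =
  spark.table(table).count()

// Each Future submits its own Spark job; the scheduler runs them concurrently.
val futures = Seq("t1", "t2", "t3").map(t => Future(doWork(spark, t)))
val counts: Seq[Long] = Await.result(Future.sequence(futures), Duration.Inf)
```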
//////////////////// Upsert //////////////////////////////////////////////////////////////////
import java.sql._

// numWorkers and batchSize stand in for the original string placeholders;
// coalesce() and grouped() take Ints.
dataframe.coalesce(numWorkers).foreachPartition { partition =>
  val dbc: Connection = DriverManager.getConnection("JDBCURL")
  val st: PreparedStatement = dbc.prepareStatement("YOUR PREPARED STATEMENT")
  partition.grouped(batchSize).foreach { batch =>
    batch.foreach { x =>
      st.setDouble(1, x.getDouble(1))
      st.addBatch()
    }
    st.executeBatch() // flush each group of rows to the database in one round trip
  }
  dbc.close()
}
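The pattern here is one JDBC connection per partition (connections are not serializable, so they must be created on the executors), with rows grouped and flushed via executeBatch() to cut round trips. For an actual upsert, the prepared statement would be dialect-specific, e.g. INSERT ... ON DUPLICATE KEY UPDATE (MySQL) or INSERT ... ON CONFLICT ... DO UPDATE (PostgreSQL).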
// Pull the single row of aggregate results and read each metric by column name.
val head: Row = brasSumCounting.head(1).toList(0)
val signInTotalCount = head.getAs[Long]("total_signin")
val logOffTotalCount = head.getAs[Long]("total_logoff")
val signInDistinctTotalCount = head.getAs[Long]("total_signin_distinct")
val logOffDistinctTotalCount = head.getAs[Long]("total_logoff_distinct")
val timeBrasCount = head.getAs[java.sql.Timestamp]("time")