Skip to content

Instantly share code, notes, and snippets.

/**
 * State-update function that tracks one alert per key and expires it after an hour.
 *
 * - Existing state: incoming alerts are ignored. The state is dropped once its trigger
 *   time is more than 60 minutes in the past; otherwise it is kept with the flag set
 *   to `false`.
 * - No state and no alerts: nothing is stored.
 * - No state and at least one alert: a new state is created from the first alert string,
 *   stamped with the current time and flagged `true`.
 *
 * @param alertStrings alert messages received for the key in the current batch
 * @param stateOption  previous state as (flag, trigger time, message), if any
 * @return the next state, or `None` to drop the key's state
 */
def updateFunction(alertStrings: Seq[String], stateOption: Option[(Boolean, Date, String)]): Option[(Boolean, Date, String)] = {
  (alertStrings, stateOption) match {
    case (_, Some((_, triggerTime, string))) =>
      // Cut-off instant: one hour before "now".
      val cal = Calendar.getInstance()
      cal.add(Calendar.MINUTE, -60)
      if (triggerTime.before(cal.getTime)) None else Some((false, triggerTime, string))
    case (Nil, None) => None
    case (Seq(string, _*), None) => Some((true, Calendar.getInstance().getTime(), string))
  }
}
-------------------------------------------
Time: 1485622596000 ms
-------------------------------------------
-------------------------------------------
Time: 1485622597000 ms
-------------------------------------------
-------------------------------------------
Time: 1485622598000 ms
bloodPressure - - - - - x - - - - - - - - - -
heartRate - - - - - - - x - - - - - - - -
alert - - - - - - - x x x x x x - - -
// Emits one (userId, message) pair for every key whose batch holds at least one
// systolic reading below 100 and at least one heart-rate reading above 100.
val alertStream = combinedStream
  .filter {
    case (_, (pressureReadings, pulseReadings)) =>
      val lowPressure = pressureReadings.exists(reading => reading.systolic < 100)
      lowPressure && pulseReadings.exists(reading => reading.heartRate > 100)
    case _ => false
  }
  .map { case (userId, (_, _)) => (userId, s"User $userId has a problem") }
/**
 * Merges two (blood-pressure events, heart-rate events) pairs by concatenating the
 * corresponding lists, keeping the elements of `a` ahead of those of `b`.
 */
def reducer(
  a: (List[BloodPressureEvent], List[HeartRateEvent]),
  b: (List[BloodPressureEvent], List[HeartRateEvent])
) = {
  val (leftPressure, leftPulse) = a
  val (rightPressure, rightPulse) = b
  (leftPressure ++ rightPressure, leftPulse ++ rightPulse)
}
// Keys every blood-pressure event by its user id, wrapping the event in a single-element list.
// NOTE(review): alertStream destructures each element as (id, (bloodPressureEvents, heartRateEvents)),
// which this expression alone does not produce — the rest of the chain is presumably missing
// from this excerpt; confirm against the full source.
val combinedStream =
  bloodPressureStream.map(event => event.userId -> List(event))
/** A heart-rate reading attributed to one user. */
case class HeartRateEvent(userId: Integer, heartRate: Integer)

/** A blood-pressure reading (systolic and diastolic values) attributed to one user. */
case class BloodPressureEvent(userId: Integer, systolic: Integer, diastolic: Integer)
/**
 * Builds a function that deserializes a string into a value of type `T` using a Jackson
 * `ObjectMapper` with Scala support.
 *
 * The mapper uses the snake_case property naming strategy (e.g. a `user_id` property maps
 * to a `userId` field) and does not fail on properties that `T` does not declare.
 *
 * @tparam T target type; its `Manifest` is required by `ScalaObjectMapper.readValue`
 * @return a `String => T` parser
 */
def toEvent[T: Manifest]: String => T = (string: String) => {
  // A fresh mapper is configured on every invocation of the returned function.
  val objectMapper = new ObjectMapper with ScalaObjectMapper
  objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE)
  objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  objectMapper.registerModule(DefaultScalaModule)
  objectMapper.readValue[T](string)
}
// Parse each raw input string into its typed event.
val bloodPressureStream = bloodPressureInput.map(toEvent[BloodPressureEvent])
// `hearRateInput` (sic) is the name under which the heart-rate input stream is declared.
val heartRateStream = hearRateInput.map(toEvent[HeartRateEvent])
// Spark configuration with master "local[2]", and a streaming context built with Seconds(1).
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("BiometricAlertStreamProcessor")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))

// Queue-backed input streams: RDDs pushed onto each queue are what the matching stream reads.
val heartRateLines = mutable.Queue.empty[RDD[String]]
val hearRateInput = ssc.queueStream(heartRateLines)
val bloodPressureLines = mutable.Queue.empty[RDD[String]]
val bloodPressureInput = ssc.queueStream(bloodPressureLines)
# server.coffee
# Server-side rendering helper: seeds a Flux instance from `serializedStoreState`, runs the
# router for `route`, and renders the matched handler to an HTML string.
# NOTE(review): indentation was lost in this excerpt and the body appears to be cut off —
# the script tag is never closed and `callback` is never invoked in the visible lines.
# Confirm against the full source.
renderToString = (route, serializedStoreState, callback) ->
flux = new Flux()
flux.deserialize(serializedStoreState)
Router.run routes, route, (Handler, state) ->
html = React.renderToString(<FluxComponent flux={flux} render={ => <Handler />}></FluxComponent>)
# Start of the inline script that exposes the store state as window.serializedStoreState.
embeddedStoreState = "<script>"
# NOTE(review): JSON.stringify does not escape a closing script tag inside string values;
# if the store state can contain untrusted text, it should be escaped before embedding — confirm.
embeddedStoreState += "window.serializedStoreState = " + JSON.stringify(serializedStoreState)
# app.js
# Client entry point: create the Flux instance, restore any state found on
# window.serializedStoreState, then render the routed handler into the #main element.
flux = new Flux()
# Only deserialize when window.serializedStoreState is set.
flux.deserialize(window.serializedStoreState) if window.serializedStoreState
Router.run routes, Router.HistoryLocation, (Handler, state) ->
  # These two lines form the router callback body; they must be indented under it.
  handler = <FluxComponent flux={flux} render={ => <Handler />}></FluxComponent>
  React.render(handler, document.getElementById("main"))
# stores/post.coffee
# components/posts/index.cjsx
Index = React.createClass
displayName: "PostsIndex"
render: ->
<div>
{
@props.posts.map (post) ->
<article key={post.get('id')}>
<header>
<h3>{post.get('title')}</h3>