Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save natalia-chikina/f12c1c85888c1dca752fedbfd6fb786c to your computer and use it in GitHub Desktop.
Save natalia-chikina/f12c1c85888c1dca752fedbfd6fb786c to your computer and use it in GitHub Desktop.
package com.bam.device
import java.util.concurrent.TimeUnit
import akka.actor._
import com.bam.commons.{Common, DeviceStatsCommon}
import com.bam.data.DeviceMacAddress
import com.bam.data.tag.SensorSerial
import com.bam.device.bloh.BlohConfig
import com.bam.device.bloh.client.netty.NettyAsyncUplink.UplinkMessages
import com.bam.device.bloh.client.netty.{NettyAsyncDownlink, NettyAsyncUplink}
import com.bam.device.bloh.tag._
import com.bam.device.blohProtocol.{BlohConstants, BlohMessage}
import com.bam.domain._
import com.bam.exception.DeviceFSMStopException
import io.netty.handler.codec.http.cookie.DefaultCookie
import org.joda.time.{DateTime, DateTimeZone}
import scala.collection.JavaConverters._
import scala.collection.immutable.Queue
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import collection.JavaConversions._
/**
* Created by vikas on 10/5/16.
*/
object DeviceFSM {
// Actor commands
// Puts the simulated device "in bed". When forPeriod is finite the actor schedules a
// PutOutOfBed message to itself after that period (see whenUnhandled in the class).
case class PutInBed(forPeriod: Duration = Duration.Inf)
// Takes the simulated device out of bed and cancels the "outOfBed" timer.
case object PutOutOfBed
// Request to stop the device actor together with its uplink/downlink children.
case class StopDevice()
// One simulated second; `time` is the device clock value the generated message is stamped with.
case class Tick(time: DateTime)
// Timer payload used by both the "ack" (uplink send) and "downlinkAck" timers.
case object AckTimeout
// NOTE(review): not handled explicitly by the DeviceFSM class in this file — presumably consumed elsewhere.
case class UpdateSoftware(deviceId: DeviceMacAddress, version: String) extends DeviceSpecific
// Switches the device to sending three messages per tick; the class only reacts to buf == false
// and treats `time` as a duration in seconds.
case class ThreeXRate(buf:Boolean, time: Long)
// NOTE(review): the three commands below have no dedicated handler in the DeviceFSM class in this file.
case class ThreeXRateStop()
case class ConnectDisconnect(time:Long)
case class ConnectDisconnectStop()
// FSM states
sealed trait ConnectionState
// Uplink and/or downlink are still being established.
case object Connecting extends ConnectionState
// Both links are up; every Tick produces an uplink message.
case object Connected extends ConnectionState
// A message was handed to the uplink and its Sent confirmation (or AckTimeout) is pending.
case object WaitingForSendAck extends ConnectionState
// FSM data
// uplink/downlink: child actors (None until created); numOfAttempts: reconnect attempts made while
// Connecting; backlog: messages generated while no send was possible; connectionId: may be null by default.
case class Data(uplink: Option[ActorRef] = None, downlink: Option[ActorRef] = None,
numOfAttempts: Int = 0, backlog: Queue[BlohMessage] = Queue.empty, connectionId: String = null)
// Actor creation factory.
trait PropsFactory {
// Builds the Props used to create a device actor for the given MAC address.
def props(mac: DeviceMacAddress, supervisor: ActorRef, bc: BlohConfig, requestConnectionResponse: RequestConnectionResponse, scenarioBean: ScenarioBean, scenarioName: String): Props
}
// Default factory: creates a plain DeviceFSM with the arguments passed through unchanged.
object DefaultFactory extends PropsFactory {
def props(mac: DeviceMacAddress, supervisor: ActorRef, bc: BlohConfig, requestConnectionResponse: RequestConnectionResponse, scenarioBean: ScenarioBean, scenarioName: String): Props = Props(new DeviceFSM(mac, supervisor, bc, requestConnectionResponse, scenarioBean, scenarioName))
}
}
class DeviceFSM(mac: DeviceMacAddress, supervisor: ActorRef, bc: BlohConfig, requestConnectionResponse: RequestConnectionResponse, scenarioBean: ScenarioBean, scenarioName: String) extends Actor with FSM[DeviceFSM.ConnectionState, DeviceFSM.Data] with DiagnosticActorLogging {
import DeviceFSM._
import context.dispatcher
// Position (in seconds) inside the scenario cycle; advanced modulo totalDuration on every Tick
// handled in the Connected state.
var tickSec: Long = 0
val random = new scala.util.Random
val zeroSecDuration = FiniteDuration(0, TimeUnit.SECONDS)
// Scenario definition (start parameters + ordered steps) looked up by scenario name.
val scenarioBeanParametersByName = Common.getScenarioBeanParametersMap.get(scenarioName)
// Index of the step seen on the previous Tick; -1 means "no step seen yet".
var lastStepIndex = -1
var eveningSecRandom = 0
var morningSecRandom = 0
scenarioBean.setStartScenarioBeanFromParams(scenarioBeanParametersByName.getStartParams)
// Durations (seconds) of the scenario phases. DAY / EVENING / MORNING use the first step of that
// type, NIGHT sums every NIGHT step.
// Fix: eveningDuration used to be read from the DAY step and dayDuration from the EVENING step,
// so the random "go to bed" offset was drawn from the wrong window.
val eveningDuration = asScalaBuffer(scenarioBeanParametersByName.getSteps())
  .toList.find(x => x.getTypeOfDay == TimeofDayType.EVENING).get.getDurationInSec
val dayDuration = asScalaBuffer(scenarioBeanParametersByName.getSteps())
  .toList.find(x => x.getTypeOfDay == TimeofDayType.DAY).get.getDurationInSec
val nightDuration = asScalaBuffer(scenarioBeanParametersByName.getSteps())
  .toList.filter(x => x.getTypeOfDay == TimeofDayType.NIGHT)
  .map(_.getDurationInSec)
  .foldLeft(0L)((acc, curr) => acc + curr)
val morningDuration = asScalaBuffer(scenarioBeanParametersByName.getSteps())
  .toList.find(x => x.getTypeOfDay == TimeofDayType.MORNING).get.getDurationInSec
// Length of one full scenario cycle; tickSec wraps around at this value.
val totalDuration = eveningDuration + dayDuration + nightDuration + morningDuration
// Candidate offsets (seconds into the evening) at which the device may be put in bed.
val eveningRandomRange = 0 to eveningDuration.toInt
var stubDevice = new BlohMessageGenerator(scenarioBean)
// Fires sendTick once per second, starting one second after construction; cancelled in postStop.
val messagesSchedule = context.system.scheduler.schedule(1.second, 1.second)(sendTick)
// Maximum number of queued messages; at one message per one-second Tick this is 5 minutes of data.
val MAX_BACKLOG = 5 * 60
// Seconds of delayed data consumed per Tick while in 3X rate mode.
val THREEXRATE_STEP = 3
// 65535: sequence numbers wrap after this value (see getSeqNum).
val MAX_UNSIGNED_SHORT_VALUE = Short.MaxValue * 2 + 1
// Number of backlog messages sent per batch.
val BACKLOG_SENDING_SPEED: Int = 5
val logController: Int = 0
// Set once a child termination has been handled in handleDisconnect.
var terminatedChild: Boolean = false
// Set when the reconnect attempts in the Connecting state are exhausted.
var terminatedChildAtConnecting: Boolean = false
var seqNumCounter: Int = -1
var isInBedSignalSent = false
private[this] var noopCounter = 0
private[this] var ackCounter = 0
private[this] var sensorFlags: Map[SensorSerial, Int] = Map.empty
private[this] var reconnectTimeout = 60.seconds
// Simulated device clock; advanced by one second in sendTick.
private[this] var deviceTime = now
private[this] var uplinkConnected = false
private[this] var downlinkConnected = false
// Remaining delay (seconds) to catch up in 3X rate mode; None means normal 1X rate.
private[this] var threeXRateDuration: Option[Long] = None
val deviceInBedStats = DeviceStatsCommon.getDeviceInBedStats
val connectedDeviceSet = DeviceStatsCommon.getConnectedDeviceSet
// Sequence numbers sent on the uplink and not yet acknowledged on the downlink.
var seqNumBuffer = ArrayBuffer.empty[Int]
// Period after which a missing downlink ACK triggers AckTimeout ("downlink.ack.threshold.seconds").
val ackThreshold = new FiniteDuration(Common.getProps.getProperty("downlink.ack.threshold.seconds").toLong, TimeUnit.SECONDS)
// Supervision of the uplink/downlink children: any Exception from a child stops the children
// (AllForOneStrategy with SupervisorStrategy.Stop, no retries) after the shared device
// statistics have been updated for this device.
override val supervisorStrategy =
AllForOneStrategy(maxNrOfRetries = 0) {
case e: Exception => {
log.info("In DeviceFSM strategy, sending stop signal", e)
// Record the device as disconnected and remove it from the connected/running counters.
DeviceStatsCommon.getDisconnectedDevices.put(mac.getAddressAsString, new DisconnectDeviceProperty(0, null, null))
connectedDeviceSet.remove(mac.getAddressAsString)
DeviceStatsCommon.decrementDeviceRunCount();
// NOTE(review): assumes deviceInBedStats already holds an entry for this actor — confirm against the creator.
if (deviceInBedStats.get(self)) {
DeviceStatsCommon.decrementDeviceInBedCount();
}
deviceInBedStats.remove(self);
// NOTE(review): the value returned by stop(...) is discarded here, it is not returned from a state
// handler — confirm this call has the intended effect.
stop(FSM.Failure(s"Stopping device actor as one of the child is failed to connect ! device ${mac.toString()}, will get restart from supervisor actor"))
SupervisorStrategy.Stop
}
}
/** Runs the inherited stop hook, then cancels the once-per-second tick schedule. */
override def postStop(): Unit = {
  super.postStop()
  messagesSchedule.cancel()
}
// Logger bound to this actor.
override val log = akka.event.Logging(this)
// Diagnostic context attached to every log statement: the device MAC and the supervisor's actor name.
override def mdc(msg: Any) = Map("deviceId" -> mac, "target" -> supervisor.path.name)
// Initial state: create both link actors and wait for them to connect.
startWith(Connecting, connect)
// Connecting: move to Connected only once BOTH links have reported Connected. A link that
// terminates is re-created (at most 3 attempts); after that the device actor stops itself.
// Fix: when a link is re-created its "connected" flag is now reset, so a stale flag from the dead
// link can no longer move the FSM to Connected before the replacement has connected.
when(Connecting) {
  case Event(_: NettyAsyncUplink.Connected, data: Data) =>
    log.info("Uplink Connected for mac {} ", mac.getAddressAsString)
    uplinkConnected = true
    if (downlinkConnected) {
      log.info("Both uplinke and downlink connected for mac {} ", mac.getAddressAsString)
      goto(Connected) using data.copy(numOfAttempts = 0)
    } else stay
  case Event(_: NettyAsyncDownlink.Connected, data: Data) =>
    log.info("Downlink Connected for mac {} ", mac.getAddressAsString)
    downlinkConnected = true
    if (uplinkConnected) {
      log.info("Both uplinke and downlink connected for mac {} ", mac.getAddressAsString)
      goto(Connected) using data.copy(numOfAttempts = 0)
    } else stay
  case Event(t: Terminated, data: Data) =>
    // Creates and watches a fresh uplink child; the new link has not connected yet.
    def spawnUplink(): ActorRef = {
      uplinkConnected = false
      val ref = context.actorOf(Props(new NettyAsyncUplink(self, requestConnectionResponse.getConnectionId, requestConnectionResponse.getUpLinkUri, requestConnectionResponse.getCookies.asScala.toList)), "uplink")
      context.watch(ref)
      ref
    }
    // Creates and watches a fresh downlink child; the new link has not connected yet.
    def spawnDownlink(): ActorRef = {
      downlinkConnected = false
      val ref = context.actorOf(Props(new NettyAsyncDownlink(self, requestConnectionResponse.getConnectionId, requestConnectionResponse.getDownLinkUri, requestConnectionResponse.getCookies.asScala.toList)), "downlink")
      context.watch(ref)
      ref
    }
    if (data.numOfAttempts < 3) {
      val terminatedName = t.getActor.path.name
      if (terminatedName.equals("uplink")) {
        log.info("Uplink terminated for {} while establishing connnection, reconnecting with new uplink actor, reconnect attempts# {}", mac.getAddressAsString, data.numOfAttempts)
        val uplinkRef = spawnUplink()
        goto(Connecting) using data.copy(uplink = Option(uplinkRef), numOfAttempts = data.numOfAttempts + 1)
      } else if (terminatedName.equals("downlink")) {
        log.info("Downlink terminated for {} while establishing connnection, reconnecting with new downlink actor, reconnect attempts# {}", mac.getAddressAsString, data.numOfAttempts)
        val downlinkRef = spawnDownlink()
        goto(Connecting) using data.copy(downlink = Option(downlinkRef), numOfAttempts = data.numOfAttempts + 1)
      } else {
        // NOTE(review): only "uplink" and "downlink" are watched in this file, so this branch looks
        // unreachable; kept as in the original (tear both links down, then re-create both).
        log.info("Either Uplink or Downlink terminated for {} while establishing connnection, reconnecting new actors, reconnect attempts# {}", mac.getAddressAsString, data.numOfAttempts)
        context unwatch data.uplink.get
        context unwatch data.downlink.get
        context stop data.uplink.get
        context stop data.downlink.get
        val uplinkRef = spawnUplink()
        val downlinkRef = spawnDownlink()
        goto(Connecting) using data.copy(uplink = Option(uplinkRef), downlink = Option(downlinkRef), numOfAttempts = data.numOfAttempts + 1)
      }
    } else {
      // Attempts exhausted: update the shared statistics and stop this device actor.
      terminatedChildAtConnecting = true
      if (deviceInBedStats.get(self)) {
        DeviceStatsCommon.decrementDeviceInBedCount()
      }
      deviceInBedStats.remove(self)
      DeviceStatsCommon.decrementDeviceRunCount()
      log.info("Made 3 attempts while connecting before stopping actor for mac: {}", mac.toString)
      stop(FSM.Failure(s"Either uplink or downlink terminated while establishing connnection! device ${mac.toString()}, try 3 times, stoping device actor"))
    }
}
//using sendBacklogIfExists)
// Transition hooks out of Connecting: register the device as connected; entering Connected also
// arms the "downlinkAck" watchdog timer.
onTransition {
  case Connecting -> Connected =>
    val address = mac.getAddressAsString
    connectedDeviceSet.add(address)
    setTimer("downlinkAck", AckTimeout, ackThreshold, repeat = false)
    log.info("On transition Connecting -> Connected for mac {}", mac.getAddressAsString)
  case Connecting -> WaitingForSendAck =>
    val address = mac.getAddressAsString
    connectedDeviceSet.add(address)
    log.info("On transition Connecting -> Waiting for send ack for mac {}", mac.getAddressAsString)
}
// Connected: both links are up. Each Tick applies the scenario step for the current second and
// sends one message (or three in 3X rate mode) to the uplink, then waits for the send confirmation.
// Child terminations are delegated to handleDisconnect.
when(Connected)(combine({
//TODO check the data for other fields exists here
case Event(Tick(time), Data(Some(uplink), _, _, _, _)) =>
// The step is resolved from tickSec BEFORE tickSec is advanced below.
val stepIndex = getCurrentScenarioStepIndex
val currentTypeOfDay = scenarioBeanParametersByName.getSteps.get(stepIndex).getTypeOfDay
// On entering a new step: a DAY step schedules the "put in bed" timer.
if (lastStepIndex != stepIndex) {
lastStepIndex = stepIndex
if (currentTypeOfDay == TimeofDayType.DAY) {
setInBedTimer
}
}
// EVENING/NIGHT steps drive the generator parameters; during EVENING, once the in-bed signal
// has been sent, the parameters of the NEXT step are applied instead of the current one.
if (currentTypeOfDay == TimeofDayType.EVENING
|| currentTypeOfDay == TimeofDayType.NIGHT) {
if (isInBedSignalSent && currentTypeOfDay == TimeofDayType.EVENING) {
// NOTE(review): stepIndex + 1 assumes an EVENING step is never the last step — confirm.
scenarioBean.setScenarioBeanFromParams(scenarioBeanParametersByName.getSteps.get(stepIndex + 1).getParams)
} else {
scenarioBean.setScenarioBeanFromParams(scenarioBeanParametersByName.getSteps.get(stepIndex).getParams)
}
}
tickSec = (tickSec + 1) % totalDuration
// Send only when the device has no entry in the connect/disconnect map, or the entry's time has passed.
if (!DeviceStatsCommon.getConnDisconnDeviceMap.keySet().contains(mac.getAddressAsString)
|| time.getMillis >= DeviceStatsCommon.getConnDisconnDeviceMap.get(mac.getAddressAsString).getTime) {
// An expired entry is removed and the shared counter decremented.
if (DeviceStatsCommon.getConnDisconnDeviceMap.keySet().contains(mac.getAddressAsString)) {
DeviceStatsCommon.getConnDisconnDeviceMap.remove(mac.getAddressAsString)
DeviceStatsCommon.decrementConnDisconnCount()
}
threeXRateDuration match {
case Some(duration) =>
// 3X mode: three messages stamped `duration` seconds in the past, one second apart.
val messages = (0 to 2).map { offset =>
val messageTime = (time.getMillis - duration * 1000) + offset * 1000
stubDevice.createMessage(new DateTime(messageTime, DateTimeZone.UTC), getSeqNum)
}.toList
messages.foreach { msg =>
seqNumBuffer += msg.getSeqNum
uplink ! msg; setAckTimer;
}
// Three seconds of delay were consumed; leave 3X mode once nothing is left.
threeXRateDuration = Some(duration - THREEXRATE_STEP)
if (threeXRateDuration.get <= 0) {
threeXRateDuration = None
DeviceStatsCommon.decrementThreeXRateCountt()
log.info("Switched to 1X rate mode for mac: {}", mac.getAddressAsString)
}
case None =>
// Normal rate: one message stamped with the tick time.
val msg = stubDevice.createMessage(time, getSeqNum)
seqNumBuffer += msg.getSeqNum
uplink ! msg; setAckTimer;
}
goto(WaitingForSendAck)
} else stay
//After Ack time out if sent event come from Uplink will fall into connected state, no need to do anything
case Event(NettyAsyncUplink.Sent(id), data) => stay
// Only ThreeXRate with buf == false is handled here; `duration` is logged as seconds.
case Event(ThreeXRate(false, duration), _) =>
log.info("Switched to 3X rate mode with duration: {} s for mac: {}", duration, mac.getAddressAsString)
threeXRateDuration = Some(duration);
stay
case Event(NettyAsyncDownlink.MessageReceived(blohMsg), data: Data) =>
processClientMessage(data.connectionId, blohMsg); stay
// The same AckTimeout payload is used by the "ack" and "downlinkAck" timers; here it is only logged.
case Event(AckTimeout, _) =>
log.info("Downlink ACK timeout for mac {}", mac.getAddressAsString); stay
}, handleDisconnect))
// WaitingForSendAck: return to Connected on the uplink's Sent confirmation or on AckTimeout.
// The resulting state is passed through sendBacklogIfExists, which may redirect it to send backlog.
when(WaitingForSendAck) {
  val awaitingSendResult: StateFunction = {
    case Event(NettyAsyncUplink.Sent(_), _) =>
      cancelAckTimer
      goto(Connected)
    case Event(AckTimeout, _) =>
      log.info("Uplink mesage ACK timeout for mac {}", mac.getAddressAsString)
      goto(Connected)
  }
  transform(combine(awaitingSendResult, handleDisconnect)) using sendBacklogIfExists
}
// Fallback handlers for events the current state did not handle.
whenUnhandled {
  // A Tick that could not be sent right away: generate the message anyway and queue it.
  case Event(Tick(time), data) => stay using enqueDataPacket(time, data)
  case Event(PutInBed(forPeriod), _) =>
    if (!deviceInBedStats.get(self)) {
      isInBedSignalSent = true
      // Fix: the "{ }" placeholder was malformed, and casting forPeriod to FiniteDuration threw a
      // ClassCastException for the default Duration.Inf.
      log.debug("Put device In Bed! for mac: {} with timeout= {}", mac.getAddressAsString, forPeriod.toString)
      stubDevice.putInBed
      DeviceStatsCommon.incrementDeviceInBedCount
      deviceInBedStats.put(self, true)
      forPeriod match {
        // Only a finite period schedules the automatic "out of bed".
        case finite: FiniteDuration => setTimer("outOfBed", PutOutOfBed, finite, repeat = false)
        case _ => // infinite/undefined period: stay in bed until an explicit PutOutOfBed
      }
    } else {
      log.info("Put in bed got for mac {} which is already in bed, nothing to do ", mac.getAddressAsString)
    }
    stay
  case Event(PutOutOfBed, _) =>
    stubDevice.putOutOfBed
    if (deviceInBedStats.get(self)) {
      log.debug("Put device out of bed! for mac: {} ", mac.getAddressAsString)
      isInBedSignalSent = false
      cancelTimer("outOfBed")
      deviceInBedStats.put(self, false)
      DeviceStatsCommon.decrementDeviceInBedCount
    } else {
      log.info("Put out of bed got for mac {} which is already out of bed, shouldn't happen, canceling timer if any ", mac.getAddressAsString)
      cancelTimer("outOfBed") //Defensive to cancel timer as it wont harm anything
    }
    stay
  // Fix: StopDevice is a case class, so the bare `StopDevice` pattern only matched its companion
  // object; a StopDevice() instance is now accepted as well.
  case Event(StopDevice | StopDevice(), data: Data) =>
    // Links that were never created are skipped instead of failing on Option.get.
    Seq(data.uplink, data.downlink).flatten.foreach { link =>
      context unwatch link
      context stop link
    }
    context stop self
    stay
  case Event(NettyAsyncDownlink.MessageReceived(blohMsg), data: Data) =>
    processClientMessage(data.connectionId, blohMsg)
    stay
  case Event(any, data) =>
    log.info("Got unhandled event: {} in connected state for mac address: {}", any, mac.getAddressAsString)
    stay
}
/**
 * Current wall-clock time as a Joda `DateTime` in UTC.
 *
 * @return a new `DateTime` for "now" in the UTC zone
 */
def now: DateTime = DateTime.now(DateTimeZone.UTC)
/**
 * Index of the scenario step that contains the current cycle second (`tickSec`).
 *
 * Steps are laid out back to back in list order; step `i` covers the seconds from the sum of
 * the previous durations up to (excluding) that sum plus its own duration.
 *
 * Fix: the original loop read `get(i + 1)` past the last step and could return `size` as an
 * index. When `tickSec` lies beyond the sum of all step durations the last step is now returned
 * instead of failing with an IndexOutOfBoundsException.
 *
 * @return the index of the matching step, or the last index when no step matches
 */
def getCurrentScenarioStepIndex: Int = {
  val steps = scenarioBeanParametersByName.getSteps
  val stepCount = steps.size
  // Running totals: element i is the cycle second at which step i ends.
  val stepEnds = Iterator.range(0, stepCount)
    .scanLeft(0L)((elapsed, i) => elapsed + steps.get(i).getDurationInSec)
    .drop(1)
  val matched = stepEnds.indexWhere(end => tickSec < end)
  if (matched >= 0) matched else math.max(stepCount - 1, 0)
}
/**
 * Advances the simulated device clock by one second and sends the new time to this actor as a
 * `Tick`. Invoked by the scheduler stored in `messagesSchedule`.
 */
def sendTick: Unit = {
  val advanced = deviceTime.plusSeconds(1)
  deviceTime = advanced
  self ! Tick(advanced)
}
/**
 * Starts a connection attempt: clears both "link connected" flags and creates the uplink and
 * downlink actors from the values carried by `requestConnectionResponse`.
 *
 * @return the initial FSM data holding the two freshly created link actors
 */
def connect: DeviceFSM.Data = {
  log.info(s"Connecting device with macaddress :${mac.getAddressAsString}")
  uplinkConnected = false
  downlinkConnected = false
  val response = requestConnectionResponse
  createUplinkAndDownlink(
    connectionId = response.getConnectionId,
    cookies = response.getCookies.asScala.toList,
    up = response.getUpLinkUri,
    down = response.getDownLinkUri)
}
/**
 * Creates and watches the two link child actors, named "uplink" and "downlink".
 *
 * @param connectionId connection id passed to both link actors and stored in the returned data
 * @param cookies      cookies passed to both link actors
 * @param up           URI handed to the uplink actor
 * @param down         URI handed to the downlink actor
 * @return FSM data with both actor references, zero attempts and an empty backlog
 */
def createUplinkAndDownlink(connectionId: String, cookies: List[DefaultCookie], up: String, down: String): DeviceFSM.Data = {
  log.info("Creating uplink and downlink actor for mac {}", mac.getAddressAsString)
  val uplinkProps = Props(new NettyAsyncUplink(self, connectionId, up, cookies))
  val uplinkRef = context.watch(context.actorOf(uplinkProps, "uplink"))
  val downlinkProps = Props(new NettyAsyncDownlink(self, connectionId, down, cookies))
  val downlinkRef = context.watch(context.actorOf(downlinkProps, "downlink"))
  Data(
    uplink = Option(uplinkRef),
    downlink = Option(downlinkRef),
    numOfAttempts = 0,
    backlog = Queue.empty,
    connectionId = connectionId)
}
/**
 * Generates the message for `time` and appends it to the backlog. When the backlog grows beyond
 * `MAX_BACKLOG` the oldest message is dropped.
 *
 * @param time timestamp for the generated message
 * @param data current FSM data
 * @return a copy of `data` carrying the updated backlog
 * @throws DeviceFSMStopException when the backlog overflows after the reconnect attempts in the
 *                                Connecting state were exhausted
 */
def enqueDataPacket(time: DateTime, data: Data): Data = {
  // The message (and its sequence number) is produced before any size check.
  val grown = data.backlog.enqueue(stubDevice.createMessage(time, getSeqNum))
  if (grown.size <= MAX_BACKLOG) {
    data.copy(backlog = grown)
  } else if (terminatedChildAtConnecting) {
    log.info("No child running for Devise FSM, throwing null pointer exception for mac: {}", mac.toString)
    throw new DeviceFSMStopException("All children already stop, lets stop the DeviceFSM ")
  } else {
    log.info(s"In Enque Data packet for mac =${mac.getAddressAsString}, backlog.size > MAX_BACKLOG. backlog.size = ${grown.size}. state $stateName")
    val (_, withoutOldest) = grown.dequeue
    data.copy(backlog = withoutOldest)
  }
}
/**
 * Advances the sequence counter and returns the new value. The counter wraps to 0 after
 * `MAX_UNSIGNED_SHORT_VALUE`.
 *
 * @return the next sequence number
 */
def getSeqNum: Int = {
  val wrapAt = MAX_UNSIGNED_SHORT_VALUE + 1
  val next = (seqNumCounter + 1) % wrapAt
  seqNumCounter = next
  next
}
/**
 * Schedules a single `PutInBed` message: it fires after `dayDuration` plus a random offset picked
 * from `eveningRandomRange`, and asks the device to stay in bed for `nightDuration` seconds.
 */
def setInBedTimer = {
  val pick = random.nextInt(eveningRandomRange.length)
  eveningSecRandom = eveningRandomRange(pick)
  val delayUntilInBed = FiniteDuration(dayDuration + eveningSecRandom, SECONDS)
  val inBedPeriod = FiniteDuration(nightDuration, SECONDS)
  log.info("Move device {} to in bed after {} sec", mac.getAddressAsString, delayUntilInBed)
  setTimer("putInBed", PutInBed(inBedPeriod), delayUntilInBed, repeat = false)
}
/**
 * Arms the single-shot "ack" timer: `AckTimeout` is delivered to this actor after 5 seconds
 * unless the timer is cancelled or re-armed first.
 */
def setAckTimer: Unit =
  setTimer("ack", AckTimeout, 5.seconds, repeat = false)
/**
 * Cancels the "ack" timer armed by `setAckTimer`.
 */
def cancelAckTimer: Unit = {
  cancelTimer("ack")
}
/**
 * Moves up to `n` elements from the front of `queue` into `acc`, preserving their order.
 *
 * Fix: the previous recursive version dequeued one extra element when the counter reached 0 and
 * silently discarded it, so one message was lost per batch.
 *
 * @param n     maximum number of elements to take; must be non-negative
 * @param queue source queue (not modified; queues are immutable)
 * @param acc   buffer the taken elements are appended to
 * @return the queue without the taken elements
 */
def dequeFirstN[A](n: Int, queue: Queue[A], acc: ArrayBuffer[A]): Queue[A] = {
  require(n >= 0)
  val (taken, remaining) = queue.splitAt(n)
  acc ++= taken
  remaining
}
/**
 * State transformer used by the WaitingForSendAck state: when the state produced by a handler
 * still carries backlog and an uplink, the next batch of at most `BACKLOG_SENDING_SPEED` messages
 * is sent and the FSM is directed (back) to WaitingForSendAck.
 *
 * Fix: the state pattern had been garbled to `[email protected](...)`, which does not compile; it is
 * restored as an `FSM.State` extractor (the unused binder is dropped). The `if`/`else` with two
 * identical log statements is collapsed into one.
 *
 * @return a partial function defined only for states with a non-empty backlog and an uplink
 * @throws DeviceFSMStopException when a child termination was already handled (`terminatedChild`)
 */
def sendBacklogIfExists: PartialFunction[State, State] = {
  //TODO check data state
  case FSM.State(_, data @ Data(Some(uplink), _, _, backlog, _), _, _, _) if backlog.nonEmpty =>
    if (terminatedChild) {
      //Stopping DeviceFSM here as Akka was not able to stop the device in handle disconnect, may be issue with Akka
      log.info("Child already terminated for Devise FSM, throwing null pointer exception for mac: {}", mac.toString)
      throw new DeviceFSMStopException("All children already stop, lets stop the DeviceFSM ")
    }
    val batch = new ArrayBuffer[BlohMessage]()
    val remaining = dequeFirstN(BACKLOG_SENDING_SPEED, backlog, batch)
    log.info("Sending backlog size: {} Messages left: {} For mac: {} ", batch.size, remaining.size, mac.getAddressAsString)
    // The guard guarantees at least one message, so batch(0) is safe.
    uplink ! UplinkMessages(batch(0).getSeqNum, batch.toList)
    setAckTimer
    goto(WaitingForSendAck) using data.copy(backlog = remaining)
}
/**
 * Chains two state functions: events not handled by `first` are offered to `second`.
 *
 * @param first  handler tried first
 * @param second fallback handler
 * @return the combined state function
 */
def combine(first: StateFunction, second: StateFunction): StateFunction =
  first orElse second
/**
 * Handles termination of a link child while Connected / WaitingForSendAck.
 *
 * On the first termination the other link is stopped, the shared device statistics are updated
 * and this actor stops with an FSM.Failure; later terminations only stop the actor.
 *
 * Original intent note: we should keep trying for disconnection without any limit ideally just
 * like the actual device, no need to start device, just bring down both uplink and downlink and
 * reconnect. (The code below stops the actor instead of reconnecting.)
 *
 * @return the state function handling Terminated events
 */
def handleDisconnect: StateFunction = {
case Event(Terminated(t: ActorRef), data) =>
if (!terminatedChild) {
val name = t.path.name
log.info("Either uplink or downlink actor terminated with name {}! for mac: {}, restarting both uplink and downlink actor", name, mac.getAddressAsString)
terminatedChild = true
// Stop (and stop watching) the link that is still alive.
if (t.path.name.equals("downlink")) {
context stop data.uplink.get
context unwatch data.uplink.get
} else if (t.path.name.equals("uplink")) {
context stop data.downlink.get
context unwatch data.downlink.get
}
// Remove this device from the in-bed statistics.
if (deviceInBedStats.get(self)) {
DeviceStatsCommon.decrementDeviceInBedCount();
}
deviceInBedStats.remove(self);
// Move the device from the connected set to the disconnected map, once.
if (connectedDeviceSet.contains(mac.getAddressAsString)) {
DeviceStatsCommon.getDisconnectedDevices.put(mac.getAddressAsString, new DisconnectDeviceProperty(0, null, null))
connectedDeviceSet.remove(mac.getAddressAsString)
DeviceStatsCommon.decrementDeviceRunCount();
}
log.info("Before stopping the DeviceFSM for device : {}", mac.toString)
stop(FSM.Failure(s"In handleDisconnect Stopping device actor as one of the child is failed to connect ! device ${mac.toString()}, will get restart from supervisor actor"))
} else {
log.info("Child already terminated for Devise FSM, stopping device fsm actor for mac: {}", mac.toString)
stop(FSM.Failure(s"In handleDisconnect Stopping device actor as one of the child is failed to connect ! device ${mac.toString()}, will get restart from supervisor actor"))
}
// NOTE(review): `Terminated` without arguments is a stable-identifier pattern, i.e. it compares the
// message with the Terminated companion object rather than matching a Terminated instance —
// confirm whether this branch is ever reachable.
case Event(Terminated, data) =>
if (!terminatedChild) {
log.info("Either uplink or downlink actor terminated without disconnect! for mac: {}, restarting both uplink and downlink actor ", mac.getAddressAsString)
terminatedChild = true;
// Tear down both links.
context unwatch data.uplink.get
context unwatch data.downlink.get
context stop data.uplink.get
context stop data.downlink.get
if (deviceInBedStats.get(self)) {
DeviceStatsCommon.decrementDeviceInBedCount();
}
deviceInBedStats.remove(self);
if (connectedDeviceSet.contains(mac.getAddressAsString)) {
DeviceStatsCommon.getDisconnectedDevices.put(mac.getAddressAsString, new DisconnectDeviceProperty(0, null, null))
connectedDeviceSet.remove(mac.getAddressAsString)
DeviceStatsCommon.decrementDeviceRunCount();
}
log.info("Before stopping the DeviceFSM for device : {}", mac.toString)
stop(FSM.Failure(s"In handleDisconnect Stopping device actor as one of the child is failed to connect ! device ${mac.toString()}, will get restart from supervisor actor"))
} else {
log.info("Child already terminated for Devise FSM, stopping device fsm actor for mac: {}", mac.toString)
stop(FSM.Failure(s"In handleDisconnect Stopping device actor as one of the child is failed to connect ! device ${mac.toString()}, will get restart from supervisor actor"))
}
}
/**
 * Dispatches a message received on the downlink by its type: NOOP messages are counted,
 * downlink-command messages are parsed and applied via `processCommands`.
 *
 * Fix: the match had no default case, so a message of any other type threw a `MatchError`
 * inside the actor; such messages are now logged and ignored.
 *
 * @param connectionId connection id, used for logging only
 * @param blohMsg      the received message
 */
def processClientMessage(connectionId: String, blohMsg: BlohMessage): Unit = blohMsg.getType() match {
  case BlohConstants.TYPE_NOOP =>
    log.info("Receive NOOP message! device={} connectionId={}", mac.getAddressAsString, connectionId)
    noopCounter += 1
  case BlohConstants.TYPE_DOWNLINK_COMMANDS =>
    log.info("Receive Device Command message! device={} connectionId={}", mac.getAddressAsString, connectionId)
    processCommands(blohMsg.getBuf())
  case otherType =>
    log.info("Ignoring message of unsupported type {}! device={} connectionId={}", otherType, mac.getAddressAsString, connectionId)
}
/**
 * Parses a tagged command buffer and applies every tag in order. A parse error is logged and
 * nothing is applied.
 *
 * - `Ack`: drops acknowledged sequence numbers from `seqNumBuffer` and re-arms the "downlinkAck" timer.
 * - `Sensor`: selects the sensor that following `RuntimeFlags` tags refer to.
 * - `RuntimeFlags`: forwards the flags to the stub device and records them in `sensorFlags`.
 * - any other tag is only logged.
 *
 * @param buf raw bytes of the command message
 */
private[this] def processCommands(buf: Array[Byte]): Unit = {
  // Sensor addressed by subsequent RuntimeFlags tags; starts at serial 0.
  var currentSensor = new SensorSerial(0)
  TagParser.parse(buf).fold(
    { parseError => log.error("Error parsing tagged message: {}", parseError) },
    tags => tags.foreach {
      case Ack(seq) =>
        log.info("Receive ACK message! device={}", mac.getAddressAsString)
        ackCounter += 1
        // Everything up to and including the acknowledged number is dropped;
        // an unknown number (position -1) drops nothing.
        val ackedPosition = seqNumBuffer.indexOf(seq)
        seqNumBuffer = seqNumBuffer.drop(ackedPosition + 1)
        setTimer("downlinkAck", AckTimeout, ackThreshold, repeat = false)
      case s: Sensor =>
        currentSensor = new SensorSerial(s.serial)
        log.info("Set sensor message! device={}, sensor={}", mac.getAddressAsString, currentSensor.getSerialNumber())
      case RuntimeFlags(flags) =>
        log.info("Receive runtime flags! device={} sensor={} flags={}", mac.getAddressAsString, currentSensor.getSerialNumber(), flags)
        try {
          stubDevice.setRuntimeFlags(currentSensor, flags)
        } catch {
          case iae: IllegalArgumentException =>
            log.info("Illegal Argument Exception, Invalid sensor or runtime flags! " + iae.getMessage)
          case e: Exception =>
            log.info("Exeption, Invalid sensor or runtime flags!" + e.getMessage)
        }
        // Recorded even when the stub device rejected the flags.
        sensorFlags += currentSensor -> flags
      case tagId =>
        log.info("Receive command tag:{} for mac {}", tagId, mac.getAddressAsString)
    })
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment