Created
April 13, 2017 15:27
-
-
Save natalia-chikina/f12c1c85888c1dca752fedbfd6fb786c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.bam.device | |
import java.util.concurrent.TimeUnit | |
import akka.actor._ | |
import com.bam.commons.{Common, DeviceStatsCommon} | |
import com.bam.data.DeviceMacAddress | |
import com.bam.data.tag.SensorSerial | |
import com.bam.device.bloh.BlohConfig | |
import com.bam.device.bloh.client.netty.NettyAsyncUplink.UplinkMessages | |
import com.bam.device.bloh.client.netty.{NettyAsyncDownlink, NettyAsyncUplink} | |
import com.bam.device.bloh.tag._ | |
import com.bam.device.blohProtocol.{BlohConstants, BlohMessage} | |
import com.bam.domain._ | |
import com.bam.exception.DeviceFSMStopException | |
import io.netty.handler.codec.http.cookie.DefaultCookie | |
import org.joda.time.{DateTime, DateTimeZone} | |
import scala.collection.JavaConverters._ | |
import scala.collection.immutable.Queue | |
import scala.collection.mutable.ArrayBuffer | |
import scala.concurrent.duration._ | |
import collection.JavaConversions._ | |
/** | |
* Created by vikas on 10/5/16. | |
*/ | |
object DeviceFSM {

  // ---------------------------------------------------------------------
  // Messages accepted by the device actor
  // ---------------------------------------------------------------------

  /** Marks the device as "in bed"; a finite `forPeriod` arms a timer that later delivers [[PutOutOfBed]]. */
  case class PutInBed(forPeriod: Duration = Duration.Inf)

  /** Marks the device as "out of bed". */
  case object PutOutOfBed

  /** Requests that the device actor stop its link children and itself. */
  case class StopDevice()

  /** One-second heartbeat carrying the simulated device time. */
  case class Tick(time: DateTime)

  /** Timer message used for both the uplink send ack and the downlink ack watchdog. */
  case object AckTimeout

  case class UpdateSoftware(deviceId: DeviceMacAddress, version: String) extends DeviceSpecific

  /** Switches the device to the 3X sending mode for `time` seconds (handled when `buf` is false). */
  case class ThreeXRate(buf: Boolean, time: Long)

  case class ThreeXRateStop()

  case class ConnectDisconnect(time: Long)

  case class ConnectDisconnectStop()

  // ---------------------------------------------------------------------
  // FSM states
  // ---------------------------------------------------------------------

  sealed trait ConnectionState

  case object Connecting extends ConnectionState

  case object Connected extends ConnectionState

  case object WaitingForSendAck extends ConnectionState

  // ---------------------------------------------------------------------
  // FSM data
  // ---------------------------------------------------------------------

  /**
   * State data carried by the FSM.
   *
   * @param uplink        child actor that sends messages, once created
   * @param downlink      child actor that receives messages, once created
   * @param numOfAttempts reconnect attempts made while in `Connecting`
   * @param backlog       messages generated while they could not be sent
   * @param connectionId  connection id passed to `processClientMessage` (null by default)
   */
  case class Data(
    uplink: Option[ActorRef] = None,
    downlink: Option[ActorRef] = None,
    numOfAttempts: Int = 0,
    backlog: Queue[BlohMessage] = Queue.empty,
    connectionId: String = null
  )

  // ---------------------------------------------------------------------
  // Actor creation
  // ---------------------------------------------------------------------

  /** Factory abstraction for the `Props` of a device actor. */
  trait PropsFactory {
    def props(
      mac: DeviceMacAddress,
      supervisor: ActorRef,
      bc: BlohConfig,
      requestConnectionResponse: RequestConnectionResponse,
      scenarioBean: ScenarioBean,
      scenarioName: String
    ): Props
  }

  /** Factory that builds a plain [[DeviceFSM]]. */
  object DefaultFactory extends PropsFactory {
    def props(
      mac: DeviceMacAddress,
      supervisor: ActorRef,
      bc: BlohConfig,
      requestConnectionResponse: RequestConnectionResponse,
      scenarioBean: ScenarioBean,
      scenarioName: String
    ): Props =
      Props(new DeviceFSM(mac, supervisor, bc, requestConnectionResponse, scenarioBean, scenarioName))
  }
}
class DeviceFSM(mac: DeviceMacAddress, supervisor: ActorRef, bc: BlohConfig, requestConnectionResponse: RequestConnectionResponse, scenarioBean: ScenarioBean, scenarioName: String) extends Actor with FSM[DeviceFSM.ConnectionState, DeviceFSM.Data] with DiagnosticActorLogging { | |
import DeviceFSM._ | |
import context.dispatcher | |
/** Position (seconds) inside the repeating scenario cycle; advanced by the Tick handler of the Connected state and wrapped at `totalDuration`. */
var tickSec: Long = 0
val random = new scala.util.Random
val zeroSecDuration = FiniteDuration(0, TimeUnit.SECONDS)
/** Scenario parameters registered under `scenarioName`. */
val scenarioBeanParametersByName = Common.getScenarioBeanParametersMap.get(scenarioName)
/** Scenario step index observed on the previous Tick; -1 until the first Tick is handled. */
var lastStepIndex = -1
var eveningSecRandom = 0
var morningSecRandom = 0

// Seed the bean with the scenario's start parameters before the message generator below wraps it.
scenarioBean.setStartScenarioBeanFromParams(scenarioBeanParametersByName.getStartParams)

// Scenario steps converted once instead of once per duration lookup.
private[this] val scenarioSteps = asScalaBuffer(scenarioBeanParametersByName.getSteps()).toList

// FIX: the EVENING duration used to be read from the DAY step and the DAY duration from the
// EVENING step. The sum was unaffected, but `eveningRandomRange` and the put-in-bed delay
// computed in `setInBedTimer` were built from the wrong step.
/** Duration (seconds) of the first EVENING step; construction fails if the scenario has none. */
val eveningDuration = scenarioSteps
  .find(_.getTypeOfDay == TimeofDayType.EVENING)
  .getOrElse(throw new NoSuchElementException(s"Scenario '$scenarioName' has no EVENING step"))
  .getDurationInSec
/** Duration (seconds) of the first DAY step; construction fails if the scenario has none. */
val dayDuration = scenarioSteps
  .find(_.getTypeOfDay == TimeofDayType.DAY)
  .getOrElse(throw new NoSuchElementException(s"Scenario '$scenarioName' has no DAY step"))
  .getDurationInSec
/** Sum of the durations (seconds) of all NIGHT steps. */
val nightDuration = scenarioSteps
  .filter(_.getTypeOfDay == TimeofDayType.NIGHT)
  .map(_.getDurationInSec)
  .foldLeft(0L)((acc, curr) => acc + curr)
/** Duration (seconds) of the first MORNING step; construction fails if the scenario has none. */
val morningDuration = scenarioSteps
  .find(_.getTypeOfDay == TimeofDayType.MORNING)
  .getOrElse(throw new NoSuchElementException(s"Scenario '$scenarioName' has no MORNING step"))
  .getDurationInSec
/** Length of one scenario cycle in seconds. */
val totalDuration = eveningDuration + dayDuration + nightDuration + morningDuration
/** Candidate offsets (seconds) into the evening from which `setInBedTimer` draws at random. */
val eveningRandomRange = 0 to eveningDuration.toInt

var stubDevice = new BlohMessageGenerator(scenarioBean)
// Delivers a Tick to this actor once per second; cancelled in postStop.
// NOTE(review): `sendTick` is invoked by the system scheduler and mutates `deviceTime`,
// which is only initialised further down — confirm the 1 s initial delay is always enough.
val messagesSchedule = context.system.scheduler.schedule(1.second, 1.second)(sendTick)
// 5 minutes of data to keep (one message is generated per one-second Tick).
val MAX_BACKLOG = 5 * 60
/** Seconds by which the remaining 3X-rate duration is reduced per Tick (three messages are sent per Tick). */
val THREEXRATE_STEP = 3
/** Largest sequence number (65535); `getSeqNum` wraps to 0 after it. */
val MAX_UNSIGNED_SHORT_VALUE = Short.MaxValue * 2 + 1
/** Number of backlog messages flushed per uplink batch. */
val BACKLOG_SENDING_SPEED: Int = 5
val logController: Int = 0
/** Set once a link child terminated after the connection was established. */
var terminatedChild: Boolean = false
/** Set once the reconnect attempts in the Connecting state are exhausted. */
var terminatedChildAtConnecting: Boolean = false
var seqNumCounter: Int = -1
var isInBedSignalSent = false
private[this] var noopCounter = 0
private[this] var ackCounter = 0
private[this] var sensorFlags: Map[SensorSerial, Int] = Map.empty
private[this] var reconnectTimeout = 60.seconds
/** Simulated device clock; advanced by one second per scheduled tick. */
private[this] var deviceTime = now
private[this] var uplinkConnected = false
private[this] var downlinkConnected = false
/** Remaining seconds of 3X-rate sending, or None when sending at the normal rate. */
private[this] var threeXRateDuration: Option[Long] = None
// Shared statistics obtained from DeviceStatsCommon; the in-bed map is keyed by this actor's ref.
val deviceInBedStats = DeviceStatsCommon.getDeviceInBedStats
val connectedDeviceSet = DeviceStatsCommon.getConnectedDeviceSet
/** Sequence numbers of messages sent on the uplink and not yet acknowledged on the downlink. */
var seqNumBuffer = ArrayBuffer.empty[Int]
/** Period after which a missing downlink ACK is reported, read from `downlink.ack.threshold.seconds`. */
val ackThreshold = new FiniteDuration(Common.getProps.getProperty("downlink.ack.threshold.seconds").toLong, TimeUnit.SECONDS)
/**
 * Supervision of the uplink/downlink children: any exception in a child updates the shared
 * device statistics and resolves to `SupervisorStrategy.Stop` (no retries, all-for-one).
 */
override val supervisorStrategy =
  AllForOneStrategy(maxNrOfRetries = 0) {
    case e: Exception =>
      log.info("In DeviceFSM strategy, sending stop signal", e)

      // Move this device from the "connected" bookkeeping to the "disconnected" one.
      val address = mac.getAddressAsString
      DeviceStatsCommon.getDisconnectedDevices.put(address, new DisconnectDeviceProperty(0, null, null))
      connectedDeviceSet.remove(address)
      DeviceStatsCommon.decrementDeviceRunCount()

      // Release the in-bed slot held by this actor, if any.
      if (deviceInBedStats.get(self)) {
        DeviceStatsCommon.decrementDeviceInBedCount()
      }
      deviceInBedStats.remove(self)

      // NOTE(review): the value produced by stop(...) is discarded here — confirm this call has any effect.
      stop(FSM.Failure(s"Stopping device actor as one of the child is failed to connect ! device ${mac.toString()}, will get restart from supervisor actor"))
      SupervisorStrategy.Stop
  }
/** Runs the inherited stop hook, then cancels the one-second tick schedule. */
override def postStop(): Unit = {
  super.postStop()
  messagesSchedule.cancel()
}
/** Logging adapter created for this actor. */
override val log = akka.event.Logging(this)

/** Diagnostic context attached to log statements: the device MAC and the supervisor's name. */
override def mdc(msg: Any) = Map(
  "deviceId" -> mac,
  "target" -> supervisor.path.name
)

// Initial state: `connect` spawns the link children and yields the initial state data.
startWith(Connecting, connect)
/**
 * Creates and watches a replacement "uplink" child.
 * The uplink is marked as not connected: the flag may still be set by the terminated predecessor.
 */
private[this] def respawnUplink(): ActorRef = {
  uplinkConnected = false
  val ref = context.actorOf(Props(new NettyAsyncUplink(self, requestConnectionResponse.getConnectionId, requestConnectionResponse.getUpLinkUri, requestConnectionResponse.getCookies.asScala.toList)), "uplink")
  context.watch(ref)
  ref
}

/**
 * Creates and watches a replacement "downlink" child.
 * The downlink is marked as not connected: the flag may still be set by the terminated predecessor.
 */
private[this] def respawnDownlink(): ActorRef = {
  downlinkConnected = false
  val ref = context.actorOf(Props(new NettyAsyncDownlink(self, requestConnectionResponse.getConnectionId, requestConnectionResponse.getDownLinkUri, requestConnectionResponse.getCookies.asScala.toList)), "downlink")
  context.watch(ref)
  ref
}

// Connecting: wait until BOTH links report Connected. A link that terminates is recreated,
// up to three times in total; after that the device actor stops with a failure.
// A previous revision wrapped this handler in `transform(...) using sendBacklogIfExists`;
// that wrapping is currently disabled.
when(Connecting) {
  case Event(_: NettyAsyncUplink.Connected, data: Data) =>
    log.info("Uplink Connected for mac {} ", mac.getAddressAsString)
    uplinkConnected = true
    if (downlinkConnected) {
      log.info("Both uplinke and downlink connected for mac {} ", mac.getAddressAsString)
      goto(Connected) using data.copy(numOfAttempts = 0)
    } else stay

  case Event(_: NettyAsyncDownlink.Connected, data: Data) =>
    log.info("Downlink Connected for mac {} ", mac.getAddressAsString)
    downlinkConnected = true
    if (uplinkConnected) {
      log.info("Both uplinke and downlink connected for mac {} ", mac.getAddressAsString)
      goto(Connected) using data.copy(numOfAttempts = 0)
    } else stay

  case Event(t: Terminated, data: Data) if data.numOfAttempts < 3 =>
    t.getActor.path.name match {
      case "uplink" =>
        log.info("Uplink terminated for {} while establishing connnection, reconnecting with new uplink actor, reconnect attempts# {}", mac.getAddressAsString, data.numOfAttempts)
        // FIX: respawnUplink() clears `uplinkConnected`, so a downlink Connected arriving next
        // no longer moves the FSM to Connected before the new uplink has connected.
        val uplinkRef = respawnUplink()
        goto(Connecting) using data.copy(uplink = Option(uplinkRef), numOfAttempts = data.numOfAttempts + 1)

      case "downlink" =>
        log.info("Downlink terminated for {} while establishing connnection, reconnecting with new downlink actor, reconnect attempts# {}", mac.getAddressAsString, data.numOfAttempts)
        val downlinkRef = respawnDownlink()
        goto(Connecting) using data.copy(downlink = Option(downlinkRef), numOfAttempts = data.numOfAttempts + 1)

      case _ =>
        // NOTE(review): only the two link children are watched in this class, so this branch
        // looks unreachable; it also recreates children under names that are still being
        // stopped — confirm before relying on it.
        log.info("Either Uplink or Downlink terminated for {} while establishing connnection, reconnecting new actors, reconnect attempts# {}", mac.getAddressAsString, data.numOfAttempts)
        val staleLinks = List(data.uplink, data.downlink).flatten
        staleLinks.foreach(ref => context.unwatch(ref))
        staleLinks.foreach(ref => context.stop(ref))
        val uplinkRef = respawnUplink()
        val downlinkRef = respawnDownlink()
        goto(Connecting) using data.copy(uplink = Option(uplinkRef), downlink = Option(downlinkRef), numOfAttempts = data.numOfAttempts + 1)
    }

  case Event(_: Terminated, _) =>
    // Reconnect attempts exhausted: release the shared statistics and stop with a failure.
    terminatedChildAtConnecting = true
    if (deviceInBedStats.get(self)) {
      DeviceStatsCommon.decrementDeviceInBedCount()
    }
    deviceInBedStats.remove(self)
    DeviceStatsCommon.decrementDeviceRunCount()
    log.info("Made 3 attempts while connecting before stopping actor for mac: {}", mac.toString)
    stop(FSM.Failure(s"Either uplink or downlink terminated while establishing connnection! device ${mac.toString()}, try 3 times, stoping device actor"))
}
// Transition hooks out of Connecting: register the device as connected; entering Connected
// additionally arms the downlink ACK watchdog timer.
onTransition {
  case Connecting -> Connected =>
    val address = mac.getAddressAsString
    connectedDeviceSet.add(address)
    setTimer("downlinkAck", AckTimeout, ackThreshold, false)
    log.info("On transition Connecting -> Connected for mac {}", address)

  case Connecting -> WaitingForSendAck =>
    val address = mac.getAddressAsString
    connectedDeviceSet.add(address)
    log.info("On transition Connecting -> Waiting for send ack for mac {}", address)
}
// Connected: on every Tick advance the scenario, generate the next message(s) and hand them to
// the uplink, then wait for the send acknowledgement. Link termination is delegated to
// handleDisconnect.
when(Connected)(combine({
  //TODO check the data for other fields exists here
  case Event(Tick(time), Data(Some(uplink), _, _, _, _)) =>
    val address = mac.getAddressAsString
    // Re-read on every use, exactly like the inline expressions this replaces.
    def stepAt(index: Int) = scenarioBeanParametersByName.getSteps.get(index)
    def connDisconn = DeviceStatsCommon.getConnDisconnDeviceMap

    val stepIndex = getCurrentScenarioStepIndex
    val currentTypeOfDay = stepAt(stepIndex).getTypeOfDay

    // Entering a new step: a DAY step schedules the next "put in bed" signal.
    if (lastStepIndex != stepIndex) {
      lastStepIndex = stepIndex
      if (currentTypeOfDay == TimeofDayType.DAY) {
        setInBedTimer
      }
    }

    // During EVENING/NIGHT the step parameters drive the generator; once the in-bed signal
    // was sent in the evening, the parameters of the following step are used instead.
    val isEvening = currentTypeOfDay == TimeofDayType.EVENING
    if (isEvening || currentTypeOfDay == TimeofDayType.NIGHT) {
      val paramsIndex = if (isInBedSignalSent && isEvening) stepIndex + 1 else stepIndex
      scenarioBean.setScenarioBeanFromParams(stepAt(paramsIndex).getParams)
    }

    tickSec = (tickSec + 1) % totalDuration

    // While this device has an entry in the connect/disconnect map whose time is still in
    // the future, nothing is sent.
    val onHold = connDisconn.keySet().contains(address) &&
      time.getMillis < connDisconn.get(address).getTime
    if (onHold) stay
    else {
      if (connDisconn.keySet().contains(address)) {
        connDisconn.remove(address)
        DeviceStatsCommon.decrementConnDisconnCount()
      }

      threeXRateDuration match {
        case None =>
          val msg = stubDevice.createMessage(time, getSeqNum)
          seqNumBuffer += msg.getSeqNum
          uplink ! msg
          setAckTimer

        case Some(duration) =>
          // 3X mode: three messages per Tick, timestamped `duration` seconds in the past
          // at one-second spacing. All three are created before any is sent.
          val catchUp = for (offset <- (0 to 2).toList) yield {
            val messageTime = (time.getMillis - duration * 1000) + offset * 1000
            stubDevice.createMessage(new DateTime(messageTime, DateTimeZone.UTC), getSeqNum)
          }
          for (msg <- catchUp) {
            seqNumBuffer += msg.getSeqNum
            uplink ! msg
            setAckTimer
          }

          val remaining = duration - THREEXRATE_STEP
          if (remaining <= 0) {
            threeXRateDuration = None
            DeviceStatsCommon.decrementThreeXRateCountt()
            log.info("Switched to 1X rate mode for mac: {}", address)
          } else {
            threeXRateDuration = Some(remaining)
          }
      }
      goto(WaitingForSendAck)
    }

  // A Sent confirmation that arrives after the ack timer already fired lands here; nothing to do.
  case Event(NettyAsyncUplink.Sent(_), _) =>
    stay

  case Event(ThreeXRate(false, duration), _) =>
    log.info("Switched to 3X rate mode with duration: {} s for mac: {}", duration, mac.getAddressAsString)
    threeXRateDuration = Some(duration)
    stay

  case Event(NettyAsyncDownlink.MessageReceived(blohMsg), data: Data) =>
    processClientMessage(data.connectionId, blohMsg)
    stay

  case Event(AckTimeout, _) =>
    log.info("Downlink ACK timeout for mac {}", mac.getAddressAsString)
    stay
}, handleDisconnect))
// WaitingForSendAck: return to Connected on the uplink's Sent confirmation or on ack timeout.
// The resulting state is passed through sendBacklogIfExists, which may redirect it to flush
// buffered messages first.
when(WaitingForSendAck)(transform(combine({
  case Event(NettyAsyncUplink.Sent(_), _) =>
    cancelAckTimer
    goto(Connected)

  case Event(AckTimeout, _) =>
    log.info("Uplink mesage ACK timeout for mac {}", mac.getAddressAsString)
    goto(Connected)
}, handleDisconnect)) using sendBacklogIfExists)
// Fallback handlers for events the current state did not handle.
whenUnhandled {
  // A Tick that the current state cannot send is turned into a backlog entry.
  case Event(Tick(time), data) =>
    stay using enqueDataPacket(time, data)

  case Event(PutInBed(forPeriod), _) =>
    if (!deviceInBedStats.get(self)) {
      isInBedSignalSent = true
      // FIX: `forPeriod` used to be cast to FiniteDuration for logging, which threw
      // ClassCastException for the default Duration.Inf; the "{ }" placeholder was also
      // never substituted.
      log.debug("Put device In Bed! for mac: {} with timeout= {}", mac.getAddressAsString, forPeriod.toString)
      stubDevice.putInBed
      DeviceStatsCommon.incrementDeviceInBedCount
      deviceInBedStats.put(self, true)
      forPeriod match {
        case finite: FiniteDuration =>
          // Leave the bed automatically once the requested period has elapsed.
          setTimer("outOfBed", PutOutOfBed, finite, false)
        case _ => // infinite period: stay in bed until an explicit PutOutOfBed arrives
      }
    } else {
      log.info("Put in bed got for mac {} which is already in bed, nothing to do ", mac.getAddressAsString)
    }
    stay

  case Event(PutOutOfBed, _) =>
    stubDevice.putOutOfBed
    if (deviceInBedStats.get(self)) {
      log.debug("Put device out of bed! for mac: {} ", mac.getAddressAsString)
      isInBedSignalSent = false
      cancelTimer("outOfBed")
      deviceInBedStats.put(self, false)
      DeviceStatsCommon.decrementDeviceInBedCount
    } else {
      log.info("Put out of bed got for mac {} which is already out of bed, shouldn't happen, canceling timer if any ", mac.getAddressAsString)
      cancelTimer("outOfBed") // defensive: cancelling a timer that is not set is harmless
    }
    stay

  // FIX: `StopDevice` is a case class, so the bare pattern only matched its companion object;
  // an actual `StopDevice()` message fell through to the catch-all below. Both are accepted now.
  case Event(StopDevice | StopDevice(), data: Data) =>
    val links = List(data.uplink, data.downlink).flatten
    links.foreach(ref => context.unwatch(ref))
    links.foreach(ref => context.stop(ref))
    context stop self
    stay

  case Event(NettyAsyncDownlink.MessageReceived(blohMsg), data: Data) =>
    processClientMessage(data.connectionId, blohMsg)
    stay

  case Event(any, _) =>
    log.info("Got unhandled event: {} in connected state for mac address: {}", any, mac.getAddressAsString)
    stay
}
/**
 * Current wall-clock time.
 *
 * @return a new `DateTime` for the present instant in the UTC zone
 */
def now: DateTime = DateTime.now(DateTimeZone.UTC)
/**
 * Finds the scenario step that contains the current cycle position `tickSec`.
 *
 * Steps are laid end to end; the result is the first index whose cumulative end time
 * (in seconds) is greater than `tickSec`. If `tickSec` lies beyond the end of all steps
 * the last step index is returned (the previous loop read one element past the end of
 * the step list in that case).
 *
 * @return zero-based index into `scenarioBeanParametersByName.getSteps`
 */
def getCurrentScenarioStepIndex: Int = {
  val steps = scenarioBeanParametersByName.getSteps
  val lastIndex = steps.size - 1

  @scala.annotation.tailrec
  def locate(index: Int, endOfStep: Long): Int =
    if (tickSec < endOfStep || index >= lastIndex) index
    else locate(index + 1, endOfStep + steps.get(index + 1).getDurationInSec)

  locate(0, steps.get(0).getDurationInSec)
}
/**
 * Advances the simulated device clock by one second and delivers the new time to this
 * actor as a `Tick`. Invoked by the schedule held in `messagesSchedule`.
 */
def sendTick: Unit = {
  val advanced = deviceTime.plusSeconds(1)
  deviceTime = advanced
  self ! Tick(advanced)
}
/**
 * Starts a connection attempt: clears both "connected" flags and spawns the uplink and
 * downlink children from the values in `requestConnectionResponse`.
 *
 * @return the initial FSM data holding both child references
 */
def connect: DeviceFSM.Data = {
  log.info("Connecting device with macaddress :" + mac.getAddressAsString)
  uplinkConnected = false
  downlinkConnected = false
  val response = requestConnectionResponse
  createUplinkAndDownlink(
    connectionId = response.getConnectionId,
    cookies = response.getCookies.asScala.toList,
    up = response.getUpLinkUri,
    down = response.getDownLinkUri
  )
}
/**
 * Spawns the "uplink" and "downlink" child actors and watches both.
 *
 * @param connectionId connection id handed to both children and stored in the result
 * @param cookies      cookies handed to both children
 * @param up           URI given to the uplink child
 * @param down         URI given to the downlink child
 * @return fresh FSM data: both child refs, zero attempts, empty backlog
 */
def createUplinkAndDownlink(connectionId: String, cookies: List[DefaultCookie], up: String, down: String): DeviceFSM.Data = {
  log.info("Creating uplink and downlink actor for mac {}", mac.getAddressAsString)

  // Creates the child under `name` and registers for its Terminated notification.
  def spawnWatched(props: Props, name: String): ActorRef =
    context.watch(context.actorOf(props, name))

  val uplinkRef = spawnWatched(Props(new NettyAsyncUplink(self, connectionId, up, cookies)), "uplink")
  val downlinkRef = spawnWatched(Props(new NettyAsyncDownlink(self, connectionId, down, cookies)), "downlink")

  Data(
    uplink = Option(uplinkRef),
    downlink = Option(downlinkRef),
    numOfAttempts = 0,
    backlog = Queue.empty,
    connectionId = connectionId
  )
}
/**
 * Generates the message for `time` and appends it to the backlog, keeping at most
 * `MAX_BACKLOG` entries by discarding the oldest one.
 *
 * @param time timestamp of the message to generate
 * @param data current FSM data
 * @return `data` with the updated backlog
 * @throws DeviceFSMStopException when the backlog overflows after the reconnect attempts
 *                                in the Connecting state were exhausted
 */
def enqueDataPacket(time: DateTime, data: Data): Data = {
  val backlog = data.backlog.enqueue(stubDevice.createMessage(time, getSeqNum))

  if (backlog.size <= MAX_BACKLOG) {
    data.copy(backlog = backlog)
  } else if (terminatedChildAtConnecting) {
    log.info("No child running for Devise FSM, throwing null pointer exception for mac: {}", mac.toString)
    throw new DeviceFSMStopException("All children already stop, lets stop the DeviceFSM ")
  } else {
    log.info(s"In Enque Data packet for mac =${mac.getAddressAsString}, backlog.size > MAX_BACKLOG. backlog.size = ${backlog.size}. state $stateName")
    // Drop the oldest entry to make room for the one just added.
    val (_, withoutOldest) = backlog.dequeue
    data.copy(backlog = withoutOldest)
  }
}
/**
 * Advances the message sequence counter and returns the new value.
 * The counter wraps to 0 after `MAX_UNSIGNED_SHORT_VALUE`.
 */
def getSeqNum = {
  val next = (seqNumCounter + 1) % (MAX_UNSIGNED_SHORT_VALUE + 1)
  seqNumCounter = next
  next
}
/**
 * Arms the "putInBed" timer. The delay is `dayDuration` plus a random offset drawn from
 * `eveningRandomRange` (remembered in `eveningSecRandom`); the delivered `PutInBed`
 * carries `nightDuration` as the time to stay in bed.
 */
def setInBedTimer = {
  val pick = random.nextInt(eveningRandomRange.length)
  eveningSecRandom = eveningRandomRange(pick)

  val putInBedTimerDuration = FiniteDuration(dayDuration + eveningSecRandom, SECONDS)
  val stayInBedFor = FiniteDuration(nightDuration, SECONDS)

  log.info("Move device {} to in bed after {} sec", mac.getAddressAsString, putInBedTimerDuration)
  setTimer("putInBed", PutInBed(stayInBedFor), putInBedTimerDuration, false)
}
/** (Re)arms the one-shot "ack" timer that delivers `AckTimeout` after five seconds. */
def setAckTimer: Unit =
  setTimer("ack", AckTimeout, 5.seconds, false)
/** Cancels the "ack" timer armed by `setAckTimer`. */
def cancelAckTimer: Unit = cancelTimer("ack")
/**
 * Moves up to `n` messages from the front of `queue` into `acc`, preserving their order.
 *
 * The previous implementation dequeued one extra element when the count reached zero and
 * discarded it, so a message was lost from the backlog on every call that left elements
 * behind. The remainder returned now starts with the first element that was not moved.
 *
 * @param n     maximum number of messages to move; must not be negative
 * @param queue source queue (not modified; queues are immutable)
 * @param acc   buffer that receives the moved messages (appended in place)
 * @return the part of `queue` that was not moved
 * @throws IllegalArgumentException if `n` is negative
 */
def dequeFirstN(n: Int, queue: Queue[BlohMessage], acc: ArrayBuffer[BlohMessage]): Queue[BlohMessage] = {
  require(n >= 0)
  val (moved, remaining) = queue.splitAt(n)
  acc ++= moved
  remaining
}
/**
 * State transformer applied to the outcome of the WaitingForSendAck handlers: when the
 * resulting state still carries an uplink and a non-empty backlog, a batch of up to
 * `BACKLOG_SENDING_SPEED` buffered messages is sent and the FSM is redirected to
 * WaitingForSendAck with the shortened backlog. Other states are left untouched
 * (the partial function is not defined for them).
 *
 * @return the transformer as a partial function over FSM states
 * @throws DeviceFSMStopException (from the returned function) when a link child has
 *                                already terminated
 */
def sendBacklogIfExists: PartialFunction[State, State] = {
  //TODO check data state
  case FSM.State(_, data @ Data(Some(uplink), _, _, backlog, _), _, _, _) if backlog.nonEmpty =>
    if (terminatedChild) {
      //Stopping DeviceFSM here as Akka was not able to stop the device in handle disconnect, may be issue with Akka
      log.info("Child already terminated for Devise FSM, throwing null pointer exception for mac: {}", mac.toString)
      throw new DeviceFSMStopException("All children already stop, lets stop the DeviceFSM ")
    } else {
      val acc = new ArrayBuffer[BlohMessage]()
      val rb = dequeFirstN(BACKLOG_SENDING_SPEED, backlog, acc)
      log.info("Sending backlog size: {} Messages left: {} For mac: {} ", acc.size, rb.size, mac.getAddressAsString)
      // The batch is identified by the sequence number of its first message; `acc` is
      // non-empty because the backlog is non-empty and the batch size is positive.
      uplink ! UplinkMessages(acc.head.getSeqNum, acc.toList)
      setAckTimer
      goto(WaitingForSendAck) using data.copy(backlog = rb)
    }
}
/**
 * Chains two state functions.
 *
 * @param first  handler consulted first
 * @param second handler consulted for events `first` is not defined for
 * @return a state function defined wherever either argument is
 */
def combine(first: StateFunction, second: StateFunction): StateFunction =
  first orElse second
/**
 * Handles termination of a link child once the connection was established.
 *
 * On the first termination the remaining sibling is stopped, the shared statistics are
 * released and the device actor stops with a failure; any later termination event just
 * stops with the same failure.
 * (Design note kept from the original author: ideally the device would keep reconnecting
 * without limit, bringing down and recreating both links, instead of stopping.)
 *
 * @return the state function for termination events
 */
def handleDisconnect: StateFunction = {

  // Stops the FSM with the failure reason shared by every branch below.
  def stopWithFailure = stop(FSM.Failure(s"In handleDisconnect Stopping device actor as one of the child is failed to connect ! device ${mac.toString()}, will get restart from supervisor actor"))

  // Used when a termination arrives after `terminatedChild` was already set.
  def alreadyTerminated = {
    log.info("Child already terminated for Devise FSM, stopping device fsm actor for mac: {}", mac.toString)
    stopWithFailure
  }

  // Releases the in-bed slot and moves the device from the connected to the disconnected set.
  def releaseStats(): Unit = {
    if (deviceInBedStats.get(self)) {
      DeviceStatsCommon.decrementDeviceInBedCount()
    }
    deviceInBedStats.remove(self)
    if (connectedDeviceSet.contains(mac.getAddressAsString)) {
      DeviceStatsCommon.getDisconnectedDevices.put(mac.getAddressAsString, new DisconnectDeviceProperty(0, null, null))
      connectedDeviceSet.remove(mac.getAddressAsString)
      DeviceStatsCommon.decrementDeviceRunCount()
    }
    log.info("Before stopping the DeviceFSM for device : {}", mac.toString)
  }

  val handler: StateFunction = {
    case Event(Terminated(t: ActorRef), data) =>
      if (terminatedChild) alreadyTerminated
      else {
        val name = t.path.name
        log.info("Either uplink or downlink actor terminated with name {}! for mac: {}, restarting both uplink and downlink actor", name, mac.getAddressAsString)
        terminatedChild = true
        // Take down the surviving sibling of the child that terminated.
        name match {
          case "downlink" =>
            context stop data.uplink.get
            context unwatch data.uplink.get
          case "uplink" =>
            context stop data.downlink.get
            context unwatch data.downlink.get
          case _ =>
        }
        releaseStats()
        stopWithFailure
      }

    // NOTE(review): this pattern compares the message with the `Terminated` companion object,
    // not with a Terminated notification — confirm whether it can ever match.
    case Event(Terminated, data) =>
      if (terminatedChild) alreadyTerminated
      else {
        log.info("Either uplink or downlink actor terminated without disconnect! for mac: {}, restarting both uplink and downlink actor ", mac.getAddressAsString)
        terminatedChild = true
        context unwatch data.uplink.get
        context unwatch data.downlink.get
        context stop data.uplink.get
        context stop data.downlink.get
        releaseStats()
        stopWithFailure
      }
  }
  handler
}
/**
 * Dispatches a message received on the downlink by its type: NOOP messages are counted,
 * command messages have their payload handed to `processCommands`. Any other type is
 * logged and ignored (the match previously had no default and threw `MatchError`).
 *
 * @param connectionId connection id, used for logging only
 * @param blohMsg      the received message
 */
def processClientMessage(connectionId: String, blohMsg: BlohMessage) = blohMsg.getType() match {
  case BlohConstants.TYPE_NOOP =>
    log.info("Receive NOOP message! device={} connectionId={}", mac.getAddressAsString, connectionId)
    noopCounter += 1

  case BlohConstants.TYPE_DOWNLINK_COMMANDS =>
    log.info("Receive Device Command message! device={} connectionId={}", mac.getAddressAsString, connectionId)
    processCommands(blohMsg.getBuf())

  case other =>
    log.warning("Ignoring downlink message of unsupported type {}! device={} connectionId={}", other, mac.getAddressAsString, connectionId)
}
/**
 * Parses a tagged command payload and applies each tag in order:
 * ACKs trim the pending sequence numbers and re-arm the downlink ACK watchdog,
 * Sensor tags select the sensor that subsequent RuntimeFlags tags apply to,
 * RuntimeFlags tags are forwarded to the message generator and remembered.
 * A payload that fails to parse is logged as an error.
 *
 * @param buf raw command payload
 */
private[this] def processCommands(buf: Array[Byte]): Unit = {
  // Sensor selected by the most recent Sensor tag; serial 0 until one is seen.
  var sensor = new SensorSerial(0)

  TagParser.parse(buf).fold(
    { err => log.error("Error parsing tagged message: {}", err) },
    _.foreach {
      case Ack(seq) =>
        log.info("Receive ACK message! device={}", mac.getAddressAsString)
        ackCounter += 1
        // Forget every pending sequence number up to and including the acknowledged one;
        // an unknown number (index -1) leaves the buffer unchanged.
        val ackedAt = seqNumBuffer.indexOf(seq)
        seqNumBuffer = seqNumBuffer.drop(ackedAt + 1)
        setTimer("downlinkAck", AckTimeout, ackThreshold, false)

      case s: Sensor =>
        sensor = new SensorSerial(s.serial)
        log.info("Set sensor message! device={}, sensor={}", mac.getAddressAsString, sensor.getSerialNumber())

      case RuntimeFlags(flags) =>
        log.info("Receive runtime flags! device={} sensor={} flags={}", mac.getAddressAsString, sensor.getSerialNumber(), flags)
        try {
          stubDevice.setRuntimeFlags(sensor, flags)
        } catch {
          case iae: IllegalArgumentException =>
            log.info("Illegal Argument Exception, Invalid sensor or runtime flags! " + iae.getMessage)
          case e: Exception =>
            log.info("Exeption, Invalid sensor or runtime flags!" + e.getMessage)
        }
        // Recorded even when the generator rejected the flags.
        sensorFlags = sensorFlags.updated(sensor, flags)

      case tagId =>
        log.info("Receive command tag:{} for mac {}", tagId, mac.getAddressAsString)
    }
  )
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment