Last active
February 2, 2017 15:29
-
-
Save mpkocher/34c5408ca6977c7a1e44855da9a22cc3 to your computer and use it in GitHub Desktop.
Alarm Pad
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.github.mpkocher.alarmpad | |
import java.nio.file.{Path, Paths} | |
import akka.actor._ | |
import akka.util.Timeout | |
import com.typesafe.config.{Config, ConfigFactory} | |
import com.typesafe.scalalogging.LazyLogging | |
import scala.collection.mutable | |
import scala.concurrent.duration._ | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import scala.util.Try | |
import scala.util.Random | |
/**
  * A single measurement produced for one alarm.
  *
  * @param id      id of the alarm this measurement belongs to
  * @param value   computed health value
  * @param message optional human readable detail accompanying the value
  */
case class AlarmMetric(
  id: String,
  value: Double,
  message: Option[String]
)
/**
  * Static description of an alarm.
  *
  * Alarm values follow the convention 0 = good, 1 = bad (emulating linux exit
  * codes), so the default value is deliberately the "bad" one.
  *
  * The Severity type and the Severity[Double, SeverityLevel] map are
  * intentionally left out of this model.
  *
  * @param id           alarm id
  * @param name         display name
  * @param description  description of the alarm
  * @param defaultValue value of the alarm before any metric is computed (defaults to 1.0, i.e. bad)
  */
case class Alarm(
  id: String,
  name: String,
  description: String,
  defaultValue: Double = 1.0
)
/**
  * Contract for a fundamental alarm.
  *
  * Adding a new alarm takes two steps:
  *
  *  1. implement this trait and give the implementation a companion object
  *     with a `loadFromConfig` factory method
  *  2. add the new runner to `LoadAlarmRunners`
  *
  * Implementations are meant to be trivial to unit test.
  *
  * (The name `AlarmRunner` is a placeholder the author was not thrilled about.)
  */
trait AlarmRunner {

  /** The alarm computed by this runner; its id should be globally unique. */
  val alarm: Alarm

  /** Computes the health/alarm metric: 0 is good, 1 is bad. */
  def run(): Double

  /**
    * Runs the alarm and tags the resulting value with the alarm id.
    *
    * No message is produced yet. Eventually this should consult the severity
    * map to build a meaningful one, such as "Tmp directory is 98% full".
    *
    * @return the metric for `alarm.id`, carrying the value returned by `run()`
    */
  def runMetric(): AlarmMetric = {
    // run() is evaluated first, before the alarm id is read.
    val healthValue = run()
    AlarmMetric(id = alarm.id, value = healthValue, message = None)
  }
}
/** Disk related helpers shared by the directory based alarm runners. */
trait DiskUtils {

  /**
    * Free space available for `path`.
    *
    * This is a stub: the path is not inspected and a fixed value is returned.
    *
    * @param path location to inspect (currently ignored)
    * @return always 50000
    */
  def computeFreeSpace(path: Path): Long = 50000L

  /**
    * Maps the free space of `path` onto a health metric (deliberately
    * simplistic example).
    *
    * @param path     location to inspect
    * @param minSpace threshold below which the location counts as unhealthy
    * @return 1.0 (bad) when the free space is strictly below `minSpace`,
    *         0.0 (good) otherwise
    */
  def freeSpaceToHealthMetric(path: Path, minSpace: Long = 10L): Double =
    computeFreeSpace(path) match {
      case available if available < minSpace => 1.0
      case _                                 => 0.0
    }
}
object TmpDirectoryAlarmRunner {

  /**
    * Factory intended to load and validate the runner settings from `config`.
    *
    * For example purposes the directory is hardcoded to "/tmp" and `config`
    * is not read.
    *
    * @param config application configuration (currently unused)
    * @return a runner watching "/tmp"
    */
  def loadFromConfig(config: Config): TmpDirectoryAlarmRunner = {
    val tmpPath = Paths.get("/tmp")
    new TmpDirectoryAlarmRunner(tmpPath)
  }
}
/**
  * Alarm on the free space of the tmp directory.
  *
  * @param path directory to check
  */
class TmpDirectoryAlarmRunner(path: Path) extends AlarmRunner with DiskUtils {

  val alarm: Alarm =
    Alarm(id = "smrtlink.alarms.tmp_dir", name = "Tmp Directory", description = "Desc")

  /** Bad (1.0) when the free space of `path` is below 10, good (0.0) otherwise. */
  def run(): Double = freeSpaceToHealthMetric(path, minSpace = 10L)
}
object JobDirectoryAlarmRunner {

  /**
    * Factory for the job directory runner.
    *
    * The directory is hardcoded to "/path/to/job-dir"; `config` is not read.
    *
    * @param config application configuration (currently unused)
    * @return a runner watching "/path/to/job-dir"
    */
  def loadFromConfig(config: Config): JobDirectoryAlarmRunner = {
    val jobDir = Paths.get("/path/to/job-dir")
    new JobDirectoryAlarmRunner(jobDir)
  }
}
/**
  * Alarm on the free space of the job directory.
  *
  * @param path directory to check
  */
class JobDirectoryAlarmRunner(path: Path) extends AlarmRunner with DiskUtils {

  val alarm: Alarm =
    Alarm(id = "smrtlink.alarms.job_dir", name = "SL Job Dir", description = "Desc")

  /** Bad (1.0) when the free space of `path` is below 5, good (0.0) otherwise. */
  def run(): Double = freeSpaceToHealthMetric(path, minSpace = 5L)
}
/**
  * Placeholder alarm for the status of the SMRT View server.
  *
  * This is an example of an alarm that might need an implicit actor system in
  * order to run, which may require changing the base `AlarmRunner`.
  *
  * @param host server host (not used by the current placeholder implementation)
  * @param port server port (not used by the current placeholder implementation)
  */
class SmrtViewStatusAlarmRunner(host: String, port: Int = 8084) extends AlarmRunner {

  val alarm: Alarm = Alarm(
    id = "smrtlink.alarms.smrtview_status",
    name = "SMRT View Status",
    description = "Desc"
  )

  // Random source backing the placeholder metric.
  val r = scala.util.Random

  /**
    * Returns a random double as the metric.
    *
    * A random value does not really make sense for this alarm: the real
    * result is either 0 (the status call succeeded) or 1 (the SMRT View
    * server could not be reached).
    */
  def run(): Double = r.nextDouble()
}
object SmrtViewStatusAlarmRunner {

  /**
    * Factory for the SMRT View status runner.
    *
    * Host and port are hardcoded to "localhost" and 8085; `conf` is not read.
    *
    * @param conf application configuration (currently unused)
    * @return a runner pointing at localhost:8085
    */
  def loadFromConfig(conf: Config): SmrtViewStatusAlarmRunner = {
    val host = "localhost"
    val port = 8085
    new SmrtViewStatusAlarmRunner(host, port)
  }
}
/**
  * In-memory stand-in for a real Dao (demo purposes only).
  *
  * Keeps the registered alarms and the latest metric value per alarm id.
  *
  * @param alarms alarms to register at construction time (duplicates are dropped)
  */
class AlarmDao(alarms: Seq[Alarm] = Seq.empty[Alarm]) {

  // Latest value per alarm id.
  private val metrics = mutable.Map.empty[String, Double]

  // Seeded with the constructor alarms, which were previously ignored.
  // A ListBuffer is kept so the result type of addAlarm does not change, but it
  // is treated like a set: an alarm is stored at most once.
  private val registeredAlarms = mutable.ListBuffer(alarms.distinct: _*)

  /**
    * Registers a single alarm. Registering an alarm that is already known
    * leaves the registry unchanged.
    *
    * @param alarm alarm to register
    * @return the buffer of registered alarms
    */
  def addAlarm(alarm: Alarm) = {
    println(s"adding alarm $alarm")
    if (!registeredAlarms.contains(alarm)) registeredAlarms += alarm
    registeredAlarms
  }

  /** Registers every alarm of `alarms`, in order. */
  def addAlarms(alarms: Seq[Alarm]) =
    alarms.foreach(addAlarm)

  /**
    * Stores the value of `alarmMetric` under its alarm id, replacing any
    * previous value for that id.
    *
    * @param alarmMetric metric to record
    * @return the metrics map
    */
  def updateAlarmMetric(alarmMetric: AlarmMetric) = {
    println(s"Updating $alarmMetric")
    metrics += (alarmMetric.id -> alarmMetric.value)
  }

  /**
    * Human readable dump of the stored metrics, one "id -> value" per line,
    * or "No Metrics" when nothing has been recorded yet.
    */
  def summary: String = {
    val lines =
      if (metrics.isEmpty) "No Metrics"
      else metrics.map { case (id, value) => s"$id -> $value" }.mkString("\n")
    s"Alarm Summary\nMetrics\n$lines"
  }
}
/** Messages understood by the `AlarmManager` actor. */
object AlarmManager {

  /** Asks the manager to run every alarm runner and store the results. */
  case object UpdateAlarms

  /** Asks the manager to print the Dao summary (for debugging). */
  case object AlarmSummary
}
/**
  * Alarm Manager is responsible for running alarms and updating the Dao.
  *
  * Simplified to use a single time delta for calling all runners. This could
  * be expanded to load the periodicity from the config and spawn one actor per
  * AlarmRunner. Either way, running the AlarmRunners is the responsibility of
  * the AlarmManager.
  *
  * @param runners   alarm runners to execute on every update
  * @param dao       store receiving the registered alarms and computed metrics
  * @param frequency rate at which to call the runners to update the AlarmMetric
  */
class AlarmManager(runners: Seq[AlarmRunner],
                   dao: AlarmDao,
                   frequency: FiniteDuration = 5.seconds) extends Actor {

  import AlarmManager._
  import scala.util.{Failure, Success}

  private var wasRegistered = false

  // Use the standard akka apis to trigger calls to self to call the AlarmRunners.
  // The first tick is sent after 5 seconds, then one tick every `frequency`.
  val updaterRunnable = context.system.scheduler.schedule(5.seconds, frequency, self, UpdateAlarms)

  /** Cancels the periodic tick so no further UpdateAlarms are sent once the actor stops. */
  override def postStop(): Unit = {
    updaterRunnable.cancel()
    super.postStop()
  }

  private def registerAlarms(alarms: Seq[Alarm]) =
    dao.addAlarms(alarms)

  /**
    * Runs one runner. If the metric fails to compute for any reason the
    * failure is reported and the alarm is set to the failed value (1.0).
    */
  private def computeMetric(runner: AlarmRunner): AlarmMetric = {
    val alarmId = runner.alarm.id
    Try { runner.runMetric() } match {
      case Success(metric) => metric
      case Failure(ex) =>
        // Surface the cause instead of silently replacing it with the failed metric.
        println(s"Failed to compute alarm metric for $alarmId: $ex")
        AlarmMetric(alarmId, 1.0, Some("Failed to compute alarm metric"))
    }
  }

  def receive: Receive = {
    case UpdateAlarms =>
      if (!wasRegistered) {
        //FIXME(mpkocher) When alarms are added, they should be initialized in an ERROR state (versus
        // initializing the alarm in a good state and, due to a bug or error in the code, never
        // updating the alarm to reflect the actual state).
        registerAlarms(runners.map(_.alarm))
        wasRegistered = true
        println(s"${runners.map(_.alarm)} AlarmRunners were registered")
      }
      // The duration of the calls relative to the update frequency is such that the
      // runners can be called sequentially. If this becomes a problem, a parallel
      // model could be added.
      runners.foreach { runner =>
        dao.updateAlarmMetric(computeMetric(runner))
      }
    case AlarmSummary =>
      // for debugging
      println(dao.summary)
    case x => println(s"Unhandled Alarm message $x")
  }
}
/**
  * Users should *only* need to extend this trait to load their custom
  * AlarmRunner in `alarmRunners`.
  */
trait LoadAlarmRunners {

  /** Application configuration, loaded on first access. */
  lazy val config: Config = ConfigFactory.load()

  /** Every alarm runner of the application, built on first access. */
  lazy val alarmRunners: Seq[AlarmRunner] = {
    val tmpDirRunner = TmpDirectoryAlarmRunner.loadFromConfig(config)
    val jobDirRunner = JobDirectoryAlarmRunner.loadFromConfig(config)
    val smrtViewRunner = SmrtViewStatusAlarmRunner.loadFromConfig(config)
    Seq(tmpDirRunner, jobDirRunner, smrtViewRunner)
  }
}
/**
  * Interactive demo: starts the AlarmManager actor, prints the alarm summary
  * periodically and shuts everything down once a line is read from stdin.
  */
object Demo extends LoadAlarmRunners with LazyLogging {

  /**
    * Runs the demo until a line is read from stdin.
    *
    * The summary schedule is cancelled and the actor system is shut down even
    * when setting up the actors or reading from stdin fails.
    *
    * @return the exit code, always 0
    */
  def run: Int = {
    logger.info("Starting Demo")

    import AlarmManager._

    implicit val system = ActorSystem("alarm-pad")

    try {
      val alarmDao = new AlarmDao()
      val alarmManagerActor = system.actorOf(Props(new AlarmManager(alarmRunners, alarmDao)))

      // Ask the manager to print a summary 5 seconds after start, then every 5 seconds.
      val summaryRunnable = system.scheduler.schedule(5.seconds, 5.seconds, alarmManagerActor, AlarmSummary)

      try scala.io.StdIn.readLine("Press any key to exit ...\n")
      finally summaryRunnable.cancel()
    } finally {
      // NOTE(review): newer Akka versions replace shutdown() with terminate() -
      // confirm against the Akka version this project builds with.
      system.shutdown()
    }

    0
  }
}
/** Command line entry point: runs the demo and reports its exit code. */
object Program extends App {

  // The code returned by the demo is only printed here.
  val exitCode: Int = Demo.run

  println(s"Exiting main with $exitCode")
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment