Skip to content

Instantly share code, notes, and snippets.

@mpkocher
Last active February 2, 2017 15:29
Show Gist options
  • Save mpkocher/34c5408ca6977c7a1e44855da9a22cc3 to your computer and use it in GitHub Desktop.
Save mpkocher/34c5408ca6977c7a1e44855da9a22cc3 to your computer and use it in GitHub Desktop.
Alarm Pad
package com.github.mpkocher.alarmpad
import java.nio.file.{Path, Paths}
import akka.actor._
import akka.util.Timeout
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.scalalogging.LazyLogging
import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Try
import scala.util.Random
/**
 * One computed value for an alarm.
 *
 * @param id      id of the [[Alarm]] this value belongs to
 * @param value   computed metric; by the AlarmRunner convention 0 is good and 1 is bad
 * @param message optional human-readable detail about the value
 */
case class AlarmMetric(
    id: String,
    value: Double,
    message: Option[String])
/**
 * Static description of an alarm.
 *
 * Metric values follow Linux exit-code conventions: 0 means good and 1 means bad.
 * `defaultValue` is therefore the bad state (1.0) unless overridden.
 *
 * Severity levels (and a Severity -> level map) are intentionally not modeled here.
 *
 * @param id           globally unique alarm id
 * @param name         display name
 * @param description  free-text description
 * @param defaultValue value representing the alarm's initial state (defaults to 1.0, "bad")
 */
case class Alarm(
    id: String,
    name: String,
    description: String,
    defaultValue: Double = 1.0)
/**
 * Computes the health value of a single alarm.
 *
 * Adding a new fundamental alarm takes two steps:
 *
 *  1. Implement this trait, and give the implementation a companion object with a
 *     `loadFromConfig` factory method.
 *  2. Register the new runner in `LoadAlarmRunners`.
 *
 * Implementations should be easy to test in isolation.
 *
 * (Original author's note: not really thrilled about the name.)
 */
trait AlarmRunner {
  /** The alarm computed by this runner. Every implementation must use a globally unique `alarm.id`. */
  val alarm: Alarm

  /** Compute the health/alarm metric: 0 is good, 1 is bad. */
  def run(): Double

  /**
   * Run the alarm and tag the result with this alarm's id.
   *
   * The message is currently always empty. A future version could consult a severity map
   * to build a meaningful message such as "Tmp directory is 98% full".
   */
  def runMetric(): AlarmMetric = {
    val computed: Double = run()
    val noMessage = Option.empty[String]
    AlarmMetric(alarm.id, computed, noMessage)
  }
}
/** Helpers that turn a directory's free space into a 0 (healthy) / 1 (unhealthy) metric. */
trait DiskUtils {

  /** Free space available under `path`. Placeholder: always returns 50000, whatever `path` is. */
  def computeFreeSpace(path: Path): Long = 50000L

  /**
   * Map free space to a health metric (a deliberately simple example rule).
   *
   * @param path     directory to check
   * @param minSpace threshold; free space strictly below it is considered unhealthy
   * @return 1.0 when free space is below `minSpace`, otherwise 0.0
   */
  def freeSpaceToHealthMetric(path: Path, minSpace: Long = 10L): Double = {
    val belowThreshold = computeFreeSpace(path) < minSpace
    if (belowThreshold) 1.0 else 0.0
  }
}
/** Factory for [[TmpDirectoryAlarmRunner]]. */
object TmpDirectoryAlarmRunner {

  /**
   * Build the temp-directory runner from config.
   *
   * @param config application config; currently not consulted. The path is hardcoded
   *               to `/tmp` for example purposes.
   * @return a runner watching `/tmp`
   */
  def loadFromConfig(config: Config): TmpDirectoryAlarmRunner = {
    val tmpDir: Path = Paths.get("/tmp")
    new TmpDirectoryAlarmRunner(tmpDir)
  }
}
/** Alarm runner that reports whether free space under `path` is below a minimum of 10. */
class TmpDirectoryAlarmRunner(path: Path) extends AlarmRunner with DiskUtils {

  val alarm: Alarm = Alarm(
    id = "smrtlink.alarms.tmp_dir",
    name = "Tmp Directory",
    description = "Desc")

  def run(): Double = freeSpaceToHealthMetric(path, minSpace = 10L)
}
/** Factory for [[JobDirectoryAlarmRunner]]. */
object JobDirectoryAlarmRunner {

  /**
   * Build the job-directory runner from config.
   *
   * @param config application config; currently not consulted. The path below is a
   *               hardcoded placeholder.
   */
  def loadFromConfig(config: Config): JobDirectoryAlarmRunner = {
    val jobDir = Paths.get("/path/to/job-dir")
    new JobDirectoryAlarmRunner(jobDir)
  }
}
/** Alarm runner that reports whether free space under the job directory `path` is below a minimum of 5. */
class JobDirectoryAlarmRunner(path: Path) extends AlarmRunner with DiskUtils {

  val alarm: Alarm = Alarm(
    id = "smrtlink.alarms.job_dir",
    name = "SL Job Dir",
    description = "Desc")

  def run(): Double = freeSpaceToHealthMetric(path, minSpace = 5L)
}
/**
 * Alarm runner for the SMRT View server status.
 *
 * Placeholder: `run` returns a random value in [0, 1) and never contacts `host`/`port`.
 * A real check would return 0 (status call succeeded) or 1 (server unreachable), and
 * might need an implicit ActorSystem, which could require changing the AlarmRunner trait.
 */
class SmrtViewStatusAlarmRunner(host: String, port: Int = 8084) extends AlarmRunner {

  val alarm: Alarm = Alarm(
    id = "smrtlink.alarms.smrtview_status",
    name = "SMRT View Status",
    description = "Desc")

  // Random source backing the placeholder metric.
  val r: Random.type = Random

  def run(): Double = r.nextDouble()
}
/** Factory for [[SmrtViewStatusAlarmRunner]]. */
object SmrtViewStatusAlarmRunner {

  /**
   * Build the SMRT View status runner.
   *
   * @param conf application config; currently ignored. Host and port are hardcoded.
   */
  // NOTE(review): port 8085 differs from the class's default port (8084) — confirm which is intended.
  def loadFromConfig(conf: Config): SmrtViewStatusAlarmRunner =
    new SmrtViewStatusAlarmRunner(host = "localhost", port = 8085)
}
// Mock out Dao for demo purposes
/**
 * In-memory stand-in for the alarm persistence layer.
 *
 * Alarms are stored keyed by id, so registering the same id twice keeps a single entry
 * (the later registration replaces the earlier one). Registering an alarm also seeds its
 * metric with `Alarm.defaultValue` (the "bad" state by default) when no metric exists yet.
 * An alarm that is never updated therefore reports its default state in `summary` instead
 * of being silently absent.
 *
 * @param alarms alarms to register at construction time
 */
class AlarmDao(alarms: Seq[Alarm] = Seq.empty[Alarm]) {
  // Insertion-ordered so `summary` output is stable.
  private val metrics = mutable.LinkedHashMap.empty[String, Double]
  // Keyed by alarm id, which gives the set semantics registration needs.
  private val registeredAlarms = mutable.LinkedHashMap.empty[String, Alarm]

  /** Register one alarm; seeds its metric with the alarm's default value if none was recorded yet. */
  def addAlarm(alarm: Alarm): Unit = {
    println(s"adding alarm $alarm")
    registeredAlarms.update(alarm.id, alarm)
    // Never clobber a metric that has already been reported for this id.
    if (!metrics.contains(alarm.id)) metrics.update(alarm.id, alarm.defaultValue)
  }

  /** Register each alarm in order (see `addAlarm`). */
  def addAlarms(alarms: Seq[Alarm]): Unit =
    alarms.foreach(addAlarm)

  /** Record the latest value for `alarmMetric.id`, replacing any previous value. */
  def updateAlarmMetric(alarmMetric: AlarmMetric): Unit = {
    println(s"Updating $alarmMetric")
    metrics.update(alarmMetric.id, alarmMetric.value)
  }

  /** Human-readable dump of all metrics, one `id -> value` per line, or "No Metrics" when empty. */
  def summary: String = {
    val xs =
      if (metrics.isEmpty) "No Metrics"
      else metrics.iterator.map { case (id, value) => s"$id -> $value" }.mkString("\n")
    s"Alarm Summary\nMetrics\n$xs"
  }

  // The constructor argument was previously ignored. Register it last, so the maps above are initialized.
  addAlarms(alarms)
}
/** Messages understood by the AlarmManager actor. */
object AlarmManager {
// Run every AlarmRunner and push the resulting metrics to the dao.
case object UpdateAlarms
case object AlarmSummary // for debugging: print the dao summary
}
/**
 * Alarm Manager is responsible for running alarms and updating the Dao.
 *
 * This simplified version calls all runners with a single period. It could be extended
 * to load per-runner periods from the config and spawn one Actor per AlarmRunner.
 * Either way, running the AlarmRunners is the responsibility of the AlarmManager.
 *
 * @param runners   alarm runners executed on every update tick
 * @param dao       store that receives registered alarms and computed metrics
 * @param frequency Rate at which to call the runners to update the AlarmMetric
 */
class AlarmManager(runners: Seq[AlarmRunner],
                   dao: AlarmDao,
                   frequency: FiniteDuration = 5.seconds) extends Actor {
  import AlarmManager._

  // Periodically send UpdateAlarms to self, with the first tick after 5 seconds. The actor's
  // own dispatcher is passed explicitly rather than relying on the global ExecutionContext.
  val updaterRunnable: Cancellable =
    context.system.scheduler.schedule(5.seconds, frequency, self, UpdateAlarms)(context.dispatcher, self)

  private def registerAlarms(alarms: Seq[Alarm]): Unit =
    dao.addAlarms(alarms)

  /** Register every runner's alarm once, before any update tick is processed. */
  override def preStart(): Unit = {
    //FIXME(mpkocher) When Alarms are added, they should be initialized in an ERROR state (rather than
    // a good state), so that a bug or error that prevents an update doesn't leave the alarm
    // misleadingly reporting the good state.
    val alarms = runners.map(_.alarm)
    registerAlarms(alarms)
    println(s"$alarms AlarmRunners were registered")
  }

  /**
   * Cancel the periodic task. Without this it keeps firing after the actor stops (dead letters),
   * and every restart schedules an additional one.
   */
  override def postStop(): Unit = {
    updaterRunnable.cancel()
  }

  def receive: Receive = {
    case UpdateAlarms =>
      // The update period is expected to be long relative to a run, so runners are called
      // sequentially. A parallel model could be added if this becomes a problem.
      runners.foreach { runner =>
        val failedAlarmMetric = AlarmMetric(runner.alarm.id, 1.0, Some("Failed to compute alarm metric"))
        // If the metric fails to compute (any non-fatal exception), record it as failed (1.0).
        val alarmMetric = Try(runner.runMetric()).getOrElse(failedAlarmMetric)
        dao.updateAlarmMetric(alarmMetric)
      }
    case AlarmSummary =>
      // for debugging
      println(dao.summary)
    case x =>
      println(s"Unhandled Alarm message $x")
  }
}
/**
 * Assembles the AlarmRunners used by the application.
 *
 * Users should *only* need to extend this trait to load their custom AlarmRunner
 * implementations into `alarmRunners`.
 */
trait LoadAlarmRunners {
  /** Application config, loaded on first access via `ConfigFactory.load()`. */
  lazy val config = ConfigFactory.load()

  /** The runners, each built from `config` by its companion's `loadFromConfig`. */
  lazy val alarmRunners: Seq[AlarmRunner] = {
    val conf = config
    List[AlarmRunner](
      TmpDirectoryAlarmRunner.loadFromConfig(conf),
      JobDirectoryAlarmRunner.loadFromConfig(conf),
      SmrtViewStatusAlarmRunner.loadFromConfig(conf)
    )
  }
}
/**
 * Interactive demo: starts an actor system running an AlarmManager over `alarmRunners`,
 * prints the alarm summary every 5 seconds, and shuts down once a line is read from stdin.
 */
object Demo extends LoadAlarmRunners with LazyLogging {

  /**
   * Run the demo until the user presses ENTER.
   *
   * @return the process exit code (always 0)
   */
  def run: Int = {
    logger.info("Starting Demo")
    import AlarmManager._
    val system = ActorSystem("alarm-pad")
    // Shut the actor system down even if anything below throws, so its threads do not leak.
    try {
      val alarmDao = new AlarmDao()
      val alarmManagerActor = system.actorOf(Props(new AlarmManager(alarmRunners, alarmDao)))
      val summaryRunnable =
        system.scheduler.schedule(5.seconds, 5.seconds, alarmManagerActor, AlarmSummary)(system.dispatcher)
      // readLine blocks until a full line (ENTER) is read, not a single key press.
      try scala.io.StdIn.readLine("Press ENTER to exit ...\n")
      finally summaryRunnable.cancel()
    } finally {
      system.shutdown()
    }
    0
  }
}
/**
 * Command-line entry point: runs the Demo and reports its exit code.
 *
 * Uses an explicit `main` rather than `extends App`. App relies on DelayedInit, whose
 * deferred field initialization is a known source of surprises.
 */
object Program {
  def main(args: Array[String]): Unit = {
    val exitCode = Demo.run
    println(s"Exiting main with $exitCode")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment