Skip to content

Instantly share code, notes, and snippets.

@kolobaev
Last active August 14, 2019 11:39
Show Gist options
  • Save kolobaev/0c722b298084cc65a10ec00265d17135 to your computer and use it in GitHub Desktop.
Save kolobaev/0c722b298084cc65a10ec00265d17135 to your computer and use it in GitHub Desktop.
moira/notifier/escalations/escalation.go
package escalations
import (
"fmt"
"sync"
"time"
"gopkg.in/tomb.v2"
"github.com/moira-alert/moira"
"github.com/moira-alert/moira/notifier"
)
// FetchEscalationsWorker periodically fetches scheduled escalation events from
// the database and sends the resulting notification packages via the notifier.
type FetchEscalationsWorker struct {
	// Logger receives the worker's lifecycle, debug and warning messages.
	Logger moira.Logger
	// Database is the source of escalation events, trigger state and contacts.
	Database moira.Database
	// Notifier delivers the assembled notification packages.
	Notifier notifier.Notifier
	// tomb owns the background goroutine started by Start and stopped by Stop.
	tomb tomb.Tomb
}
// Start launches a tomb-managed goroutine that processes the scheduled
// escalations on every tick of a 5-second ticker. A failed pass is logged as a
// warning and does not stop the loop; the goroutine exits with a nil error
// once the tomb starts dying (see Stop).
func (w *FetchEscalationsWorker) Start() {
	w.tomb.Go(func() error {
		checkTicker := time.NewTicker(time.Second * 5) // TODO to config
		// Release the ticker when the loop exits; an unstopped ticker keeps
		// its runtime timer alive after the worker has been stopped.
		defer checkTicker.Stop()
		for {
			select {
			case <-w.tomb.Dying():
				w.Logger.Info("Moira Notifier Fetching scheduled escalations stopped")
				return nil
			case <-checkTicker.C:
				if err := w.processScheduledEscalations(); err != nil {
					w.Logger.Warningf("Failed to fetch scheduled escalations: %s", err.Error())
				}
			}
		}
	})
	w.Logger.Info("Moira Notifier Fetching scheduled escalations started")
}
// Stop asks the escalation fetching goroutine to terminate by killing the
// tomb with a nil reason, then waits on the tomb and returns the error it
// reports.
func (w *FetchEscalationsWorker) Stop() error {
	w.tomb.Kill(nil)
	stopErr := w.tomb.Wait()
	return stopErr
}
// processScheduledEscalations runs one pass over the escalation events that
// are scheduled up to the current time:
//
//  1. events whose trigger has no pending escalations are skipped;
//  2. for a final escalation the trigger's escalations are acknowledged;
//  3. events whose metric is "OK" in the trigger's last check are skipped;
//  4. the remaining events are grouped into one notification package per
//     (contact type, contact value, trigger ID) and handed to the notifier.
//
// The first database error aborts the pass and is returned as is; in that case
// nothing is sent, including packages that were already assembled. Otherwise
// the method waits on the WaitGroup passed to the notifier and returns nil.
func (w *FetchEscalationsWorker) processScheduledEscalations() error {
	now := time.Now().Unix()
	events, err := w.Database.FetchScheduledEscalationEvents(now)
	if err != nil {
		return err
	}

	// Packages are keyed by contact and trigger so that several events for the
	// same recipient and trigger are delivered together.
	packages := make(map[string]*notifier.NotificationPackage)

	for _, escalation := range events {
		triggerID := escalation.Trigger.ID
		w.Logger.Debugf("processing trigger %s escalation", triggerID)

		pending, err := w.Database.TriggerHasPendingEscalations(triggerID)
		if err != nil {
			return err
		}
		if !pending {
			w.Logger.Debugf("skip trigger %s no pending escalations", triggerID)
			continue
		}

		lastCheck, err := w.Database.GetTriggerLastCheck(triggerID)
		if err != nil {
			return err
		}

		// TODO what if send failed?
		// NOTE(review): the acknowledgement happens before the OK check and
		// before sending, and any result of AckTriggerEscalations is
		// discarded - confirm this is intended.
		if escalation.IsFinal {
			w.Database.AckTriggerEscalations(triggerID)
		}

		metricCheck, known := lastCheck.Metrics[escalation.Event.Metric]
		if known && metricCheck.State == "OK" {
			w.Logger.Debugf("escalation trigger last check was ok")
			continue
		}

		contacts, err := w.Database.GetContacts(escalation.Escalation.Contacts)
		if err != nil {
			return err
		}

		for _, contact := range contacts {
			key := fmt.Sprintf("%s:%s:%s", contact.Type, contact.Value, triggerID)
			pkg, exists := packages[key]
			if !exists {
				// Throttled and FailCount are left at their zero values.
				pkg = &notifier.NotificationPackage{
					Events:  []moira.NotificationEvent{}, // TODO check capacity , len(scheduledEscalations)),
					Trigger: escalation.Trigger,
					Contact: *contact,
					NeedAck: !escalation.IsFinal,
				}
				packages[key] = pkg
			}
			pkg.Events = append(pkg.Events, escalation.Event)
		}
	}

	// Hand every package to the notifier, then wait on the shared WaitGroup.
	var sending sync.WaitGroup
	for _, pkg := range packages {
		w.Notifier.Send(pkg, &sending)
	}
	sending.Wait()

	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment