Last active
August 14, 2019 11:39
-
-
Save kolobaev/0c722b298084cc65a10ec00265d17135 to your computer and use it in GitHub Desktop.
moira/notifier/escalations/escalation.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package escalations | |
import ( | |
"fmt" | |
"sync" | |
"time" | |
"gopkg.in/tomb.v2" | |
"github.com/moira-alert/moira" | |
"github.com/moira-alert/moira/notifier" | |
) | |
// FetchEscalationsWorker - check for new notifications and send it using notifier | |
type FetchEscalationsWorker struct { | |
Logger moira.Logger | |
Database moira.Database | |
Notifier notifier.Notifier | |
tomb tomb.Tomb | |
} | |
// Start is a cycle that fetches scheduled notifications from database | |
func (w *FetchEscalationsWorker) Start() { | |
w.tomb.Go(func() error { | |
checkTicker := time.NewTicker(time.Second * 5) // TODO to config | |
for { | |
select { | |
case <-w.tomb.Dying(): | |
w.Logger.Info("Moira Notifier Fetching scheduled escalations stopped") | |
return nil | |
case <-checkTicker.C: | |
if err := w.processScheduledEscalations(); err != nil { | |
w.Logger.Warningf("Failed to fetch scheduled escalations: %s", err.Error()) | |
} | |
} | |
} | |
}) | |
w.Logger.Info("Moira Notifier Fetching scheduled escalations started") | |
} | |
// Stop stops new notifications fetching and wait for finish | |
func (w *FetchEscalationsWorker) Stop() error { | |
w.tomb.Kill(nil) | |
return w.tomb.Wait() | |
} | |
func (w *FetchEscalationsWorker) processScheduledEscalations() error { | |
escalationEvents, err := w.Database.FetchScheduledEscalationEvents(time.Now().Unix()) | |
if err != nil { | |
return err | |
} | |
notificationPackages := make(map[string]*notifier.NotificationPackage) | |
for _, escalation := range escalationEvents { | |
w.Logger.Debugf("processing trigger %s escalation", escalation.Trigger.ID) | |
hasEscalations, err := w.Database.TriggerHasPendingEscalations(escalation.Trigger.ID) | |
if err != nil { | |
return err | |
} | |
if !hasEscalations { | |
w.Logger.Debugf("skip trigger %s no pending escalations", escalation.Trigger.ID) | |
continue | |
} | |
lastCheck, err := w.Database.GetTriggerLastCheck(escalation.Trigger.ID) | |
if err != nil { | |
return err | |
} | |
// TODO what if send failed? | |
if escalation.IsFinal { | |
w.Database.AckTriggerEscalations(escalation.Trigger.ID) | |
} | |
if v, ok := lastCheck.Metrics[escalation.Event.Metric]; ok && v.State == "OK" { | |
w.Logger.Debugf("escalation trigger last check was ok") | |
continue | |
} | |
contacts, err := w.Database.GetContacts(escalation.Escalation.Contacts) | |
if err != nil { | |
return err | |
} | |
for _, contact := range contacts { | |
packageKey := fmt.Sprintf("%s:%s:%s", contact.Type, contact.Value, escalation.Trigger.ID) | |
p, found := notificationPackages[packageKey] | |
if !found { | |
p = ¬ifier.NotificationPackage{ | |
Events: make([]moira.NotificationEvent, 0), // TODO check capacity , len(scheduledEscalations)), | |
Trigger: escalation.Trigger, | |
Contact: *contact, | |
Throttled: false, | |
FailCount: 0, | |
NeedAck: !escalation.IsFinal, | |
} | |
} | |
p.Events = append(p.Events, escalation.Event) | |
notificationPackages[packageKey] = p | |
} | |
} | |
var wg sync.WaitGroup | |
for _, pkg := range notificationPackages { | |
w.Notifier.Send(pkg, &wg) | |
} | |
wg.Wait() | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment