Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save etsangsplk/eb70983ee6f87bcd92e2af3c04968fd8 to your computer and use it in GitHub Desktop.
Save etsangsplk/eb70983ee6f87bcd92e2af3c04968fd8 to your computer and use it in GitHub Desktop.
github.com/Sirupsen/logrus hook for logging to CloudWatch Logs
package logrus_ext
import (
"encoding/json"
"fmt"
"github.com/Sirupsen/logrus"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/oleiade/lane"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
)
var (
MaxPutEvents = 1000
limit = int64(1)
logQueue = lane.NewQueue()
mutex sync.Mutex
)
type CloudWatchLogsHook struct {
LogGroupName string
LogStreamName string
Region string
}
func NewCloudWatchLogsHook(region, groupName, streamName string, flushInterval time.Duration) *CloudWatchLogsHook {
hook := &CloudWatchLogsHook{
LogGroupName: groupName,
LogStreamName: streamName,
Region: region,
}
// 定期的にログを送信する。
go func() {
ticker := time.NewTicker(flushInterval)
for {
select {
case <-ticker.C:
err := hook.flushLogs()
if err != nil {
logrus.WithField("error", err.Error()).Error("ログの出力に失敗しました")
}
}
}
ticker.Stop()
}()
// 終了時にログを送信する。
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP, syscall.SIGKILL)
go func() {
sig := <-signalChannel
logrus.WithField("signal", sig).Info("サーバー終了シグナルを受信しました")
err := hook.flushLogs()
if err != nil {
panic(fmt.Sprintf("終了時ログ出力に失敗しました: %s", err))
}
os.Exit(1)
}()
return hook
}
func (hook *CloudWatchLogsHook) Fire(entry *logrus.Entry) error {
data := make(map[string]interface{})
data["level"] = strings.ToUpper(entry.Level.String())
data["time"] = entry.Time.UTC()
data["message"] = entry.Message
data["detail"] = entry.Data
j, err := json.Marshal(data)
if err != nil {
panic(fmt.Sprintf("JSON マーシャリングに失敗しました: %s", err))
}
timestamp := entry.Time.UTC().UnixNano() / 1000000
message := string(j)
e := &cloudwatchlogs.InputLogEvent{
Timestamp: &timestamp,
Message: &message,
}
logQueue.Enqueue(e)
return nil
}
func (hook *CloudWatchLogsHook) flushLogs() error {
// ログストリームに出力する時間順序整合性を保つためにロックする。
mutex.Lock()
defer mutex.Unlock()
logEvents := make([]*cloudwatchlogs.InputLogEvent, 0, MaxPutEvents)
for len(logEvents) < MaxPutEvents && logQueue.Head() != nil {
e := logQueue.Dequeue()
logEvents = append(logEvents, e.(*cloudwatchlogs.InputLogEvent))
}
if len(logEvents) == 0 {
return nil
}
config := &aws.Config{Region: &hook.Region}
service := cloudwatchlogs.New(session.New(), config)
streamDescription, err := service.DescribeLogStreams(&cloudwatchlogs.DescribeLogStreamsInput{
LogGroupName: &hook.LogGroupName,
LogStreamNamePrefix: &hook.LogStreamName,
Limit: &limit,
})
if err != nil {
return err
}
record := &cloudwatchlogs.PutLogEventsInput{
LogGroupName: &hook.LogGroupName,
LogStreamName: &hook.LogStreamName,
LogEvents: logEvents,
SequenceToken: streamDescription.LogStreams[0].UploadSequenceToken,
}
_, err = service.PutLogEvents(record)
return err
}
func (hook *CloudWatchLogsHook) Levels() []logrus.Level {
return logrus.AllLevels
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment