-
-
Save etsangsplk/eb70983ee6f87bcd92e2af3c04968fd8 to your computer and use it in GitHub Desktop.
github.com/Sirupsen/logrus hook for logging to CloudWatch Logs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package logrus_ext | |
import ( | |
"encoding/json" | |
"fmt" | |
"github.com/Sirupsen/logrus" | |
"github.com/aws/aws-sdk-go/aws" | |
"github.com/aws/aws-sdk-go/aws/session" | |
"github.com/aws/aws-sdk-go/service/cloudwatchlogs" | |
"github.com/oleiade/lane" | |
"os" | |
"os/signal" | |
"strings" | |
"sync" | |
"syscall" | |
"time" | |
) | |
var ( | |
MaxPutEvents = 1000 | |
limit = int64(1) | |
logQueue = lane.NewQueue() | |
mutex sync.Mutex | |
) | |
// CloudWatchLogsHook is a logrus hook that queues log entries and uploads
// them to a single CloudWatch Logs stream.
type CloudWatchLogsHook struct {
	// LogGroupName and LogStreamName identify the destination log group and
	// the log stream inside it.
	LogGroupName, LogStreamName string
	// Region is the AWS region used to configure the CloudWatch Logs client.
	Region string
}
func NewCloudWatchLogsHook(region, groupName, streamName string, flushInterval time.Duration) *CloudWatchLogsHook { | |
hook := &CloudWatchLogsHook{ | |
LogGroupName: groupName, | |
LogStreamName: streamName, | |
Region: region, | |
} | |
// 定期的にログを送信する。 | |
go func() { | |
ticker := time.NewTicker(flushInterval) | |
for { | |
select { | |
case <-ticker.C: | |
err := hook.flushLogs() | |
if err != nil { | |
logrus.WithField("error", err.Error()).Error("ログの出力に失敗しました") | |
} | |
} | |
} | |
ticker.Stop() | |
}() | |
// 終了時にログを送信する。 | |
signalChannel := make(chan os.Signal, 1) | |
signal.Notify(signalChannel, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP, syscall.SIGKILL) | |
go func() { | |
sig := <-signalChannel | |
logrus.WithField("signal", sig).Info("サーバー終了シグナルを受信しました") | |
err := hook.flushLogs() | |
if err != nil { | |
panic(fmt.Sprintf("終了時ログ出力に失敗しました: %s", err)) | |
} | |
os.Exit(1) | |
}() | |
return hook | |
} | |
func (hook *CloudWatchLogsHook) Fire(entry *logrus.Entry) error { | |
data := make(map[string]interface{}) | |
data["level"] = strings.ToUpper(entry.Level.String()) | |
data["time"] = entry.Time.UTC() | |
data["message"] = entry.Message | |
data["detail"] = entry.Data | |
j, err := json.Marshal(data) | |
if err != nil { | |
panic(fmt.Sprintf("JSON マーシャリングに失敗しました: %s", err)) | |
} | |
timestamp := entry.Time.UTC().UnixNano() / 1000000 | |
message := string(j) | |
e := &cloudwatchlogs.InputLogEvent{ | |
Timestamp: ×tamp, | |
Message: &message, | |
} | |
logQueue.Enqueue(e) | |
return nil | |
} | |
func (hook *CloudWatchLogsHook) flushLogs() error { | |
// ログストリームに出力する時間順序整合性を保つためにロックする。 | |
mutex.Lock() | |
defer mutex.Unlock() | |
logEvents := make([]*cloudwatchlogs.InputLogEvent, 0, MaxPutEvents) | |
for len(logEvents) < MaxPutEvents && logQueue.Head() != nil { | |
e := logQueue.Dequeue() | |
logEvents = append(logEvents, e.(*cloudwatchlogs.InputLogEvent)) | |
} | |
if len(logEvents) == 0 { | |
return nil | |
} | |
config := &aws.Config{Region: &hook.Region} | |
service := cloudwatchlogs.New(session.New(), config) | |
streamDescription, err := service.DescribeLogStreams(&cloudwatchlogs.DescribeLogStreamsInput{ | |
LogGroupName: &hook.LogGroupName, | |
LogStreamNamePrefix: &hook.LogStreamName, | |
Limit: &limit, | |
}) | |
if err != nil { | |
return err | |
} | |
record := &cloudwatchlogs.PutLogEventsInput{ | |
LogGroupName: &hook.LogGroupName, | |
LogStreamName: &hook.LogStreamName, | |
LogEvents: logEvents, | |
SequenceToken: streamDescription.LogStreams[0].UploadSequenceToken, | |
} | |
_, err = service.PutLogEvents(record) | |
return err | |
} | |
func (hook *CloudWatchLogsHook) Levels() []logrus.Level { | |
return logrus.AllLevels | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment