Created
February 16, 2023 11:23
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main
import (
	"bytes"
	"compress/gzip"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"strings"
	"sync"

	"github.com/Shopify/sarama"
)
// Kafka (MSK) destination settings used by main.
var (
	// mskTopic is the Kafka topic every forwarded log message is published to.
	mskTopic = "my-msk-topic"

	// mskBrokerList holds the bootstrap broker addresses handed to the
	// producer. Replace with your own broker list.
	mskBrokerList = []string{"my-msk-broker-1:9092", "my-msk-broker-2:9092"}
)
func main() { | |
// Initialize a Sarama configuration with the specified broker list | |
config := sarama.NewConfig() | |
config.Producer.Return.Successes = true | |
config.Producer.RequiredAcks = sarama.WaitForAll | |
config.Producer.Compression = sarama.CompressionGZIP | |
producer, err := sarama.NewAsyncProducer(mskBrokerList, config) | |
if err != nil { | |
log.Fatal(err) | |
} | |
defer func() { | |
if err := producer.Close(); err != nil { | |
log.Fatal(err) | |
} | |
}() | |
// Define the Lambda function handler | |
handler := func(event json.RawMessage) error { | |
// Parse the CloudWatch Logs subscription event | |
var record struct { | |
Data string `json:"data"` | |
} | |
if err := json.Unmarshal(event, &record); err != nil { | |
return err | |
} | |
// Decode the base64-encoded log data | |
logData, err := base64.StdEncoding.DecodeString(record.Data) | |
if err != nil { | |
return err | |
} | |
// Extract the log events and publish them to the Kafka topic | |
var logs struct { | |
LogGroup string `json:"logGroup"` | |
LogStream string `json:"logStream"` | |
LogEvents []struct { | |
Message string `json:"message"` | |
Timestamp int64 `json:"timestamp"` | |
} `json:"logEvents"` | |
Owner string `json:"owner"` | |
} | |
if err := json.Unmarshal(logData, &logs); err != nil { | |
return err | |
} | |
// Use a wait group to wait for all messages to be published | |
var wg sync.WaitGroup | |
for _, logEvent := range logs.LogEvents { | |
wg.Add(1) | |
go func(message string) { | |
defer wg.Done() | |
msg := &sarama.ProducerMessage{ | |
Topic: mskTopic, | |
Value: sarama.StringEncoder(message), | |
} | |
producer.Input() <- msg | |
}(strings.TrimSpace(logEvent.Message)) | |
} | |
wg.Wait() | |
return nil | |
} | |
// Start the Lambda function with an event loop | |
lambda.Start(handler) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment