Skip to content

Instantly share code, notes, and snippets.

@dsouzajude
Created July 14, 2019 13:13
Show Gist options
  • Select an option

  • Save dsouzajude/3dc9fbd0cf5746d6f304c27d79c39140 to your computer and use it in GitHub Desktop.

Select an option

Save dsouzajude/3dc9fbd0cf5746d6f304c27d79c39140 to your computer and use it in GitHub Desktop.
Fluentd Adapter Constructor Function
package fluentd
import (
	"log"
	"math"
	"net"
	"os"
	"strconv"
	"time"

	"github.com/fluent/fluent-logger-golang/fluent"
	"github.com/gliderlabs/logspout/router"
	"github.com/pkg/errors"
)
// Fallback settings for the fluentd client, used when the matching
// environment variable is unset or empty.
const (
	// defaultProtocol is the network handed to the fluentd client.
	defaultProtocol = "tcp"

	// defaultWriteTimeout is the fallback for FLUENTD_WRITE_TIMEOUT, in seconds.
	defaultWriteTimeout = 3

	// defaultRetryWait is the fallback for FLUENTD_RETRY_WAIT.
	// NOTE(review): presumably milliseconds — confirm against fluent.Config.
	defaultRetryWait = 1000

	// defaultMaxRetries is the fallback for FLUENTD_MAX_RETRIES; the largest
	// int32 value makes the retry budget effectively unbounded.
	defaultMaxRetries = math.MaxInt32

	// defaultBufferLimit is the fallback for FLUENTD_BUFFER_LIMIT.
	// NOTE(review): presumably bytes (1 MiB) — confirm against fluent.Config.
	defaultBufferLimit = 1 << 20
)
// getenv looks up the environment variable named key and returns its value.
// When the variable is unset or set to the empty string, fallback is
// returned instead.
func getenv(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}
// NewAdapter creates a Logspout fluentd adapter instance.
//
// It looks up the route's transport, dials route.Address once to verify that
// fluentd is reachable (the probe connection is closed again right away), and
// then builds a fluent-logger client for that address. Client settings are
// read from the environment; a variable that is unset or empty falls back to
// the package default:
//
//	FLUENTD_BUFFER_LIMIT        int  (defaultBufferLimit)
//	FLUENTD_RETRY_WAIT          int  (defaultRetryWait)
//	FLUENTD_MAX_RETRIES         int  (defaultMaxRetries)
//	FLUENTD_ASYNC_CONNECT       bool (false)
//	FLUENTD_SUBSECOND_PRECISION bool (false)
//	FLUENTD_REQUEST_ACK         bool (false)
//	FLUENTD_WRITE_TIMEOUT       int, seconds (defaultWriteTimeout)
//
// The adapter's tag prefix comes from TAG_PREFIX (default "docker") and its
// tag suffix label from TAG_SUFFIX_LABEL (default empty).
//
// An error is returned when the transport cannot be found, the probe dial
// fails, route.Address is not a valid host:port pair, one of the environment
// variables cannot be parsed, or the fluentd client cannot be created.
func NewAdapter(route *router.Route) (router.LogAdapter, error) {
	transport, found := router.AdapterTransports.Lookup(route.AdapterTransport(defaultProtocol))
	if !found {
		return nil, errors.New("Unable to find adapter: " + route.Adapter)
	}

	// Connectivity probe: fail fast if fluentd cannot be reached at all.
	conn, err := transport.Dial(route.Address, route.Options)
	if err != nil {
		return nil, err
	}
	// The probe connection is never used for logging (the fluent client opens
	// its own), so release it immediately; a close error here is harmless.
	_ = conn.Close()
	log.Println("Connectivity successful to fluentd @ " + route.Address)

	// Construct fluentd config object
	host, port, err := net.SplitHostPort(route.Address)
	if err != nil {
		return nil, errors.Wrapf(err, "Invalid fluentd-address %s", route.Address)
	}
	portNum, err := strconv.Atoi(port)
	if err != nil {
		return nil, errors.Wrapf(err, "Invalid fluentd-address %s", route.Address)
	}

	bufferLimit, err := getenvInt("FLUENTD_BUFFER_LIMIT", defaultBufferLimit)
	if err != nil {
		return nil, err
	}
	retryWait, err := getenvInt("FLUENTD_RETRY_WAIT", defaultRetryWait)
	if err != nil {
		return nil, err
	}
	maxRetries, err := getenvInt("FLUENTD_MAX_RETRIES", defaultMaxRetries)
	if err != nil {
		return nil, err
	}
	asyncConnect, err := getenvBool("FLUENTD_ASYNC_CONNECT", false)
	if err != nil {
		return nil, err
	}
	subSecondPrecision, err := getenvBool("FLUENTD_SUBSECOND_PRECISION", false)
	if err != nil {
		return nil, err
	}
	requestAck, err := getenvBool("FLUENTD_REQUEST_ACK", false)
	if err != nil {
		return nil, err
	}
	writeTimeout, err := getenvInt("FLUENTD_WRITE_TIMEOUT", defaultWriteTimeout)
	if err != nil {
		return nil, err
	}

	fluentConfig := fluent.Config{
		FluentHost:         host,
		FluentPort:         portNum,
		FluentNetwork:      defaultProtocol,
		FluentSocketPath:   "",
		BufferLimit:        bufferLimit,
		RetryWait:          retryWait,
		MaxRetry:           maxRetries,
		Async:              asyncConnect,
		SubSecondPrecision: subSecondPrecision,
		RequestAck:         requestAck,
		// FLUENTD_WRITE_TIMEOUT is expressed in whole seconds.
		WriteTimeout: time.Duration(writeTimeout) * time.Second,
	}
	writer, err := fluent.New(fluentConfig)
	if err != nil {
		return nil, errors.Wrapf(err, "Unable to create fluentd logger")
	}

	return &Adapter{
		writer:         writer,
		tagPrefix:      getenv("TAG_PREFIX", "docker"),
		tagSuffixLabel: getenv("TAG_SUFFIX_LABEL", ""),
	}, nil
}

// getenvInt parses the environment variable key as a base-10 integer,
// returning fallback when the variable is unset or empty. A parse failure is
// reported with the variable name and the offending value.
func getenvInt(key string, fallback int) (int, error) {
	raw := getenv(key, "")
	if raw == "" {
		return fallback, nil
	}
	value, err := strconv.Atoi(raw)
	if err != nil {
		return 0, errors.Wrapf(err, "invalid value %q for %s", raw, key)
	}
	return value, nil
}

// getenvBool parses the environment variable key as a boolean (any form
// accepted by strconv.ParseBool), returning fallback when the variable is
// unset or empty. A parse failure is reported with the variable name and the
// offending value.
func getenvBool(key string, fallback bool) (bool, error) {
	raw := getenv(key, "")
	if raw == "" {
		return fallback, nil
	}
	value, err := strconv.ParseBool(raw)
	if err != nil {
		return false, errors.Wrapf(err, "invalid value %q for %s", raw, key)
	}
	return value, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment