Skip to content

Instantly share code, notes, and snippets.

@kennytrytek-wf
Created July 16, 2025 21:41
Show Gist options
  • Save kennytrytek-wf/a3af128d664e61fa55f14c68e532c8fe to your computer and use it in GitHub Desktop.
Save kennytrytek-wf/a3af128d664e61fa55f14c68e532c8fe to your computer and use it in GitHub Desktop.
package crashreportprocessor
import (
"context"
"encoding/json"
"fmt"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
// Config is the configuration type for the crash report processor. It
// currently declares no fields.
type Config struct{}
// createDefaultConfig builds the default configuration handed to the factory:
// a pointer to a zero-valued Config.
func createDefaultConfig() component.Config {
	cfg := new(Config)
	return cfg
}
// NewFactory returns the processor.Factory for the "crashreportprocessor"
// component type. The factory uses createDefaultConfig for its default
// configuration and registers createLogsProcessor for logs at the
// development stability level.
func NewFactory() processor.Factory {
	typ := component.MustNewType("crashreportprocessor")
	withLogs := processor.WithLogs(createLogsProcessor, component.StabilityLevelDevelopment)
	return processor.NewFactory(typ, createDefaultConfig, withLogs)
}
// createLogsProcessor builds a crashReportProcessor that forwards to
// nextConsumer. The context, settings and config arguments are ignored, and
// the returned error is always nil.
func createLogsProcessor(
	_ context.Context,
	_ processor.Settings,
	_ component.Config,
	nextConsumer consumer.Logs,
) (processor.Logs, error) {
	proc := &crashReportProcessor{next: nextConsumer}
	return proc, nil
}
// crashReportProcessor is the logs processor constructed by
// createLogsProcessor. Its next field holds the consumer that ConsumeLogs
// hands the newly built logs to.
type crashReportProcessor struct {
next consumer.Logs
}
// Capabilities reports the processor's consumer capabilities, with
// MutatesData set to true.
func (p *crashReportProcessor) Capabilities() consumer.Capabilities {
	var caps consumer.Capabilities
	caps.MutatesData = true
	return caps
}
// Start performs no work and always returns nil; both arguments are ignored.
func (p *crashReportProcessor) Start(_ context.Context, _ component.Host) error {
	return nil
}
// Shutdown performs no work and always returns nil; the context is ignored.
func (p *crashReportProcessor) Shutdown(_ context.Context) error {
	return nil
}
// ConsumeLogs takes the JSON payload of a crash report and forwards a new log
// for every crash report found in the body. The body must be of type Str or
// Bytes and contain either a JSON array of report objects (the default shape)
// or a single report object.
//
// Log records with another body type or an undecodable body are logged and
// skipped. Reports whose "type" field is missing or is not "crash" are logged
// as warnings and skipped. Every accepted report becomes its own ResourceLogs
// entry carrying a copy of the source resource, the source scope's name and
// version, and the source record's timestamp and severity; the report itself
// is stored both as the record's map body and as its attributes.
//
// The returned error is the one returned by the next consumer (nil on
// success); a non-nil error is also recorded on the span found in ctx.
func (p *crashReportProcessor) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
	logger := zap.L()
	newLogs := plog.NewLogs()
	for _, rl := range logs.ResourceLogs().All() {
		res := rl.Resource()
		for _, sl := range rl.ScopeLogs().All() {
			scope := sl.Scope()
			for _, l := range sl.LogRecords().All() {
				var body []byte
				switch l.Body().Type() {
				case pcommon.ValueTypeStr:
					body = []byte(l.Body().Str())
				case pcommon.ValueTypeBytes:
					body = l.Body().Bytes().AsRaw()
				default:
					logger.Error("Invalid body type. Expected: Str or Bytes, Actual: " + l.Body().Type().String())
					continue
				}

				items, err := decodeCrashReports(body)
				if err != nil {
					// zap.Error only builds a field; it has to be passed to a
					// logger call for the failure to actually be reported.
					logger.Error("Unable to decode crash report body as JSON", zap.Error(err))
					continue
				}

				for _, i := range items {
					t, ok := i["type"]
					if !ok {
						logger.Warn(fmt.Sprintf("No 'type' field in body: %v", i))
						continue
					}
					if t != "crash" {
						logger.Warn(fmt.Sprintf("Expected type: crash, Actual: %v", t))
						continue
					}

					// Convert the report before touching newLogs so that a
					// failed conversion does not leave an empty or
					// half-populated record in the forwarded batch.
					report := pcommon.NewMap()
					if err := report.FromRaw(i); err != nil {
						logger.Error("Unable to convert crash report to a pdata map", zap.Error(err))
						continue
					}

					// For each accepted item, add a new log record.
					nrl := newLogs.ResourceLogs().AppendEmpty()
					res.CopyTo(nrl.Resource())
					nsl := nrl.ScopeLogs().AppendEmpty()
					nsl.Scope().SetName(scope.Name())
					nsl.Scope().SetVersion(scope.Version())
					nl := nsl.LogRecords().AppendEmpty()
					report.CopyTo(nl.Body().SetEmptyMap())
					report.CopyTo(nl.Attributes())
					nl.SetTimestamp(l.Timestamp())
					nl.SetSeverityNumber(l.SeverityNumber())
					nl.SetSeverityText(l.SeverityText())
				}
			}
		}
	}

	err := p.next.ConsumeLogs(ctx, newLogs)
	if err != nil {
		trace.SpanFromContext(ctx).RecordError(err)
	}
	return err
}

// decodeCrashReports parses body as a JSON array of objects and, if that
// fails, as a single JSON object, which is returned as a one-element slice.
// The error from the object attempt is returned when neither shape decodes.
func decodeCrashReports(body []byte) ([]map[string]any, error) {
	var items []map[string]any
	if err := json.Unmarshal(body, &items); err == nil {
		return items, nil
	}

	var item map[string]any
	if err := json.Unmarshal(body, &item); err != nil {
		return nil, fmt.Errorf("decoding crash report payload: %w", err)
	}
	return []map[string]any{item}, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment