Created
July 16, 2025 21:41
-
-
Save kennytrytek-wf/a3af128d664e61fa55f14c68e532c8fe to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package crashreportprocessor | |
import ( | |
"context" | |
"encoding/json" | |
"fmt" | |
"go.opentelemetry.io/collector/component" | |
"go.opentelemetry.io/collector/consumer" | |
"go.opentelemetry.io/collector/pdata/pcommon" | |
"go.opentelemetry.io/collector/pdata/plog" | |
"go.opentelemetry.io/collector/processor" | |
"go.opentelemetry.io/otel/trace" | |
"go.uber.org/zap" | |
) | |
// Config is the configuration for the crash report processor.
// It currently declares no settings.
type Config struct{}
func createDefaultConfig() component.Config { | |
return &Config{} | |
} | |
func NewFactory() processor.Factory { | |
return processor.NewFactory( | |
component.MustNewType("crashreportprocessor"), | |
createDefaultConfig, | |
processor.WithLogs(createLogsProcessor, component.StabilityLevelDevelopment)) | |
} | |
func createLogsProcessor( | |
_ context.Context, | |
_ processor.Settings, | |
_ component.Config, | |
nextConsumer consumer.Logs, | |
) (processor.Logs, error) { | |
return &crashReportProcessor{next: nextConsumer}, nil | |
} | |
type crashReportProcessor struct { | |
next consumer.Logs | |
} | |
func (p *crashReportProcessor) Capabilities() consumer.Capabilities { | |
return consumer.Capabilities{MutatesData: true} | |
} | |
func (p *crashReportProcessor) Start(ctx context.Context, host component.Host) error { | |
return nil | |
} | |
func (p *crashReportProcessor) Shutdown(ctx context.Context) error { | |
return nil | |
} | |
// ConsumeLogs takes the JSON payload of a crash report, and forwards a new log for every | |
// crash report found in the body. By default, the payload contains a list of report objects. | |
func (p *crashReportProcessor) ConsumeLogs(ctx context.Context, logs plog.Logs) error { | |
var err error | |
newLogs := plog.NewLogs() | |
for _, rl := range logs.ResourceLogs().All() { | |
res := rl.Resource() | |
for _, sl := range rl.ScopeLogs().All() { | |
scope := sl.Scope() | |
for _, l := range sl.LogRecords().All() { | |
var body []byte | |
var items []map[string]any | |
var item map[string]any | |
switch l.Body().Type() { | |
case pcommon.ValueTypeStr: | |
body = []byte(l.Body().Str()) | |
case pcommon.ValueTypeBytes: | |
body = l.Body().Bytes().AsRaw() | |
default: | |
zap.L().Error("Invalid body type. Expected: Str or Bytes, Actual: " + l.Body().Type().String()) | |
continue | |
} | |
if err = json.Unmarshal(body, &items); err != nil { // Is it an array? | |
if err = json.Unmarshal(body, &item); err != nil { // Is it a map? | |
zap.Error(err) | |
continue | |
} else { | |
items = []map[string]any{item} | |
} | |
} | |
for _, i := range items { | |
if t, ok := i["type"]; ok { | |
if t != "crash" { | |
zap.L().Warn(fmt.Sprintf("Expected type: crash, Actual: %v", t)) | |
continue | |
} | |
} else { | |
zap.L().Warn(fmt.Sprintf("No 'type' field in body: %v", i)) | |
continue | |
} | |
// for each item in the list, add a new log record | |
nrl := newLogs.ResourceLogs().AppendEmpty() | |
res.CopyTo(nrl.Resource()) | |
nsl := nrl.ScopeLogs().AppendEmpty() | |
nsl.Scope().SetName(scope.Name()) | |
nsl.Scope().SetVersion(scope.Version()) | |
nl := nsl.LogRecords().AppendEmpty() | |
if err = nl.Body().SetEmptyMap().FromRaw(i); err != nil { | |
zap.Error(err) | |
continue | |
} | |
if err = nl.Attributes().FromRaw(i); err != nil { | |
zap.Error(err) | |
continue | |
} | |
nl.SetTimestamp(l.Timestamp()) | |
nl.SetSeverityNumber(l.SeverityNumber()) | |
nl.SetSeverityText(l.SeverityText()) | |
} | |
} | |
} | |
} | |
if err = p.next.ConsumeLogs(ctx, newLogs); err != nil { | |
trace.SpanFromContext(ctx).RecordError(err) | |
} | |
return err | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment