Last active
December 3, 2019 00:31
-
-
Save voronaam/1e984a2fef66b49d8dcc75872427455c to your computer and use it in GitHub Desktop.
Benthos background processor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package processor | |
import ( | |
"fmt" | |
"time" | |
"github.com/Jeffail/benthos/v3/lib/log" | |
"github.com/Jeffail/benthos/v3/lib/metrics" | |
"github.com/Jeffail/benthos/v3/lib/processor" | |
"github.com/Jeffail/benthos/v3/lib/types" | |
) | |
func init() { | |
processor.RegisterPlugin( | |
"background", | |
func() interface{} { | |
b := BackgroundConfig{} | |
return &b | |
}, | |
func( | |
iconf interface{}, | |
mgr types.Manager, | |
logger log.Modular, | |
stats metrics.Type, | |
) (types.Processor, error) { | |
mConf, ok := iconf.(*BackgroundConfig) | |
if !ok { | |
panic(fmt.Errorf("failed to cast config: %T", iconf)) | |
} | |
return NewBackground(mConf, mgr, logger, stats) | |
}, | |
) | |
processor.DocumentPlugin( | |
"background", | |
`Executes sub processors in background ignoring their results`, | |
nil, | |
) | |
} | |
//------------------------------------------------------------------------------ | |
// BackgroundConfig is a config struct containing fields for the Background | |
// processor. | |
type BackgroundConfig struct { | |
Processors []processor.Config `json:"processors" yaml:"processors"` | |
} | |
//------------------------------------------------------------------------------ | |
//Background allows children to run in background | |
type Background struct { | |
children []types.Processor | |
log log.Modular | |
mCount metrics.StatCounter | |
mErr metrics.StatCounter | |
mSent metrics.StatCounter | |
mBatchSent metrics.StatCounter | |
} | |
// NewBackground returns a Background processor. | |
func NewBackground( | |
conf *BackgroundConfig, mgr types.Manager, log log.Modular, stats metrics.Type, | |
) (processor.Type, error) { | |
var children []types.Processor | |
for i, pconf := range conf.Processors { | |
prefix := fmt.Sprintf("%v", i) | |
proc, err := processor.New(pconf, mgr, log.NewModule("."+prefix), metrics.Namespaced(stats, prefix)) | |
if err != nil { | |
return nil, err | |
} | |
children = append(children, proc) | |
} | |
return &Background{ | |
children: children, | |
log: log, | |
mCount: stats.GetCounter("count"), | |
mErr: stats.GetCounter("error"), | |
mSent: stats.GetCounter("sent"), | |
mBatchSent: stats.GetCounter("batch.sent"), | |
}, nil | |
} | |
//------------------------------------------------------------------------------ | |
// ProcessMessage applies the processor to a message | |
func (p *Background) ProcessMessage(msg types.Message) ([]types.Message, types.Response) { | |
msgs := [1]types.Message{msg} | |
go func() { | |
for _, msg := range msgs { | |
_, res := processor.ExecuteAll(p.children, msg) | |
if res != nil && res.Error() != nil { | |
// TODO: Log something | |
// TODO: Metrics | |
panic(res.Error()) | |
} | |
} | |
}() | |
return msgs[:], nil | |
} | |
// CloseAsync shuts down the processor and stops processing requests. | |
func (p *Background) CloseAsync() { | |
for _, c := range p.children { | |
c.CloseAsync() | |
} | |
} | |
// WaitForClose blocks until the processor has closed down. | |
func (p *Background) WaitForClose(timeout time.Duration) error { | |
stopBy := time.Now().Add(timeout) | |
for _, c := range p.children { | |
if err := c.WaitForClose(time.Until(stopBy)); err != nil { | |
return err | |
} | |
} | |
return nil | |
} | |
//------------------------------------------------------------------------------ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--- | |
# Sample YAML config. Without the background processor, the 30s sleep would exceed the 2s http_server timeout and the request would time out.
input: | |
http_server: | |
timeout: 2s | |
pipeline: | |
processors: | |
- type: background | |
plugin: | |
processors: | |
- log: | |
message: "Sleeping" | |
- sleep: | |
duration: 30s | |
- log: | |
message: "Waking" | |
- log: | |
message: "Inside!!! ${!content}" | |
- log: | |
message: "Outside ${!content}" | |
output: | |
sync_response: {} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment