Created
August 4, 2023 11:12
-
-
Save disintegrator/fe8db110e978a70bb366077c9413b78b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package functional | |
import ( | |
"context" | |
"errors" | |
"time" | |
"github.com/benthosdev/benthos/v4/public/service" | |
) | |
// collectInput reads all messages from an child input and pushes them out as a | |
// single batch. If the child input does not return service.ErrEndOfInput before | |
// timeout then this input will return an error. | |
type collectInput struct { | |
child *service.OwnedInput | |
timeout time.Duration | |
} | |
func (i *collectInput) ReadBatch(inCtx context.Context) (service.MessageBatch, service.AckFunc, error) { | |
ctx := inCtx | |
if i.timeout > 0 { | |
var cancel context.CancelFunc | |
ctx, cancel = context.WithTimeout(ctx, i.timeout) | |
defer cancel() | |
} | |
var batch service.MessageBatch | |
var ackFuncs []service.AckFunc | |
loop: | |
for { | |
b, ack, err := i.child.ReadBatch(ctx) | |
switch { | |
case errors.Is(err, service.ErrEndOfInput): | |
break loop | |
case err != nil: | |
return nil, nil, err | |
default: | |
batch = append(batch, b...) | |
ackFuncs = append(ackFuncs, ack) | |
} | |
} | |
ack := func(ctx context.Context, err error) error { | |
errs := make([]error, len(ackFuncs)) | |
for i, f := range ackFuncs { | |
errs[i] = f(ctx, err) | |
} | |
return errors.Join(errs...) | |
} | |
return batch, ack, nil | |
} | |
func (i *collectInput) Close(ctx context.Context) error { | |
return i.child.Close(ctx) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment