This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package processor | |
import "sync" | |
type workerPool struct { | |
limit chan struct{} | |
wg sync.WaitGroup | |
} | |
func (l *workerPool) acquire() { |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func main() { | |
consumerCfg := &kafka.ConsumerConfig{ | |
Reader: kafka.ReaderConfig{ | |
Brokers: []string{"localhost:29092"}, | |
Topic: "standart-topic", | |
GroupID: "standart-cg", | |
}, | |
RetryEnabled: true, | |
RetryConfiguration: kafka.RetryConfiguration{ | |
Brokers: []string{"localhost:29092"}, |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func main() { | |
consumerCfg := &kafka.ConsumerConfig{ | |
Reader: kafka.ReaderConfig{ | |
Brokers: []string{"localhost:29092"}, | |
Topic: "standart-topic", | |
GroupID: "standart-cg", | |
}, | |
RetryEnabled: true, | |
RetryConfiguration: kafka.RetryConfiguration{ | |
Topic: "retry-topic", |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func main() { | |
consumerCfg := &kafka.ConsumerConfig{ | |
Reader: kafka.ReaderConfig{ | |
Brokers: []string{"localhost:29092"}, | |
Topic: "standart-topic", | |
GroupID: "standart-cg", | |
}, | |
ConsumeFn: consumeFn, | |
RetryEnabled: false, | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func (s *IntegrationTestSuite) Test_Should_Consume_Successfully() { | |
// Given | |
cfg := &kafka.Config{ | |
Brokers: s.wrapper.GetBrokerAddresses(), | |
Consumer: kafka.ConsumerConfig{ | |
GroupID: "consumer-group-1", | |
Topic: "test-consume", | |
}, | |
} | |
producer := kafka.NewProducer(cfg) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func retryFunc(hostPort int) func() error { | |
return func() error { | |
err := kafka.CheckHealth(hostPort) | |
if err != nil { | |
fmt.Printf("kafka connection not ready: %v \n", err) | |
} | |
return err | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func (l *DockerTestWrapper) CleanUp() { | |
if err := l.pool.Purge(l.container); err != nil { | |
log.Printf("Could not purge container: %s\n", err) | |
} | |
} | |
func (l *DockerTestWrapper) GetBrokerAddresses() []string { | |
return []string{fmt.Sprintf("localhost:%d", l.hostPort)} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
type DockerTestWrapper struct { | |
container *dockertest.Resource | |
pool *dockertest.Pool | |
hostPort int | |
} | |
func (l *DockerTestWrapper) RunContainer() error { | |
hostPort, err := getFreePort() | |
if err != nil { | |
return fmt.Errorf("could not get free hostPort: %w", err) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func (t *TestContainerWrapper) CleanUp() { | |
ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second) | |
defer cancelFunc() | |
if err := t.container.Terminate(ctx); err != nil { | |
log.Printf("could not terminate container: %s\n", err) | |
} | |
} | |
func (t *TestContainerWrapper) GetBrokerAddresses() []string { |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func (t *TestContainerWrapper) RunContainer() error { | |
req := testcontainers.ContainerRequest{ | |
Image: fmt.Sprintf("%s:%s", RedpandaImage, RedpandaVersion), | |
ExposedPorts: []string{ | |
"9092:9092/tcp", | |
}, | |
Cmd: []string{"redpanda", "start"}, | |
WaitingFor: wait.ForLog("Successfully started Redpanda!"), | |
AutoRemove: true, | |
} |
NewerOlder