This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Create a new request for the call. | |
| req, err := http.NewRequest("GET", "https://google.com.tr", nil) | |
| if err != nil { | |
| log.Fatalln(err) | |
| } | |
| // Create a ClientTrace value for the events we care about. | |
| trace := httptrace.ClientTrace{ | |
| GetConn: func(hostPort string) { | |
| log.Printf("Get Conn: %s\n", hostPort) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package processor | |
| import "sync" | |
| type workerPool struct { | |
| limit chan struct{} | |
| wg sync.WaitGroup | |
| } | |
| func (l *workerPool) acquire() { |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| func main() { | |
| consumerCfg := &kafka.ConsumerConfig{ | |
| Reader: kafka.ReaderConfig{ | |
| Brokers: []string{"localhost:29092"}, | |
| Topic: "standart-topic", | |
| GroupID: "standart-cg", | |
| }, | |
| RetryEnabled: true, | |
| RetryConfiguration: kafka.RetryConfiguration{ | |
| Brokers: []string{"localhost:29092"}, |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| func main() { | |
| consumerCfg := &kafka.ConsumerConfig{ | |
| Reader: kafka.ReaderConfig{ | |
| Brokers: []string{"localhost:29092"}, | |
| Topic: "standart-topic", | |
| GroupID: "standart-cg", | |
| }, | |
| RetryEnabled: true, | |
| RetryConfiguration: kafka.RetryConfiguration{ | |
| Topic: "retry-topic", |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| func main() { | |
| consumerCfg := &kafka.ConsumerConfig{ | |
| Reader: kafka.ReaderConfig{ | |
| Brokers: []string{"localhost:29092"}, | |
| Topic: "standart-topic", | |
| GroupID: "standart-cg", | |
| }, | |
| ConsumeFn: consumeFn, | |
| RetryEnabled: false, | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| func (s *IntegrationTestSuite) Test_Should_Consume_Successfully() { | |
| // Given | |
| cfg := &kafka.Config{ | |
| Brokers: s.wrapper.GetBrokerAddresses(), | |
| Consumer: kafka.ConsumerConfig{ | |
| GroupID: "consumer-group-1", | |
| Topic: "test-consume", | |
| }, | |
| } | |
| producer := kafka.NewProducer(cfg) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| func retryFunc(hostPort int) func() error { | |
| return func() error { | |
| err := kafka.CheckHealth(hostPort) | |
| if err != nil { | |
| fmt.Printf("kafka connection not ready: %v \n", err) | |
| } | |
| return err | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| func (l *DockerTestWrapper) CleanUp() { | |
| if err := l.pool.Purge(l.container); err != nil { | |
| log.Printf("Could not purge container: %s\n", err) | |
| } | |
| } | |
| func (l *DockerTestWrapper) GetBrokerAddresses() []string { | |
| return []string{fmt.Sprintf("localhost:%d", l.hostPort)} | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| type DockerTestWrapper struct { | |
| container *dockertest.Resource | |
| pool *dockertest.Pool | |
| hostPort int | |
| } | |
| func (l *DockerTestWrapper) RunContainer() error { | |
| hostPort, err := getFreePort() | |
| if err != nil { | |
| return fmt.Errorf("could not get free hostPort: %w", err) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| func (t *TestContainerWrapper) CleanUp() { | |
| ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second) | |
| defer cancelFunc() | |
| if err := t.container.Terminate(ctx); err != nil { | |
| log.Printf("could not terminate container: %s\n", err) | |
| } | |
| } | |
| func (t *TestContainerWrapper) GetBrokerAddresses() []string { |
NewerOlder