Last active
April 8, 2020 14:21
-
-
Save ervitis/13935dc07573ff360221bcb3fcfd8f44 to your computer and use it in GitHub Desktop.
UnkownProducerIdException
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package app.java.kafka.poc; | |
import org.apache.kafka.common.KafkaException; | |
import org.apache.kafka.common.errors.UnknownProducerIdException; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.boot.SpringApplication; | |
import org.springframework.boot.autoconfigure.SpringBootApplication; | |
import org.springframework.cloud.stream.annotation.EnableBinding; | |
import org.springframework.cloud.stream.annotation.StreamListener; | |
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver; | |
import org.springframework.messaging.Message; | |
import org.springframework.messaging.MessageChannel; | |
import org.springframework.messaging.support.MessageBuilder; | |
import app.java.kafka.poc.bindings.Bindings; | |
@SpringBootApplication | |
@EnableBinding(Bindings.class) | |
public class PocApplication { | |
private static final Logger log = LoggerFactory.getLogger(PocApplication.class); | |
public static void main(String[] args) { | |
SpringApplication.run(PocApplication.class, args); | |
} | |
@Autowired | |
private BinderAwareChannelResolver binderAwareChannelResolver; | |
@StreamListener("test-input") | |
public void listen(Message<?> in, String headerKey) { | |
final MessageBuilder builder; | |
MessageChannel messageChannel; | |
messageChannel = this.binderAwareChannelResolver.resolveDestination("test-output"); | |
Object payload = in.getPayload(); | |
builder = MessageBuilder.withPayload(payload); | |
try { | |
log.info("Event received: {}", in); | |
if (!messageChannel.send(builder.build())) { | |
log.error("Something happend trying send the message! {}", in.getPayload()); | |
} | |
} catch (UnknownProducerIdException e) { | |
log.error("UnkownProducerIdException catched ", e); | |
System.out.println(e); | |
messageChannel.send(builder.build()); | |
} catch (KafkaException e) { | |
log.error("KafkaException catched ", e); | |
System.out.println(e); | |
messageChannel.send(builder.build()); | |
}catch (Exception e) { | |
System.out.println("Commit failed " + e.getMessage()); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
server: | |
port: 8086 | |
spring: | |
cloud: | |
stream: | |
kafka: | |
binder: | |
auto-create-topics: true | |
brokers: ${KAFKA_HOST:`kafka-1:19092,kafka-2:29092,kafka-3:39092`} | |
transaction: | |
producer: | |
error-channel-enabled: true | |
sync: true | |
producer-properties: | |
acks: all | |
retry.backoff.ms: 200 | |
linger.ms: 100 | |
max.in.flight.requests.per.connection: 1 | |
enable.idempotence: true | |
retries: 3 | |
compression.type: snappy | |
request.timeout.ms: 5000 | |
key.serializer: org.apache.kafka.common.serialization.StringSerializer | |
consumer-properties: | |
session.timeout.ms: 20000 | |
max.poll.interval.ms: 350000 | |
enable.auto.commit: true | |
allow.auto.create.topics: true | |
auto.commit.interval.ms: 12000 | |
max.poll.records: 5 | |
isolation.level: read_committed | |
configuration: | |
auto.offset.reset: latest | |
bindings: | |
test-input: | |
# contentType: text/plain | |
destination: test.produce | |
group: test-kafka-consumer-2 | |
consumer: | |
maxAttempts: 3 | |
startOffset: latest | |
autoCommitOnError: true | |
queueBufferingMaxMessages: 100000 | |
autoCommitOffset: true | |
test-output: | |
# contentType: text/plain | |
destination: test.produce.another | |
group: test-produce-out | |
producer: | |
acks: all |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package app.java.kafka.poc.bindings; | |
import org.springframework.cloud.stream.annotation.Input; | |
import org.springframework.cloud.stream.annotation.Output; | |
import org.springframework.messaging.MessageChannel; | |
import org.springframework.messaging.SubscribableChannel; | |
public interface Bindings { | |
@Input("test-input") | |
SubscribableChannel testListener(); | |
@Output("test-output") | |
MessageChannel testProduce(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--- | |
version: '2' | |
services: | |
zookeeper: | |
hostname: zookeeper | |
container_name: zookeeper | |
image: confluentinc/cp-zookeeper:4.0.0 | |
network_mode: host | |
environment: | |
ZOOKEEPER_CLIENT_PORT: 22181 | |
extra_hosts: | |
- "moby:127.0.0.1" | |
kafka-1: | |
image: confluentinc/cp-kafka:4.0.0 | |
network_mode: host | |
hostname: localhost | |
container_name: kafka-1 | |
environment: | |
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" | |
KAFKA_MIN_INSYNC_REPLICAS: 1 | |
KAFKA_BROKER_ID: 1 | |
KAFKA_BROKER_RACK: rack-a | |
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 | |
KAFKA_NUM_PARTITIONS: 3 | |
KAFKA_DEFAULT_REPLICATION_FACTOR: 1 | |
KAFKA_OFFSET_STORAGE_REPLICATION_FACTOR: 1 | |
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 | |
KAFKA_NUM_REPLICA_FETCHERS: 1 | |
KAFKA_NUM_IO_THREADS: 8 | |
KAFKA_NUM_NETWORK_THREADS: 5 | |
KAFKA_AUTO_LEADER_REBALANCE_ENABLE: "true" | |
KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE: "true" | |
KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 102400 | |
KAFKA_SOCKET_SEND_BUFFER_BYTES: 102400 | |
KAFKA_ZOOKEEPER_CONNECT: localhost:22181 | |
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:19092 | |
KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS: 3000 | |
extra_hosts: | |
- "moby:127.0.0.1" | |
kafka-2: | |
image: confluentinc/cp-kafka:4.0.0 | |
network_mode: host | |
hostname: localhost | |
container_name: kafka-2 | |
environment: | |
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" | |
KAFKA_MIN_INSYNC_REPLICAS: 1 | |
KAFKA_BROKER_ID: 2 | |
KAFKA_BROKER_RACK: rack-a | |
KAFKA_DEFAULT_REPLICATION_FACTOR: 1 | |
KAFKA_NUM_PARTITIONS: 3 | |
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 | |
KAFKA_OFFSET_STORAGE_REPLICATION_FACTOR: 1 | |
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 | |
KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 102400 | |
KAFKA_SOCKET_SEND_BUFFER_BYTES: 102400 | |
KAFKA_NUM_REPLICA_FETCHERS: 1 | |
KAFKA_NUM_IO_THREADS: 8 | |
KAFKA_NUM_NETWORK_THREADS: 5 | |
KAFKA_AUTO_LEADER_REBALANCE_ENABLE: "true" | |
KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE: "true" | |
KAFKA_ZOOKEEPER_CONNECT: localhost:22181 | |
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092 | |
KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS: 3000 | |
extra_hosts: | |
- "moby:127.0.0.1" | |
kafka-3: | |
image: confluentinc/cp-kafka:4.0.0 | |
network_mode: host | |
hostname: localhost | |
container_name: kafka-3 | |
environment: | |
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" | |
KAFKA_MIN_INSYNC_REPLICAS: 1 | |
KAFKA_BROKER_ID: 3 | |
KAFKA_NUM_PARTITIONS: 3 | |
KAFKA_BROKER_RACK: rack-a | |
KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 102400 | |
KAFKA_SOCKET_SEND_BUFFER_BYTES: 102400 | |
KAFKA_DEFAULT_REPLICATION_FACTOR: 1 | |
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 | |
KAFKA_OFFSET_STORAGE_REPLICATION_FACTOR: 1 | |
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 | |
KAFKA_NUM_REPLICA_FETCHERS: 1 | |
KAFKA_NUM_IO_THREADS: 8 | |
KAFKA_NUM_NETWORK_THREADS: 5 | |
KAFKA_AUTO_LEADER_REBALANCE_ENABLE: "true" | |
KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE: "true" | |
KAFKA_ZOOKEEPER_CONNECT: localhost:22181 | |
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:39092 | |
KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS: 3000 | |
extra_hosts: | |
- "moby:127.0.0.1" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?> | |
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> | |
<modelVersion>4.0.0</modelVersion> | |
<parent> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-parent</artifactId> | |
<version>2.2.4.RELEASE</version> | |
<relativePath/> <!-- lookup parent from repository --> | |
</parent> | |
<groupId>app.java.kafka</groupId> | |
<artifactId>poc</artifactId> | |
<version>0.0.1-SNAPSHOT</version> | |
<name>poc</name> | |
<description>Demo project for Spring Boot</description> | |
<properties> | |
<java.version>11</java.version> | |
</properties> | |
<dependencies> | |
<dependency> | |
<groupId>org.springframework.cloud</groupId> | |
<artifactId>spring-cloud-stream</artifactId> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.cloud</groupId> | |
<artifactId>spring-cloud-stream-binder-kafka</artifactId> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.kafka</groupId> | |
<artifactId>spring-kafka</artifactId> | |
<version>2.4.5.RELEASE</version> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-configuration-processor</artifactId> | |
<optional>true</optional> | |
</dependency> | |
<dependency> | |
<groupId>org.projectlombok</groupId> | |
<artifactId>lombok</artifactId> | |
<optional>true</optional> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-test</artifactId> | |
<scope>test</scope> | |
<exclusions> | |
<exclusion> | |
<groupId>org.junit.vintage</groupId> | |
<artifactId>junit-vintage-engine</artifactId> | |
</exclusion> | |
</exclusions> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.cloud</groupId> | |
<artifactId>spring-cloud-stream-test-support</artifactId> | |
<scope>test</scope> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.kafka</groupId> | |
<artifactId>spring-kafka-test</artifactId> | |
<scope>test</scope> | |
</dependency> | |
</dependencies> | |
<dependencyManagement> | |
<dependencies> | |
<dependency> | |
<groupId>org.springframework.cloud</groupId> | |
<artifactId>spring-cloud-dependencies</artifactId> | |
<version>Hoxton.RELEASE</version> | |
<type>pom</type> | |
<scope>import</scope> | |
</dependency> | |
</dependencies> | |
</dependencyManagement> | |
<build> | |
<plugins> | |
<plugin> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-maven-plugin</artifactId> | |
</plugin> | |
</plugins> | |
</build> | |
<repositories> | |
<repository> | |
<id>spring-milestones</id> | |
<name>Spring Milestones</name> | |
<url>https://repo.spring.io/milestone</url> | |
</repository> | |
</repositories> | |
</project> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment