Skip to content

Instantly share code, notes, and snippets.

@ervitis
Last active April 8, 2020 14:21
Show Gist options
  • Save ervitis/13935dc07573ff360221bcb3fcfd8f44 to your computer and use it in GitHub Desktop.
Save ervitis/13935dc07573ff360221bcb3fcfd8f44 to your computer and use it in GitHub Desktop.
UnkownProducerIdException
package app.java.kafka.poc;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import app.java.kafka.poc.bindings.Bindings;
@SpringBootApplication
@EnableBinding(Bindings.class)
public class PocApplication {
private static final Logger log = LoggerFactory.getLogger(PocApplication.class);
public static void main(String[] args) {
SpringApplication.run(PocApplication.class, args);
}
@Autowired
private BinderAwareChannelResolver binderAwareChannelResolver;
@StreamListener("test-input")
public void listen(Message<?> in, String headerKey) {
final MessageBuilder builder;
MessageChannel messageChannel;
messageChannel = this.binderAwareChannelResolver.resolveDestination("test-output");
Object payload = in.getPayload();
builder = MessageBuilder.withPayload(payload);
try {
log.info("Event received: {}", in);
if (!messageChannel.send(builder.build())) {
log.error("Something happend trying send the message! {}", in.getPayload());
}
} catch (UnknownProducerIdException e) {
log.error("UnkownProducerIdException catched ", e);
System.out.println(e);
messageChannel.send(builder.build());
} catch (KafkaException e) {
log.error("KafkaException catched ", e);
System.out.println(e);
messageChannel.send(builder.build());
}catch (Exception e) {
System.out.println("Commit failed " + e.getMessage());
}
}
}
server:
port: 8086
spring:
cloud:
stream:
kafka:
binder:
auto-create-topics: true
brokers: ${KAFKA_HOST:`kafka-1:19092,kafka-2:29092,kafka-3:39092`}
transaction:
producer:
error-channel-enabled: true
sync: true
producer-properties:
acks: all
retry.backoff.ms: 200
linger.ms: 100
max.in.flight.requests.per.connection: 1
enable.idempotence: true
retries: 3
compression.type: snappy
request.timeout.ms: 5000
key.serializer: org.apache.kafka.common.serialization.StringSerializer
consumer-properties:
session.timeout.ms: 20000
max.poll.interval.ms: 350000
enable.auto.commit: true
allow.auto.create.topics: true
auto.commit.interval.ms: 12000
max.poll.records: 5
isolation.level: read_committed
configuration:
auto.offset.reset: latest
bindings:
test-input:
# contentType: text/plain
destination: test.produce
group: test-kafka-consumer-2
consumer:
maxAttempts: 3
startOffset: latest
autoCommitOnError: true
queueBufferingMaxMessages: 100000
autoCommitOffset: true
test-output:
# contentType: text/plain
destination: test.produce.another
group: test-produce-out
producer:
acks: all
package app.java.kafka.poc.bindings;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface Bindings {
@Input("test-input")
SubscribableChannel testListener();
@Output("test-output")
MessageChannel testProduce();
}
---
version: '2'
services:
zookeeper:
hostname: zookeeper
container_name: zookeeper
image: confluentinc/cp-zookeeper:4.0.0
network_mode: host
environment:
ZOOKEEPER_CLIENT_PORT: 22181
extra_hosts:
- "moby:127.0.0.1"
kafka-1:
image: confluentinc/cp-kafka:4.0.0
network_mode: host
hostname: localhost
container_name: kafka-1
environment:
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_MIN_INSYNC_REPLICAS: 1
KAFKA_BROKER_ID: 1
KAFKA_BROKER_RACK: rack-a
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
KAFKA_OFFSET_STORAGE_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_NUM_REPLICA_FETCHERS: 1
KAFKA_NUM_IO_THREADS: 8
KAFKA_NUM_NETWORK_THREADS: 5
KAFKA_AUTO_LEADER_REBALANCE_ENABLE: "true"
KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE: "true"
KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 102400
KAFKA_SOCKET_SEND_BUFFER_BYTES: 102400
KAFKA_ZOOKEEPER_CONNECT: localhost:22181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:19092
KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS: 3000
extra_hosts:
- "moby:127.0.0.1"
kafka-2:
image: confluentinc/cp-kafka:4.0.0
network_mode: host
hostname: localhost
container_name: kafka-2
environment:
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_MIN_INSYNC_REPLICAS: 1
KAFKA_BROKER_ID: 2
KAFKA_BROKER_RACK: rack-a
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
KAFKA_NUM_PARTITIONS: 3
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_OFFSET_STORAGE_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 102400
KAFKA_SOCKET_SEND_BUFFER_BYTES: 102400
KAFKA_NUM_REPLICA_FETCHERS: 1
KAFKA_NUM_IO_THREADS: 8
KAFKA_NUM_NETWORK_THREADS: 5
KAFKA_AUTO_LEADER_REBALANCE_ENABLE: "true"
KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE: "true"
KAFKA_ZOOKEEPER_CONNECT: localhost:22181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS: 3000
extra_hosts:
- "moby:127.0.0.1"
kafka-3:
image: confluentinc/cp-kafka:4.0.0
network_mode: host
hostname: localhost
container_name: kafka-3
environment:
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_MIN_INSYNC_REPLICAS: 1
KAFKA_BROKER_ID: 3
KAFKA_NUM_PARTITIONS: 3
KAFKA_BROKER_RACK: rack-a
KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 102400
KAFKA_SOCKET_SEND_BUFFER_BYTES: 102400
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_OFFSET_STORAGE_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_NUM_REPLICA_FETCHERS: 1
KAFKA_NUM_IO_THREADS: 8
KAFKA_NUM_NETWORK_THREADS: 5
KAFKA_AUTO_LEADER_REBALANCE_ENABLE: "true"
KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE: "true"
KAFKA_ZOOKEEPER_CONNECT: localhost:22181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:39092
KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS: 3000
extra_hosts:
- "moby:127.0.0.1"
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>app.java.kafka</groupId>
<artifactId>poc</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>poc</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.4.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Hoxton.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
</repository>
</repositories>
</project>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment