Last active
January 8, 2022 15:21
-
-
Save LearningJournal/48e57ed088dbca117327a207c1704eb2 to your computer and use it in GitHub Desktop.
Kafka Tutorial Code Samples for Learning Journal Website
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Create the topic "test" with one partition and no replication.
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# List every topic registered in the ZooKeeper ensemble at localhost:2181.
kafka-topics.sh --list --zookeeper localhost:2181
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Start an interactive producer; each line typed on stdin is sent to the topic "test".
kafka-console-producer.sh --broker-list localhost:9092 --topic test
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Print every message in the topic "test", starting from the earliest offset.
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Unpack the Kafka 0.10.1.0 (Scala 2.11) distribution into the current directory.
tar -zxvf kafka_2.11-0.10.1.0.tgz
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Enter the unpacked distribution; the bin/ and config/ paths used below are relative to it.
cd kafka_2.11-0.10.1.0
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Start ZooKeeper in the foreground with the bundled configuration.
bin/zookeeper-server-start.sh config/zookeeper.properties
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Start a Kafka broker in the foreground with the bundled configuration.
bin/kafka-server-start.sh config/server.properties
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Create the topic "MyFirstTopic1" with one partition and no replication.
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic MyFirstTopic1 --partitions 1 --replication-factor 1
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Start an interactive producer; each line typed on stdin is sent to "MyFirstTopic1".
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic MyFirstTopic1
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Print messages arriving on "MyFirstTopic1" (no --from-beginning flag is passed here).
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic MyFirstTopic1
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Clone the broker configuration once per additional broker.
# NOTE(review): each copy presumably needs its own broker.id, listener port and log.dirs
# before the extra brokers can start on the same host - confirm against the tutorial text.
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Start the two additional brokers from their own configuration files.
# The start script is invoked without a daemon flag, so run each command in its own terminal.
bin/kafka-server-start.sh config/server-1.properties
bin/kafka-server-start.sh config/server-2.properties
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Create "TestTopicXYZ" with two partitions, each replicated three times
# (one replica for each of the three brokers started above).
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic TestTopicXYZ --partitions 2 --replication-factor 3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import org.apache.kafka.clients.consumer.KafkaConsumer; | |
import org.apache.kafka.clients.consumer.ConsumerRecords; | |
import org.apache.kafka.clients.consumer.ConsumerRecord; | |
public class SupplierConsumer { | |
public static void main(String[] args) throws Exception { | |
String topicName = "SupplierTopic"; | |
String groupName = "SupplierTopicGroup"; | |
Properties props = new Properties(); | |
props.put("bootstrap.servers", "localhost:9092,localhost:9093"); | |
props.put("group.id", groupName); | |
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); | |
props.put("value.deserializer", "SupplierDeserializer"); | |
KafkaConsumer consumer = new KafkaConsumer<>(props); | |
consumer.subscribe(Arrays.asList(topicName)); | |
while (true) { | |
ConsumerRecords records = consumer.poll(100); | |
for (ConsumerRecord record : records) { | |
System.out.println("Supplier id= " + | |
String.valueOf(record.value().getID()) + | |
" Supplier Name = " + record.value().getName() + | |
" Supplier Start Date = " + | |
record.value().getStartDate().toString()); | |
} | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Show the partition, leader and replica details of "TestTopicXYZ".
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic TestTopicXYZ
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Install the OpenJDK 8 development kit.
sudo apt-get install openjdk-8-jdk
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Register the sbt apt repository. The former dl.bintray.com location has been shut down;
# repo.scala-sbt.org is the replacement listed in the sbt installation guide.
# NOTE(review): the repository signing key must also be imported before `apt-get update`
# will trust it - follow the key step in the sbt "Installing sbt on Linux" guide.
echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee -a /etc/apt/sources.list.d/sbt.list
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Refresh the package index so the newly added sbt repository is seen, then install sbt.
sudo apt-get update
sudo apt-get install sbt
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// sbt build definition for the KafkaTest samples.
name := "KafkaTest"

scalaVersion := "2.11.8"

// Only the Kafka client library is needed to compile the producers and consumers.
libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients" % "1.0.0"
)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//File Name-SimpleProducer.java | |
import java.util.*; | |
import org.apache.kafka.clients.producer.*; | |
public class SimpleProducer { | |
public static void main(String[] args) throws Exception { | |
String topicName = "SimpleProducerTopic"; | |
String key = "Key1"; | |
String value = "Value-1"; | |
Properties props = new Properties(); | |
props.put("bootstrap.servers", "localhost:9092,localhost:9093"); | |
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
Producer<String, String> producer = new KafkaProducer<>(props); | |
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value); | |
producer.send(record); | |
producer.close(); | |
System.out.println("SimpleProducer Completed."); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Compile the project described by build.sbt in the current directory.
sbt compile
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Print every message in "SimpleProducerTopic", starting from the earliest offset.
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic SimpleProducerTopic --from-beginning
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import org.apache.kafka.clients.producer.*; | |
public class SimpleProducer { | |
public static void main(String[] args) throws Exception { | |
String key = "Key1"; | |
String value = "Value-1"; | |
String topicName = "SimpleProducerTopic"; | |
Properties props = new Properties(); | |
props.put("bootstrap.servers", "localhost:9092,localhost:9093"); | |
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
Producer<String, String> producer = new KafkaProducer<>(props); | |
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value); | |
producer.send(record); | |
producer.close(); | |
System.out.println("SimpleProducer Completed."); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// sbt build definition for the KafkaTest samples (Kafka 0.10.1.0 client).
name := "KafkaTest"

// The listed transitive artifacts are excluded from the kafka-clients dependency.
libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients" % "0.10.1.0"
    exclude("javax.jms", "jms")
    exclude("com.sun.jdmk", "jmxtools")
    exclude("com.sun.jmx", "jmxri")
    exclude("org.slf4j", "slf4j-simple")
)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import org.apache.kafka.clients.producer.*; | |
public class SynchronousProducer { | |
public static void main(String[] args) throws Exception { | |
String topicName = "SynchronousProducerTopic"; | |
String key = "Key-1"; | |
String value = "Value-1"; | |
Properties props = new Properties(); | |
props.put("bootstrap.servers", "localhost:9092,localhost:9093"); | |
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
Producer<String, String> producer = new KafkaProducer<>(props); | |
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value); | |
try { | |
RecordMetadata metadata = producer.send(record).get(); | |
System.out.println("Message is sent to Partition no " + metadata.partition() + " and offset " + metadata.offset()); | |
System.out.println("SynchronousProducer Completed with success."); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
System.out.println("SynchronousProducer failed with an exception"); | |
} finally { | |
producer.close(); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import org.apache.kafka.clients.producer.*; | |
public class AsynchronousProducer { | |
public static void main(String[] args) throws Exception { | |
String topicName = "AsynchronousProducerTopic"; | |
String key = "Key1"; | |
String value = "Value-1"; | |
Properties props = new Properties(); | |
props.put("bootstrap.servers", "localhost:9092,localhost:9093"); | |
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
Producer<String, String> producer = new KafkaProducer<>(props); | |
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value); | |
producer.send(record, new MyProducerCallback()); | |
System.out.println("AsynchronousProducer call completed"); | |
producer.close(); | |
} | |
} | |
class MyProducerCallback implements Callback { | |
@Override | |
public void onCompletion(RecordMetadata recordMetadata, Exception e) { | |
if (e != null) | |
System.out.println("AsynchronousProducer failed with an exception"); | |
else | |
System.out.println("AsynchronousProducer call Success:"); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import org.apache.kafka.clients.producer.*; | |
public class SensorProducer { | |
public static void main(String[] args) throws Exception { | |
String topicName = "SensorTopic"; | |
Properties props = new Properties(); | |
props.put("bootstrap.servers", "localhost:9092,localhost:9093"); | |
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
props.put("partitioner.class", "SensorPartitioner"); | |
props.put("speed.sensor.name", "TSS"); | |
Producer<String, String> producer = new KafkaProducer<>(props); | |
for (int i = 0; i < 10; i++) | |
producer.send(new ProducerRecord<>(topicName, "SSP" + i, "500" + i)); | |
for (int i = 0; i < 10; i++) | |
producer.send(new ProducerRecord<>(topicName, "TSS", "500" + i)); | |
producer.close(); | |
System.out.println("SimpleProducer Completed."); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import org.apache.kafka.clients.producer.*; | |
import org.apache.kafka.common.*; | |
import org.apache.kafka.common.utils.*; | |
import org.apache.kafka.common.record.*; | |
public class SensorPartitioner implements Partitioner { | |
private String speedSensorName; | |
public void configure(Map<String, ?> configs) { | |
speedSensorName = configs.get("speed.sensor.name").toString(); | |
} | |
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { | |
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); | |
int numPartitions = partitions.size(); | |
int sp = (int) Math.abs(numPartitions * 0.3); | |
int p = 0; | |
if ((keyBytes == null) || (!(key instanceof String))) | |
throw new InvalidRecordException("All messages must have sensor name as key"); | |
if (((String) key).equals(speedSensorName)) | |
p = Utils.toPositive(Utils.murmur2(valueBytes)) % sp; | |
else | |
p = Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions - sp) + sp; | |
System.out.println("Key = " + (String) key + " Partition = " + p); | |
return p; | |
} | |
public void close() { | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Date; | |
/** Value object describing a supplier: numeric id, display name and start date. */
public class Supplier {

    // Assigned once in the constructor and never changed afterwards.
    private final int supplierId;
    private final String supplierName;
    private final Date supplierStartDate;

    /**
     * Creates a supplier.
     *
     * @param id   supplier id
     * @param name supplier name
     * @param dt   date the supplier started; stored as given, without copying
     */
    public Supplier(int id, String name, Date dt) {
        this.supplierId = id;
        this.supplierName = name;
        this.supplierStartDate = dt;
    }

    /** @return the supplier id */
    public int getID() {
        return supplierId;
    }

    /** @return the supplier name */
    public String getName() {
        return supplierName;
    }

    /** @return the start date instance passed to the constructor */
    public Date getStartDate() {
        return supplierStartDate;
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.kafka.common.serialization.Serializer; | |
import org.apache.kafka.common.errors.SerializationException; | |
import java.io.UnsupportedEncodingException; | |
import java.util.Map; | |
import java.nio.ByteBuffer; | |
public class SupplierSerializer implements Serializer<Supplier> { | |
private String encoding = "UTF8"; | |
@Override | |
public void configure(Map<String, ?> configs, booleanisKey) { | |
// nothing to configure | |
} | |
@Override | |
public byte[] serialize(String topic, Supplier data) { | |
int sizeOfName; | |
int sizeOfDate; | |
byte[] serializedName; | |
byte[] serializedDate; | |
try { | |
if (data == null) | |
return null; | |
serializedName = data.getName().getBytes(encoding); | |
sizeOfName = serializedName.length; | |
serializedDate = data.getStartDate().toString().getBytes(encoding); | |
sizeOfDate = serializedDate.length; | |
ByteBuffer buf = ByteBuffer.allocate(4 + 4 + sizeOfName + 4 + sizeOfDate); | |
buf.putInt(data.getID()); | |
buf.putInt(sizeOfName); | |
buf.put(serializedName); | |
buf.putInt(sizeOfDate); | |
buf.put(serializedDate); | |
return buf.array(); | |
} catch (Exception e) { | |
throw new SerializationException("Error when serializing Supplier to byte[]"); | |
} | |
} | |
@Override | |
public void close() { | |
// nothing to do | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.ByteBuffer; | |
import java.util.Date; | |
import java.text.DateFormat; | |
import java.text.SimpleDateFormat; | |
import org.apache.kafka.common.errors.SerializationException; | |
import org.apache.kafka.common.serialization.Deserializer; | |
import java.io.UnsupportedEncodingException; | |
import java.util.Map; | |
public class SupplierDeserializer implements Deserializer<Supplier> { | |
private String encoding = "UTF8"; | |
@Override | |
public void configure(Map<String, ?> configs, boolean isKey) { | |
//Nothing to configure | |
} | |
@Override | |
public Supplier deserialize(String topic, byte[] data) { | |
try { | |
if (data == null) { | |
System.out.println("Null recieved at deserialize"); | |
return null; | |
} | |
ByteBuffer buf = ByteBuffer.wrap(data); | |
int id = buf.getInt(); | |
int sizeOfName = buf.getInt(); | |
byte[] nameBytes = new byte[sizeOfName]; | |
buf.get(nameBytes); | |
String deserializedName = new String(nameBytes, encoding); | |
int sizeOfDate = buf.getInt(); | |
byte[] dateBytes = new byte[sizeOfDate]; | |
buf.get(dateBytes); | |
String dateString = new String(dateBytes, encoding); | |
DateFormat df = new SimpleDateFormat("EEE MMM ddHH:mm:ss Z yyyy"); | |
return new Supplier(id, deserializedName, df.parse(dateString)); | |
} catch (Exception e) { | |
throw new SerializationException("Error when deserializing byte[] to Supplier"); | |
} | |
} | |
@Override | |
public void close() { | |
// nothing to do | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import java.text.DateFormat; | |
import java.text.SimpleDateFormat; | |
import org.apache.kafka.clients.producer.*; | |
public class SupplierProducer { | |
public static void main(String[] args) throws Exception { | |
String topicName = "SupplierTopic"; | |
Properties props = new Properties(); | |
props.put("bootstrap.servers", "localhost:9092,localhost:9093"); | |
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
props.put("value.serializer", "SupplierSerializer"); | |
Producer<String, Supplier> producer = new KafkaProducer<>(props); | |
DateFormat df = new SimpleDateFormat("yyyy-MM-dd"); | |
Supplier sp1 = new Supplier(101, "Xyz Pvt Ltd.", df.parse("2016-04-01")); | |
Supplier sp2 = new Supplier(102, "Abc Pvt Ltd.", df.parse("2012-01-01")); | |
producer.send(new ProducerRecord<String, Supplier>(topicName, "SUP", sp1)).get(); | |
producer.send(new ProducerRecord<String, Supplier>(topicName, "SUP", sp2)).get(); | |
System.out.println("SupplierProducer Completed."); | |
producer.close(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import org.apache.kafka.clients.consumer.KafkaConsumer; | |
import org.apache.kafka.clients.consumer.ConsumerRecords; | |
import org.apache.kafka.clients.consumer.ConsumerRecord; | |
public class SupplierConsumer { | |
public static void main(String[] args) throws Exception { | |
String topicName = "SupplierTopic"; | |
String groupName = "SupplierTopicGroup"; | |
Properties props = new Properties(); | |
props.put("bootstrap.servers", "localhost:9092,localhost:9093"); | |
props.put("group.id", groupName); | |
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); | |
props.put("value.deserializer", "SupplierDeserializer"); | |
KafkaConsumer<String, Supplier> consumer = new KafkaConsumer<>(props); | |
consumer.subscribe(Arrays.asList(topicName)); | |
while (true) { | |
ConsumerRecords<String, Supplier> records = consumer.poll(100); | |
for (ConsumerRecord<String, Supplier> record : records) { | |
System.out.println("Supplier id= " + String.valueOf(record.value().getID()) + " Supplier Name = " + record.value().getName() + " Supplier Start Date = " + record.value().getStartDate().toString()); | |
} | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// sbt build definition for the KafkaTest samples (Kafka 0.10.1.0 client).
name := "KafkaTest"

// The listed transitive artifacts are excluded from the kafka-clients dependency.
libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients" % "0.10.1.0"
    exclude("javax.jms", "jms")
    exclude("com.sun.jdmk", "jmxtools")
    exclude("com.sun.jmx", "jmxri")
    exclude("org.slf4j", "slf4j-simple")
)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import org.apache.kafka.clients.consumer.KafkaConsumer; | |
import org.apache.kafka.clients.consumer.ConsumerRecords; | |
import org.apache.kafka.clients.consumer.ConsumerRecord; | |
public class SupplierConsumer { | |
public static void main(String[] args) throws Exception { | |
String topicName = "SupplierTopic"; | |
String groupName = "SupplierTopicGroup"; | |
Properties props = newProperties(); | |
props.put("bootstrap.servers", "localhost:9092,localhost:9093"); | |
props.put("group.id", groupName); | |
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); | |
props.put("value.deserializer", "SupplierDeserializer"); | |
KafkaConsumer consumer = newKafkaConsumer <>(props); | |
consumer.subscribe(Arrays.asList(topicName)); | |
while (true) { | |
ConsumerRecords records = consumer.poll(100); | |
for (ConsumerRecordrecord: | |
records) { | |
System.out.println("Supplier id= " + | |
String.valueOf(record.value().getID()) + | |
" Supplier Name = " + record.value().getName() + | |
" Supplier Start Date = " + | |
record.value().getStartDate().toString()); | |
} | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Consumer settings for the Supplier samples, externalised from the Java source.
bootstrap.servers=localhost:9092,localhost:9093
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=SupplierDeserializer
group.id=SupplierTopicGroup
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import java.io.*; | |
import org.apache.kafka.clients.consumer.KafkaConsumer; | |
import org.apache.kafka.clients.consumer.ConsumerRecords; | |
import org.apache.kafka.clients.consumer.ConsumerRecord; | |
public class NewSupplierConsumer { | |
public static void main(String[] args) throws Exception { | |
String topicName = "SupplierTopic"; | |
String groupName = "SupplierTopicGroup"; | |
Properties props = new Properties(); | |
//props.put("bootstrap.servers", "localhost:9092,localhost:9093"); | |
//props.put("group.id", groupName); | |
//props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); | |
//props.put("value.deserializer", "SupplierDeserializer"); | |
InputStream input = null; | |
KafkaConsumer<String, Supplier> consumer = null; | |
try { | |
input = new FileInputStream("SupplierConsumer.properties"); | |
props.load(input); | |
consumer = new KafkaConsumer<>(props); | |
consumer.subscribe(Arrays.asList(topicName)); | |
while (true) { | |
ConsumerRecords<String, Supplier> records = consumer.poll(100); | |
for (ConsumerRecord<String, Supplier> record : records) { | |
System.out.println("Supplier id= " + String.valueOf(record.value().getID()) + | |
" Supplier Name = " + record.value().getName() + | |
" Supplier Start Date = " + record.value().getStartDate().toString()); | |
} | |
} | |
} catch (Exception ex) { | |
ex.printStackTrace(); | |
} finally { | |
input.close(); | |
consumer.close(); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import java.io.*; | |
import org.apache.kafka.clients.consumer.KafkaConsumer; | |
import org.apache.kafka.clients.consumer.ConsumerRecords; | |
import org.apache.kafka.clients.consumer.ConsumerRecord; | |
public class ManualConsumer { | |
public static void main(String[] args) throws Exception { | |
String topicName = "SupplierTopic"; | |
String groupName = "SupplierTopicGroup"; | |
Properties props = new Properties(); | |
props.put("bootstrap.servers", "localhost:9092,localhost:9093"); | |
props.put("group.id", groupName); | |
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); | |
props.put("value.deserializer", "SupplierDeserializer"); | |
props.put("enable.auto.commit", "false"); | |
KafkaConsumer<String, Supplier> consumer = null; | |
try { | |
consumer = new KafkaConsumer<>(props); | |
consumer.subscribe(Arrays.asList(topicName)); | |
while (true) { | |
ConsumerRecords<String, Supplier> records = consumer.poll(100); | |
for (ConsumerRecord<String, Supplier> record : records) { | |
System.out.println("Supplier id= " + String.valueOf(record.value().getID()) + | |
" Supplier Name = " + record.value().getName() + | |
" Supplier Start Date = " + record.value().getStartDate().toString()); | |
} | |
consumer.commitAsync(); | |
} | |
} catch (Exception ex) { | |
ex.printStackTrace(); | |
} finally { | |
consumer.commitSync(); | |
consumer.close(); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import org.apache.kafka.clients.producer.*; | |
public class RandomProducer { | |
public static void main(String[] args) throws InterruptedException { | |
String topicName = "RandomProducerTopic"; | |
String msg; | |
Properties props = new Properties(); | |
props.put("bootstrap.servers", "localhost:9092,localhost:9093"); | |
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
Producer<String, String> producer = new KafkaProducer<>(props); | |
Random rg = new Random(); | |
Calendar dt = Calendar.getInstance(); | |
dt.set(2016, 1, 1); | |
try { | |
while (true) { | |
for (int i = 0; i < 100; i++) { | |
msg = dt.get(Calendar.YEAR) + "-" + | |
dt.get(Calendar.MONTH) + "-" + | |
dt.get(Calendar.DATE) + "," + | |
rg.nextInt(1000); | |
producer.send(new ProducerRecord<String, String>(topicName, 0, "Key", msg)).get(); | |
msg = dt.get(Calendar.YEAR) + "-" + | |
dt.get(Calendar.MONTH) + "-" + | |
dt.get(Calendar.DATE) + "," + | |
rg.nextInt(1000); | |
producer.send(new ProducerRecord<String, String>(topicName, 1, "Key", msg)).get(); | |
} | |
dt.add(Calendar.DATE, 1); | |
System.out.println("Data Sent for " + | |
dt.get(Calendar.YEAR) + "-" + | |
dt.get(Calendar.MONTH) + "-" + | |
dt.get(Calendar.DATE)); | |
} | |
} catch (Exception ex) { | |
System.out.println("Intrupted"); | |
} finally { | |
producer.close(); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import org.apache.kafka.clients.consumer.*; | |
import org.apache.kafka.common.*; | |
public class RandomConsumer { | |
public static void main(String[] args) throws Exception { | |
String topicName = "RandomProducerTopic"; | |
KafkaConsumer<String, String> consumer = null; | |
String groupName = "RG"; | |
Properties props = new Properties(); | |
props.put("bootstrap.servers", "localhost:9092,localhost:9093"); | |
props.put("group.id", groupName); | |
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); | |
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); | |
props.put("enable.auto.commit", "false"); | |
consumer = new KafkaConsumer<>(props); | |
RebalanceListnerrebalanceListner = new RebalanceListner(consumer); | |
consumer.subscribe(Arrays.asList(topicName), rebalanceListner); | |
try { | |
while (true) { | |
ConsumerRecords<String, String> records = consumer.poll(100); | |
for (ConsumerRecord<String, String> record : records) { | |
/*System.out.println("Topic:"+ record.topic() + | |
" Partition:" + record.partition() + | |
" Offset:" + record.offset() + " Value:"+ record.value());*/ | |
// Do some processing and save it to Database | |
rebalanceListner.addOffset(record.topic(), record.partition(), record.offset()); | |
} | |
//consumer.commitSync(rebalanceListner.getCurrentOffsets()); | |
} | |
} catch (Exception ex) { | |
System.out.println("Exception."); | |
ex.printStackTrace(); | |
} finally { | |
consumer.close(); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import org.apache.kafka.clients.consumer.*; | |
import org.apache.kafka.common.*; | |
public class RebalanceListner implements ConsumerRebalanceListener { | |
private KafkaConsumer consumer; | |
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap(); | |
public RebalanceListner(KafkaConsumer con) { | |
this.consumer = con; | |
} | |
public void addOffset(String topic, int partition, long offset) { | |
currentOffsets.put(new TopicPartition(topic, partition), new OffsetAndMetadata(offset, "Commit")); | |
} | |
public Map<TopicPartition, OffsetAndMetadata> getCurrentOffsets() { | |
return currentOffsets; | |
} | |
public void onPartitionsAssigned(Collection<TopicPartition> partitions) { | |
System.out.println("Following Partitions Assigned ...."); | |
for (TopicPartition partition : partitions) | |
System.out.println(partition.partition() + ","); | |
} | |
public void onPartitionsRevoked(Collection<TopicPartition> partitions) { | |
System.out.println("Following Partitions Revoked ...."); | |
for (TopicPartition partition : partitions) | |
System.out.println(partition.partition() + ","); | |
System.out.println("Following Partitions commited ...."); | |
for (TopicPartition tp : currentOffsets.keySet()) | |
System.out.println(tp.partition()); | |
consumer.commitSync(currentOffsets); | |
currentOffsets.clear(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Excerpt: the poll loop of the random consumer example
while (true) {
    ConsumerRecords<String, String> batch = consumer.poll(100);
    for (ConsumerRecord<String, String> rec : batch) {
        /*System.out.println("Topic:"+ rec.topic() +
        " Partition:" + rec.partition() +
        " Offset:" + rec.offset() +
        " Value:"+ rec.value());*/

        // Step - 1
        // Process the record and save it to the database, then remember its position.
        rebalanceListner.addOffset(rec.topic(), rec.partition(), rec.offset());
    }
    // Step - 2
    // Per-batch commit (disabled in this excerpt):
    //consumer.commitSync(rebalanceListner.getCurrentOffsets());
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
yum install mysql-server |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
service mysqld start |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
mysql_secure_installation |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
source tss.sql |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Schema for the SensorConsumer example: tss_data holds the consumed records and
-- tss_offsets holds, per topic partition, the next Kafka offset to read.
create database test;
use test;
create table tss_data(skey varchar(50), svalue varchar(50));
-- `partition` is a reserved word in MySQL 5.6+ and must be quoted; `offset` is quoted
-- for consistency. Kafka offsets are 64-bit, hence bigint rather than int.
-- The primary key guarantees exactly one offset row per topic partition.
create table tss_offsets(
    topic_name varchar(50) not null,
    `partition` int not null,
    `offset` bigint not null,
    primary key (topic_name, `partition`)
);
-- Seed rows must use the topic name SensorConsumer reads ("SensorTopic"); otherwise
-- its offset lookup and update statements match no rows.
insert into tss_offsets values('SensorTopic',0,0);
insert into tss_offsets values('SensorTopic',1,0);
insert into tss_offsets values('SensorTopic',2,0);
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import org.apache.kafka.clients.consumer.*; | |
import org.apache.kafka.common.*; | |
import java.sql.*; | |
public class SensorConsumer { | |
public static void main(String[] args) throws Exception { | |
String topicName = "SensorTopic"; | |
KafkaConsumer<String, String> consumer = null; | |
int rCount; | |
Properties props = new Properties(); | |
props.put("bootstrap.servers", "localhost:9092,localhost:9093"); | |
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); | |
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); | |
props.put("enable.auto.commit", "false"); | |
consumer = new KafkaConsumer<>(props); | |
TopicPartition p0 = new TopicPartition(topicName, 0); | |
TopicPartition p1 = new TopicPartition(topicName, 1); | |
TopicPartition p2 = new TopicPartition(topicName, 2); | |
consumer.assign(Arrays.asList(p0, p1, p2)); | |
System.out.println("Current position p0=" + consumer.position(p0) | |
+ " p1=" + consumer.position(p1) | |
+ " p2=" + consumer.position(p2)); | |
consumer.seek(p0, getOffsetFromDB(p0)); | |
consumer.seek(p1, getOffsetFromDB(p1)); | |
consumer.seek(p2, getOffsetFromDB(p2)); | |
System.out.println("New positions po=" + consumer.position(p0) | |
+ " p1=" + consumer.position(p1) | |
+ " p2=" + consumer.position(p2)); | |
System.out.println("Start Fetching Now"); | |
try { | |
do { | |
ConsumerRecords<String, String> records = consumer.poll(1000); | |
System.out.println("Record polled " + records.count()); | |
rCount = records.count(); | |
for (ConsumerRecord<String, String> record : records) { | |
saveAndCommit(consumer, record); | |
} | |
} while (rCount > 0); | |
} catch (Exception ex) { | |
System.out.println("Exception in main."); | |
} finally { | |
consumer.close(); | |
} | |
} | |
private static long getOffsetFromDB(TopicPartition p) { | |
long offset = 0; | |
try { | |
Class.forName("com.mysql.jdbc.Driver"); | |
Connection con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "pandey"); | |
String sql = "select offset from tss_offsets where topic_name='" | |
+ p.topic() + "' and partition=" + p.partition(); | |
Statement stmt = con.createStatement(); | |
ResultSet rs = stmt.executeQuery(sql); | |
if (rs.next()) | |
offset = rs.getInt("offset"); | |
stmt.close(); | |
con.close(); | |
} catch (Exception e) { | |
System.out.println("Exception in getOffsetFromDB"); | |
} | |
return offset; | |
} | |
private static void saveAndCommit(KafkaConsumer<String, String> c, ConsumerRecord<String, String> r) { | |
System.out.println("Topic=" + r.topic() + " Partition=" + r.partition() + " Offset=" + r.offset() | |
+ " Key=" + r.key() + " Value=" + r.value()); | |
try { | |
Class.forName("com.mysql.jdbc.Driver"); | |
Connection con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "pandey"); | |
con.setAutoCommit(false); | |
String insertSQL = "insert into tss_data values(?,?)"; | |
PreparedStatement psInsert = con.prepareStatement(insertSQL); | |
psInsert.setString(1, r.key()); | |
psInsert.setString(2, r.value()); | |
String updateSQL = "update tss_offsets set offset=? where topic_name=? and partition=?"; | |
PreparedStatement psUpdate = con.prepareStatement(updateSQL); | |
psUpdate.setLong(1, r.offset() + 1); | |
psUpdate.setString(2, r.topic()); | |
psUpdate.setInt(3, r.partition()); | |
psInsert.executeUpdate(); | |
psUpdate.executeUpdate(); | |
con.commit(); | |
con.close(); | |
} catch (Exception e) { | |
System.out.println("Exception in saveAndCommit"); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ "type": "record", | |
"name": "ClickRecord", | |
"fields": [ | |
{"name": "session_id", "type": "string"}, | |
{"name": "browser", "type": ["string", "null"]}, | |
{"name": "campaign", "type": ["string", "null"]}, | |
{"name": "channel", "type": "string"}, | |
{"name": "referrer", "type": ["string", "null"], "default": "None"}, | |
{"name": "ip", "type": ["string", "null"]} | |
] | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
java -jar avro-tools-1.8.1.jar compile schema ClickRecordV1.avsc |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import org.apache.kafka.clients.producer.*; | |
public class AvroProducer { | |
public static void main(String[] args) throws Exception { | |
String topicName = "AvroClicks"; | |
String msg; | |
Properties props = new Properties(); | |
props.put("bootstrap.servers", "localhost:9092,localhost:9093"); | |
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); | |
props.put("schema.registry.url", "http://localhost:8081"); | |
Producer<String, ClickRecord> producer = new KafkaProducer<>(props); | |
ClickRecord cr = new ClickRecord(); | |
try { | |
cr.setSessionId("10001"); | |
cr.setChannel("HomePage"); | |
cr.setIp("192.168.0.1"); | |
producer.send(new ProducerRecord<String, ClickRecord>(topicName, cr.getSessionId().toString(), cr)).get(); | |
System.out.println("Complete"); | |
} catch (Exception ex) { | |
ex.printStackTrace(System.out); | |
} finally { | |
producer.close(); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import org.apache.kafka.clients.consumer.*; | |
public class AvroConsumer { | |
public static void main(String[] args) throws Exception { | |
String topicName = "AvroClicks"; | |
String groupName = "RG"; | |
Properties props = new Properties(); | |
props.put("bootstrap.servers", "localhost:9092,localhost:9093"); | |
props.put("group.id", groupName); | |
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); | |
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"); | |
props.put("schema.registry.url", "http://localhost:8081"); | |
props.put("specific.avro.reader", "true"); | |
KafkaConsumer<String, ClickRecord> consumer = new KafkaConsumer<>(props); | |
consumer.subscribe(Arrays.asList(topicName)); | |
try { | |
while (true) { | |
ConsumerRecords<String, ClickRecord> records = consumer.poll(100); | |
for (ConsumerRecord<String, ClickRecord> record : records) { | |
System.out.println("Session id=" + record.value().getSessionId() | |
+ " Channel=" + record.value().getChannel() | |
+ " Referrer=" + record.value().getReferrer()); | |
} | |
} | |
} catch (Exception ex) { | |
ex.printStackTrace(); | |
} finally { | |
consumer.close(); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// sbt build definition for the Avro producer/consumer samples.
name := "AvroTest"

// kafka-avro-serializer is published in Confluent's repository rather than Maven Central.
// Use HTTPS: plain-HTTP resolvers are insecure and rejected by recent sbt releases.
resolvers += "confluent" at "https://packages.confluent.io/maven/"

libraryDependencies ++= Seq(
  "org.apache.avro" % "avro" % "1.8.1",
  "io.confluent" % "kafka-avro-serializer" % "3.1.1",
  ("org.apache.kafka" % "kafka-clients" % "0.10.1.0")
    .exclude("javax.jms", "jms")
    .exclude("com.sun.jdmk", "jmxtools")
    .exclude("com.sun.jmx", "jmxri")
    .exclude("org.slf4j", "slf4j-simple")
)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{"type": "record", | |
"name": "ClickRecord", | |
"fields": [ | |
{"name": "session_id", "type": "string"}, | |
{"name": "browser", "type": ["string", "null"]}, | |
{"name": "campaign", "type": ["string", "null"]}, | |
{"name": "channel", "type": "string"}, | |
{"name": "entry_url", "type": ["string", "null"], "default": "None"}, | |
{"name": "ip", "type": ["string", "null"]}, | |
{"name": "language", "type": ["string", "null"], "default": "None"}, | |
{"name": "os", "type": ["string", "null"],"default": "None"} | |
] | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
java -jar avro-tools-1.8.1.jar compile schema ClickRecordV2.avsc |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import org.apache.kafka.clients.producer.*; | |
public class ClickRecordProducerV2 { | |
public static void main(String[] args) throws Exception { | |
String topicName = "AvroClicks"; | |
String msg; | |
Properties props = new Properties(); | |
props.put("bootstrap.servers", "localhost:9092,localhost:9093"); | |
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); | |
props.put("schema.registry.url", "http://localhost:8081"); | |
Producer<String, ClickRecord> producer = new KafkaProducer<>(props); | |
ClickRecord cr = new ClickRecord(); | |
try { | |
cr.setSessionId("10001"); | |
cr.setChannel("HomePage"); | |
cr.setIp("192.168.0.1"); | |
cr.setLanguage("Spanish"); | |
cr.setOs("Mac"); | |
cr.setEntryUrl("http://facebook.com/myadd"); | |
producer.send(new ProducerRecord<String, ClickRecord>(topicName, cr.getSessionId().toString(), cr)).get(); | |
System.out.println("Complete"); | |
} catch (Exception ex) { | |
ex.printStackTrace(System.out); | |
} finally { | |
producer.close(); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import org.apache.kafka.clients.consumer.*; | |
public class ClickRecordConsumerV2 { | |
public static void main(String[] args) throws Exception { | |
String topicName = "AvroClicks"; | |
String groupName = "RG"; | |
Properties props = new Properties(); | |
props.put("bootstrap.servers", "localhost:9092,localhost:9093"); | |
props.put("group.id", groupName); | |
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); | |
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"); | |
props.put("schema.registry.url", "http://localhost:8081"); | |
props.put("specific.avro.reader", "true"); | |
KafkaConsumer<String, ClickRecord> consumer = new KafkaConsumer<>(props); | |
consumer.subscribe(Arrays.asList(topicName)); | |
try { | |
while (true) { | |
ConsumerRecords<String, ClickRecord> records = consumer.poll(100); | |
for (ConsumerRecord<String, ClickRecord> record : records) { | |
System.out.println("Session id=" + record.value().getSessionId() | |
+ " Channel=" + record.value().getChannel() | |
+ " Entry URL=" + record.value().getEntryUrl() | |
+ " Language=" + record.value().getLanguage()); | |
} | |
} | |
} catch (Exception ex) { | |
ex.printStackTrace(); | |
} finally { | |
consumer.close(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment