Skip to content

Instantly share code, notes, and snippets.

@romanblanco
Last active June 6, 2020 11:31
Show Gist options
  • Save romanblanco/489dbb0673f31c553c8f674a0886b278 to your computer and use it in GitHub Desktop.
Save romanblanco/489dbb0673f31c553c8f674a0886b278 to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"time"
"github.com/Shopify/sarama"
"github.com/pote/philote-go"
)
// main is a smoke test against a local Kafka broker (localhost:9092) and a
// local Philote websocket server (ws://localhost:6380). It produces one
// message to the "test" topic, consumes one message back from the partition
// it was written to, then publishes a websocket message with one Philote
// client and receives it with a second one. Any failure panics.
func main() {
	kafkaConfig := sarama.NewConfig()
	kafkaConfig.Consumer.Return.Errors = true
	kafkaConfig.Producer.RequiredAcks = sarama.WaitForAll
	kafkaConfig.Producer.Retry.Max = 5
	kafkaConfig.Producer.Return.Successes = true

	// Specify brokers address. This is default one
	brokers := []string{"localhost:9092"}

	// create a new consumer
	master, err := sarama.NewConsumer(brokers, kafkaConfig)
	if err != nil {
		panic(fmt.Errorf("creating kafka consumer: %w", err))
	}
	fmt.Printf("kafka consumer created: %T\n", master)
	defer func() {
		if err := master.Close(); err != nil {
			panic(err)
		}
	}()

	// create a new producer
	producer, err := sarama.NewSyncProducer(brokers, kafkaConfig)
	if err != nil {
		panic(fmt.Errorf("creating kafka producer: %w", err))
	}
	fmt.Printf("kafka producer created: %T\n", producer)
	defer func() {
		if err := producer.Close(); err != nil {
			panic(err)
		}
	}()

	// set a kafka topic to follow
	topic := "test"

	// produce a kafka message
	msg := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder("Something Cool"),
	}
	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		panic(fmt.Errorf("producing kafka message: %w", err))
	}
	fmt.Printf("kafka message produced: p: %v, o: %v\n", partition, offset)

	// consume a kafka message from the partition the message was written to,
	// so the read cannot block on an empty partition when the topic has more
	// than one.
	consumer, err := master.ConsumePartition(topic, partition, sarama.OffsetOldest)
	if err != nil {
		panic(fmt.Errorf("consuming partition %d of %q: %w", partition, topic, err))
	}
	// Deferred calls run last-in first-out, so the partition consumer is
	// closed before its parent consumer (master) is.
	defer func() {
		if err := consumer.Close(); err != nil {
			panic(err)
		}
	}()
	select {
	case err := <-consumer.Errors():
		fmt.Println(err)
	case msg := <-consumer.Messages():
		fmt.Println("kafka message consumed: ", string(msg.Key), string(msg.Value))
	}

	// create a websocket token
	token, err := philote.NewToken(
		"roman",
		[]string{"read-write-channel"},
		[]string{"read-write-channel"},
	)
	if err != nil {
		panic(fmt.Errorf("creating philote token: %w", err))
	}
	fmt.Printf("philote token created: %T\n", token)

	// create a websocket client
	c1, err := philote.NewClient("ws://localhost:6380", token)
	if err != nil {
		panic(fmt.Errorf("creating philote client 1: %w", err))
	}
	fmt.Printf("philote client 1 created: %T\n", c1)
	c2, err := philote.NewClient("ws://localhost:6380", token)
	if err != nil {
		panic(fmt.Errorf("creating philote client 2: %w", err))
	}
	fmt.Printf("philote client 2 created: %T\n", c2)

	// publish a websocket message; sent is closed once the publisher is done
	// so main does not return before the goroutine has finished printing.
	sent := make(chan struct{})
	go func() {
		defer close(sent)
		// Delay so that c2 is already blocked in Receive when the message
		// goes out.
		time.Sleep(2 * time.Second)
		// Use a goroutine-local err: sharing main's err would race with the
		// assignment made by c2.Receive below.
		if err := c1.Publish(
			&philote.Message{
				Channel: "read-write-channel",
				Data:    "My message to the world",
			}); err != nil {
			panic(fmt.Errorf("publishing philote message: %w", err))
		}
		fmt.Println("c1: philote message sent")
	}()

	// receive a websocket message
	message, err := c2.Receive()
	if err != nil {
		panic(fmt.Errorf("receiving philote message: %w", err))
	}
	<-sent
	fmt.Printf("c2: philote message received: %v\n", message.Data)
}
@romanblanco
Copy link
Author

kafka consumer created: *sarama.consumer
kafka producer created: *sarama.syncProducer
philote token created: string
philote client 1 created: *philote.Client
philote client 2 created: *philote.Client
philote message sent
philote message received: My message to the world

@romanblanco
Copy link
Author

romanblanco commented Oct 1, 2019

Shell function that starts ZooKeeper, Kafka, and Philote in a tmux session:

# kafka attaches to the tmux session named "kafka", creating it first when it
# does not exist yet. A fresh session gets three windows: zookeeper, kafka and
# philote, each running the corresponding server.
# NOTE(review): the window targets kafka:2 and kafka:3 presumably rely on a
# tmux base-index of 1 — confirm against your tmux configuration.
kafka () {
  if tmux has-session -t kafka > /dev/null 2>&1 ; then
    tmux -2 attach-session -t kafka -d
  else
    # Each quoted command must stay on a single line: a newline inside the
    # quotes would split the script name or its config path into a separate
    # shell command.
    tmux -2 new-session -A -s kafka -n zookeeper -d 'cd ~/devel/kafka/kafka/ ; bin/zookeeper-server-start.sh config/zookeeper.properties'
    tmux new-window  -t kafka:2 -n kafka 'cd ~/devel/kafka/kafka/ ; bin/kafka-server-start.sh config/server.properties'
    tmux new-window  -t kafka:3 -n philote 'SECRET=roman LOGLEVEL=debug ~/bin/philote'
    tmux select-window -t kafka:2
    tmux -2 attach-session -t kafka
  fi
}
• ~/devel/go_kafka/ go run kafka_websocket.go
kafka consumer created: *sarama.consumer
kafka producer created: *sarama.syncProducer
kafka message produced: p: 0, o: 7
kafka message consumed:   Something Cool
philote token created: string
philote client 1 created: *philote.Client
philote client 2 created: *philote.Client
c1: philote message sent
c2: philote message received: My message to the world

@romanblanco
Copy link
Author

@romanblanco
Copy link
Author

# Download Kafka 2.5.0 (Scala 2.12 build) and unpack it to ~/devel/kafka/kafka.
cd ~/devel
# -p keeps a re-run from failing when the directory already exists.
mkdir -p kafka
cd kafka
# NOTE(review): this mirror may no longer host 2.5.0; old releases are
# presumably still available under archive.apache.org/dist/kafka/ — confirm.
wget http://mirror.cc.columbia.edu/pub/software/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz
# tar is used because `extract` is not a standard command.
tar -xzf kafka_2.12-2.5.0.tgz
mv kafka_2.12-2.5.0/ kafka/

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment