Skip to content

Instantly share code, notes, and snippets.

@romanblanco
Last active June 6, 2020 11:31
Show Gist options
  • Save romanblanco/489dbb0673f31c553c8f674a0886b278 to your computer and use it in GitHub Desktop.
Save romanblanco/489dbb0673f31c553c8f674a0886b278 to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"time"
"github.com/Shopify/sarama"
"github.com/pote/philote-go"
)
// main is a small end-to-end demo: it produces one message to a local Kafka
// broker and consumes one message back from the same topic, then opens two
// Philote websocket clients and sends a message from the first to the second.
//
// Every failure is fatal (panic), matching the throwaway-script nature of the
// program; no error is silently ignored.
func main() {
	kafkaConfig := sarama.NewConfig()
	kafkaConfig.Consumer.Return.Errors = true
	kafkaConfig.Producer.RequiredAcks = sarama.WaitForAll
	kafkaConfig.Producer.Retry.Max = 5
	kafkaConfig.Producer.Return.Successes = true

	// Specify brokers address. This is default one
	brokers := []string{"localhost:9092"}

	// create a new consumer
	master, err := sarama.NewConsumer(brokers, kafkaConfig)
	if err != nil {
		panic(err)
	}
	fmt.Printf("kafka consumer created: %T\n", master)
	defer func() {
		if err := master.Close(); err != nil {
			panic(err)
		}
	}()

	// create a new producer
	producer, err := sarama.NewSyncProducer(brokers, kafkaConfig)
	if err != nil {
		// Should not reach here
		panic(err)
	}
	fmt.Printf("kafka producer created: %T\n", producer)
	defer func() {
		if err := producer.Close(); err != nil {
			panic(err)
		}
	}()

	// set a kafka topic to follow
	topic := "test"

	// produce a kafka message
	msg := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder("Something Cool"),
	}
	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		panic(err)
	}
	fmt.Printf("kafka message produced: p: %v, o: %v\n", partition, offset)

	// consume a kafka message
	consumer, err := master.ConsumePartition(topic, 0, sarama.OffsetOldest)
	if err != nil {
		panic(err)
	}
	// Deferred last so it runs first: the partition consumer is closed
	// before the parent consumer it was created from.
	defer func() {
		if err := consumer.Close(); err != nil {
			panic(err)
		}
	}()
	select {
	case err := <-consumer.Errors():
		fmt.Println(err)
	case msg := <-consumer.Messages():
		fmt.Println("kafka message consumed: ", string(msg.Key), string(msg.Value))
	}

	// create a websocket token
	token, err := philote.NewToken(
		"roman",
		[]string{"read-write-channel"},
		[]string{"read-write-channel"},
	)
	if err != nil {
		panic(err)
	}
	fmt.Printf("philote token created: %T\n", token)

	// create a websocket client
	// The errors must be checked here: using a client whose connection was
	// never established ends in a nil pointer dereference inside Publish.
	c1, err := philote.NewClient("ws://localhost:6380", token)
	if err != nil {
		panic(err)
	}
	fmt.Printf("philote client 1 created: %T\n", c1)
	c2, err := philote.NewClient("ws://localhost:6380", token)
	if err != nil {
		panic(err)
	}
	fmt.Printf("philote client 2 created: %T\n", c2)

	// publish a websocket message
	// published is closed once the publisher goroutine has finished, so main
	// does not return before the confirmation line is printed.
	published := make(chan struct{})
	go func() {
		defer close(published)
		time.Sleep(2 * time.Second)
		// Use a goroutine-local err: checking the outer err here would test
		// a stale value and race with the Receive call below.
		if err := c1.Publish(
			&philote.Message{
				Channel: "read-write-channel",
				Data:    "My message to the world",
			}); err != nil {
			panic(err)
		}
		fmt.Println("c1: philote message sent")
	}()

	// receive a websocket message
	message, err := c2.Receive()
	if err != nil {
		panic(err)
	}
	fmt.Printf("c2: philote message received: %v\n", message.Data)
	<-published
}
@romanblanco
Copy link
Author

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x80 pc=0x7eb6c6]

goroutine 1 [running]:
github.com/gorilla/websocket.(*Conn).beginMessage(0x0, 0xc0001b21e0, 0x1, 0xc0000fdd78, 0x4172e6)
	/home/rblanco/go/src/github.com/gorilla/websocket/conn.go:472 +0x26
github.com/gorilla/websocket.(*Conn).NextWriter(0x0, 0x1, 0x18, 0x20, 0xc0001b60e0, 0x993020)
	/home/rblanco/go/src/github.com/gorilla/websocket/conn.go:513 +0x53
github.com/gorilla/websocket.(*Conn).WriteJSON(0x0, 0x8d4640, 0xc0001b60e0, 0x2, 0x2)
	/home/rblanco/go/src/github.com/gorilla/websocket/json.go:24 +0x49
github.com/pote/philote-go.(*Client).Publish(...)
	/home/rblanco/go/src/github.com/pote/philote-go/client.go:37
main.main()
	/home/rblanco/devel/go_kafka/kafka_websocket.go:60 +0x52e
exit status 2

@romanblanco
Copy link
Author

romanblanco commented Oct 1, 2019

export GOPATH="$PWD"/.dependencies:"$PWD"
export LOGLEVEL="debug"
go get github.com/pote/philote-go
go get github.com/Shopify/sarama

@romanblanco
Copy link
Author

romanblanco commented Oct 1, 2019

• ~/ SECRET=randomstring bin/philote
INFO[0000] Initializing Philotic Network                 cores=8 port=6380 version=0.3.1
INFO[0003] Message dropped due to insufficient write permissions  channel=write-channel-1 data="My message to the world" philote=9da9bf6a-1aac-4438-a308-68dbd364d3eb

https://github.com/pote/philote/blob/12ba9cb27a18876e4f7f52e475dd301e4736b569/philote.go#L58-L67

@romanblanco
Copy link
Author

kafka consumer created: *sarama.consumer
kafka producer created: *sarama.syncProducer
philote token created: string
philote client 1 created: *philote.Client
philote client 2 created: *philote.Client
philote message sent
philote message received: My message to the world

@romanblanco
Copy link
Author

romanblanco commented Oct 1, 2019

zookeeper, kafka, philote:

# kafka attaches to the tmux session named "kafka". If no such session exists,
# it first creates one with three windows running zookeeper, the kafka broker,
# and philote, then attaches to it.
kafka () {
  if tmux has-session -t kafka > /dev/null 2>&1 ; then
    tmux -2 attach-session -t kafka -d
  else
    tmux -2 new-session -A -s kafka -n zookeeper -d 'cd ~/devel/kafka/kafka/ ; bin/zookeeper-server-start.sh config/zookeeper.properties'
    tmux new-window  -t kafka:2 -n kafka 'cd ~/devel/kafka/kafka/ ; bin/kafka-server-start.sh config/server.properties'
    tmux new-window  -t kafka:3 -n philote 'SECRET=roman LOGLEVEL=debug ~/bin/philote'
    tmux select-window -t kafka:2
    tmux -2 attach-session -t kafka
  fi
}
• ~/devel/go_kafka/ go run kafka_websocket.go
kafka consumer created: *sarama.consumer
kafka producer created: *sarama.syncProducer
kafka message produced: p: 0, o: 7
kafka message consumed:   Something Cool
philote token created: string
philote client 1 created: *philote.Client
philote client 2 created: *philote.Client
c1: philote message sent
c2: philote message received: My message to the world

@romanblanco
Copy link
Author

@romanblanco
Copy link
Author

cd ~/devel
mkdir kafka
cd kafka
wget http://mirror.cc.columbia.edu/pub/software/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz
extract kafka_2.12-2.5.0.tgz
mv kafka_2.12-2.5.0/ kafka/

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment