Last active
June 6, 2020 11:31
-
-
Save romanblanco/489dbb0673f31c553c8f674a0886b278 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"time" | |
"github.com/Shopify/sarama" | |
"github.com/pote/philote-go" | |
) | |
func main() { | |
kafkaConfig := sarama.NewConfig() | |
kafkaConfig.Consumer.Return.Errors = true | |
kafkaConfig.Producer.RequiredAcks = sarama.WaitForAll | |
kafkaConfig.Producer.Retry.Max = 5 | |
kafkaConfig.Producer.Return.Successes = true | |
// Specify brokers address. This is default one | |
brokers := []string{"localhost:9092"} | |
// create a new consumer | |
master, err := sarama.NewConsumer(brokers, kafkaConfig) | |
if err != nil { | |
panic(err) | |
} | |
fmt.Printf("kafka consumer created: %T\n", master) | |
defer func() { | |
if err := master.Close(); err != nil { | |
panic(err) | |
} | |
}() | |
// create a new producer | |
producer, err := sarama.NewSyncProducer(brokers, kafkaConfig) | |
if err != nil { | |
// Should not reach here | |
panic(err) | |
} | |
fmt.Printf("kafka producer created: %T\n", producer) | |
defer func() { | |
if err := producer.Close(); err != nil { | |
panic(err) | |
} | |
}() | |
// set a kafka topic to follow | |
topic := "test" | |
// produce a kafka message | |
msg := &sarama.ProducerMessage{ | |
Topic: topic, | |
Value: sarama.StringEncoder("Something Cool"), | |
} | |
partition, offset, err := producer.SendMessage(msg) | |
if err != nil { | |
panic(err) | |
} | |
fmt.Printf("kafka message produced: p: %v, o: %v\n", partition, offset) | |
// consume a kafka message | |
consumer, err := master.ConsumePartition(topic, 0, sarama.OffsetOldest) | |
if err != nil { | |
panic(err) | |
} | |
select { | |
case err := <-consumer.Errors(): | |
fmt.Println(err) | |
case msg := <-consumer.Messages(): | |
fmt.Println("kafka message consumed: ", string(msg.Key), string(msg.Value)) | |
} | |
// create a websocket token | |
token, _ := philote.NewToken( | |
"roman", | |
[]string{"read-write-channel"}, | |
[]string{"read-write-channel"}, | |
) | |
fmt.Printf("philote token created: %T\n", token) | |
// create a websocket client | |
c1, _ := philote.NewClient("ws://localhost:6380", token) | |
fmt.Printf("philote client 1 created: %T\n", c1) | |
c2, _ := philote.NewClient("ws://localhost:6380", token) | |
fmt.Printf("philote client 2 created: %T\n", c2) | |
// publish a websocket message | |
go func() { | |
time.Sleep(2*time.Second) | |
c1.Publish( | |
&philote.Message{ | |
Channel: "read-write-channel", | |
Data: "My message to the world", | |
}) | |
if err != nil { | |
panic(err) | |
} | |
fmt.Println("c1: philote message sent") | |
}() | |
// receive a websocket message | |
message, err := c2.Receive() | |
if err != nil { | |
panic(err) | |
} | |
fmt.Printf("c2: philote message received: %v\n", message.Data) | |
} |
Author
romanblanco
commented
Sep 30, 2019
export GOPATH="$PWD"/.dependencies:"$PWD"
export LOGLEVEL="debug"
go get github.com/pote/philote-go
go get github.com/Shopify/sarama
• ~/ SECRET=randomstring bin/philote
INFO[0000] Initializing Philotic Network cores=8 port=6380 version=0.3.1
INFO[0003] Message dropped due to insufficient write permissions channel=write-channel-1 data="My message to the world" philote=9da9bf6a-1aac-4438-a308-68dbd364d3eb
kafka consumer created: *sarama.consumer
kafka producer created: *sarama.syncProducer
philote token created: string
philote client 1 created: *philote.Client
philote client 2 created: *philote.Client
philote message sent
philote message received: My message to the world
Shell helper that starts zookeeper, kafka and philote in one tmux session:
# kafka attaches to the tmux session named "kafka". When no such session
# exists it first creates one with three windows (zookeeper, kafka, philote),
# selects the kafka window, and then attaches.
kafka () {
  if tmux has-session -t kafka &> /dev/null ; then
    tmux -2 attach-session -t kafka -d
  else
    # Each quoted command is kept on a single line: a newline inside the
    # quotes would split the script/config path passed to the shell.
    tmux -2 new-session -A -s kafka -n zookeeper -d 'cd ~/devel/kafka/kafka/ ; bin/zookeeper-server-start.sh config/zookeeper.properties'
    tmux new-window -t kafka:2 -n kafka 'cd ~/devel/kafka/kafka/ ; bin/kafka-server-start.sh config/server.properties'
    tmux new-window -t kafka:3 -n philote 'SECRET=roman LOGLEVEL=debug ~/bin/philote'
    tmux select-window -t kafka:2
    tmux -2 attach-session -t kafka
  fi
}
• ~/devel/go_kafka/ go run kafka_websocket.go
kafka consumer created: *sarama.consumer
kafka producer created: *sarama.syncProducer
kafka message produced: p: 0, o: 7
kafka message consumed: Something Cool
philote token created: string
philote client 1 created: *philote.Client
philote client 2 created: *philote.Client
c1: philote message sent
c2: philote message received: My message to the world
cd ~/devel
mkdir kafka
cd kafka
wget http://mirror.cc.columbia.edu/pub/software/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz
extract kafka_2.12-2.5.0.tgz
mv kafka_2.12-2.5.0/ kafka/
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment