Created
June 2, 2014 04:49
-
-
Save shouichi/eb540705c7e8402cbd4d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
kafka "github.com/Shopify/sarama" | |
zmq "github.com/pebbe/zmq4" | |
"os" | |
"os/signal" | |
"time" | |
) | |
// Read messages from kafka and publish them to workers. | |
func main() { | |
client, err := kafka.NewClient( | |
"my_client", | |
[]string{"localhost:9092"}, | |
nil, | |
) | |
if err != nil { | |
panic(err) | |
} | |
defer client.Close() | |
consumer, err := kafka.NewConsumer( | |
client, | |
"my_topic", | |
0, | |
"my_consumer_group", | |
kafka.NewConsumerConfig(), | |
) | |
if err != nil { | |
panic(err) | |
} | |
defer consumer.Close() | |
socket, err := zmq.NewSocket(zmq.PUB) | |
if err != nil { | |
panic(err) | |
} | |
defer socket.Close() | |
err = socket.Bind("tcp://*:3000") | |
if err != nil { | |
panic(err) | |
} | |
c := make(chan os.Signal, 1) | |
signal.Notify(c, os.Interrupt) | |
time.Sleep(1 * time.Second) | |
consumerLoop: | |
for { | |
select { | |
case event := <-consumer.Events(): | |
if event.Err != nil { | |
panic(event.Err) | |
} | |
_, err := socket.SendBytes(event.Value, 0) | |
if err != nil { | |
fmt.Println(err) | |
} | |
case <-c: | |
break consumerLoop | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
zmq "github.com/pebbe/zmq4" | |
"fmt" | |
) | |
// Read messages from zmq and do some work. | |
func main() { | |
subscriber, err := zmq.NewSocket(zmq.SUB) | |
if err != nil { | |
panic(err) | |
} | |
defer subscriber.Close() | |
err = subscriber.Connect("tcp://localhost:3000") | |
if err != nil { | |
panic(err) | |
} | |
subscriber.SetSubscribe("") | |
for { | |
msg, err := subscriber.Recv(0) | |
if err != nil { | |
panic(err) | |
} | |
fmt.Println(msg) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment