Skip to content

Instantly share code, notes, and snippets.

@araddon
Created August 25, 2012 01:36
Show Gist options
  • Select an option

  • Save araddon/3458566 to your computer and use it in GitHub Desktop.

Select an option

Save araddon/3458566 to your computer and use it in GitHub Desktop.
Http to Kafka Go example
package main
import (
"flag"
kafka "github.com/araddon/kafka/clients/go/src"
"io"
"log"
"net/http"
"github.com/araddon/gou"
)
var (
	// host is the address of the Kafka broker, set with the -host flag.
	host = flag.String("host", "localhost:9092", "Kafka Host")

	// httpToKafka carries raw payloads from the HTTP handlers to the
	// goroutine that forwards them to Kafka; it buffers up to 2000 entries.
	httpToKafka = make(chan []byte, 2000)
)
// main parses the command-line flags, starts the background Kafka producer,
// and then runs the HTTP server, which blocks for the life of the process.
func main() {
	// Without Parse the -host flag is never read from the command line and
	// *host always keeps its default value.
	flag.Parse()
	startKafkaProducer()
	startHttp()
}
// startKafkaProducer creates a partitioned producer for the "test" topic on
// *host, starts it publishing from an internal channel, and launches a
// goroutine that forwards every payload received on httpToKafka into that
// channel. It returns immediately; the work continues in the background.
func startKafkaProducer() {
	// Channel of messages handed to the producer's PublishOnChannel.
	outbound := make(chan *kafka.MessageTopic, 2000)
	// One-slot signal channel, also handed to PublishOnChannel.
	done := make(chan bool, 1)

	// On the "onexit" event: signal done, log, then close the outbound channel.
	// NOTE(review): the forwarding goroutine below may still be sending on
	// outbound when it is closed here, which would panic — confirm the
	// shutdown ordering is safe.
	onExit := func() {
		done <- true
		log.Println("shutting down kafka channel ")
		close(outbound)
	}
	gou.RegisterEventHandler("onexit", onExit)

	// NewPartitionedProducer(hostname string, topic string, partitions []int)
	producer := kafka.NewPartitionedProducer(*host, "test", []int{1})
	// NOTE(review): 2000 and 200 are presumably batching/timing parameters —
	// confirm against the kafka client's PublishOnChannel signature.
	go producer.PublishOnChannel(outbound, 2000, 200, done)

	// Relay loop: blocks on httpToKafka and wraps each payload as a message
	// for the "test" topic.
	forward := func() {
		for payload := range httpToKafka {
			log.Println("sending to kafka ", string(payload))
			outbound <- kafka.NewMessageTopic("test", payload)
		}
	}
	go forward()
}
// startHttp registers homeGet on "/" and collector on "/collect" with the
// default serve mux, then serves on :8080. It blocks while serving; when
// ListenAndServe returns, the error is passed to log.Fatal.
func startHttp() {
	http.HandleFunc("/", homeGet)
	http.HandleFunc("/collect", collector)

	err := http.ListenAndServe(":8080", nil)
	log.Fatal(err)
}
// collector queues the request's raw query string on httpToKafka and replies
// with 202 Accepted. The send blocks while the channel's buffer is full.
func collector(w http.ResponseWriter, r *http.Request) {
	payload := []byte(r.URL.RawQuery)
	httpToKafka <- payload
	w.WriteHeader(http.StatusAccepted)
}
// homeGet writes a small static HTML landing page that links to /collect.
// The request is not inspected. A failed write is logged rather than
// silently dropped.
func homeGet(w http.ResponseWriter, r *http.Request) {
	// The original markup ended the link line with "</br>", which is not a
	// valid tag; "<br>" is the intended line break.
	const page = `<html><body>
<h2>Welcome to the Http Kafka Service Server</h2>
<a href="/collect">Send a get query to collect and it will forward name/value pairs</a><br>
</body></html>`

	// Declare the content type explicitly instead of relying on sniffing.
	w.Header().Set("Content-Type", "text/html; charset=utf-8")
	if _, err := io.WriteString(w, page); err != nil {
		log.Println("writing home page:", err)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment