Created
August 25, 2012 01:36
-
-
Save araddon/3458566 to your computer and use it in GitHub Desktop.
Http to Kafka Go example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import ( | |
| "flag" | |
| kafka "github.com/araddon/kafka/clients/go/src" | |
| "io" | |
| "log" | |
| "net/http" | |
| "github.com/araddon/gou" | |
| ) | |
| var ( | |
| host *string = flag.String("host", "localhost:9092", "Kafka Host") | |
| httpToKafka = make(chan []byte, 2000) | |
| ) | |
| func main() { | |
| startKafkaProducer() | |
| startHttp() | |
| } | |
| func startKafkaProducer() { | |
| // make single channel finished | |
| finished := make(chan bool, 1) | |
| // required by kafka | |
| kafkaChan := make(chan *kafka.MessageTopic, 2000) | |
| // watch for finish | |
| gou.RegisterEventHandler("onexit", func() { | |
| finished <- true | |
| log.Println("shutting down kafka channel ") | |
| close(kafkaChan) | |
| }) | |
| //NewPartitionedProducer(hostname string, topic string, partitions []int) | |
| broker := kafka.NewPartitionedProducer(*host, "test", []int{1}) | |
| go broker.PublishOnChannel(kafkaChan, 2000, 200, finished) | |
| go func() { | |
| // block waiting for messages from http | |
| for line := range httpToKafka { | |
| // forward message to kafka | |
| log.Println("sending to kafka ", string(line)) | |
| kafkaChan <- kafka.NewMessageTopic("test", line) | |
| } | |
| }() | |
| } | |
| // a blocking http server | |
| func startHttp() { | |
| http.Handle("/", http.HandlerFunc(homeGet)) | |
| http.HandleFunc("/collect", http.HandlerFunc(collector)) | |
| log.Fatal(http.ListenAndServe(":8080", nil)) | |
| } | |
| func collector(w http.ResponseWriter, r *http.Request) { | |
| httpToKafka <- []byte(r.URL.RawQuery) | |
| w.WriteHeader(202) | |
| } | |
| func homeGet(w http.ResponseWriter, r *http.Request) { | |
| io.WriteString(w, `<html><body> | |
| <h2>Welcome to the Http Kafka Service Server</h2> | |
| <a href="/collect">Send a get query to collect and it will forward name/value pairs</a></br> | |
| </body></html>`) | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment