Last active
September 22, 2015 20:32
-
-
Save ORBAT/fd5786828f1d6b2ae1e0 to your computer and use it in GitHub Desktop.
HTTP -> Kafka server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"errors" | |
"fmt" | |
"io" | |
"io/ioutil" | |
"log" | |
"net/http" | |
"os" | |
"strconv" | |
"time" | |
"gopkg.in/Shopify/sarama.v1" | |
"gopkg.in/alecthomas/kingpin.v2" | |
"gopkg.in/bahlo/goat.v1" | |
) | |
var ( | |
okMsg = []byte("{\"ok\": true}") | |
logger *log.Logger | |
) | |
var ( | |
listenAddr = kingpin.Flag("listen", "Address to listen on").Short('l').Default(":8080").String() | |
brokerAddrs = kingpin.Flag("brokers", "Kafka broker hostname list").Short('b').Default("localhost:9092").OverrideDefaultFromEnvar("KAFKA_PEERS").Strings() | |
topicList = kingpin.Flag("topic", "Topic to create route for. Can be given multiple times").Short('t').Required().Strings() | |
verbose = kingpin.Flag("verbose", "Output debug information to stderr").Short('v').Bool() | |
) | |
func writeError(w http.ResponseWriter, code int, err error) error { | |
w.Header().Set("Content-Type", "application/json; charset=utf-8") | |
w.WriteHeader(code) | |
return goat.WriteJSON(w, map[string]interface{}{"ok": false, "error": err.Error()}) | |
} | |
func newSyncProducer(topic string, reqAcks sarama.RequiredAcks) goat.Handle { | |
config := sarama.NewConfig() | |
config.Producer.Partitioner = sarama.NewRandomPartitioner | |
config.Producer.RequiredAcks = reqAcks | |
sp, err := sarama.NewSyncProducer(*brokerAddrs, config) | |
if err != nil { | |
panic(err) | |
} | |
return func(w http.ResponseWriter, r *http.Request, p goat.Params) { | |
body, err := ioutil.ReadAll(r.Body) | |
if err != nil { | |
writeError(w, 400, errors.New("Error reading request body")) | |
return | |
} | |
msg := &sarama.ProducerMessage{ | |
Topic: topic, | |
Key: sarama.StringEncoder(strconv.Itoa(int(time.Now().Unix()))), | |
Value: sarama.ByteEncoder(body), | |
} | |
log.Printf("Sync send of %p with RequiredAcks %d", msg, reqAcks) | |
if p, o, err := sp.SendMessage(msg); err != nil { | |
writeError(w, 500, err) | |
} else { | |
log.Printf("Sent %p", msg) | |
goat.WriteJSON(w, map[string]interface{}{"ok": true, "partition": p, "offset": o}) | |
} | |
} | |
} | |
func newAsyncProducer(topic string) goat.Handle { | |
config := sarama.NewConfig() | |
config.Producer.Partitioner = sarama.NewRandomPartitioner | |
config.Producer.RequiredAcks = sarama.NoResponse | |
config.Producer.Return.Errors = false | |
ap, err := sarama.NewAsyncProducer(*brokerAddrs, config) | |
if err != nil { | |
panic(err) | |
} | |
return func(w http.ResponseWriter, r *http.Request, p goat.Params) { | |
body, err := ioutil.ReadAll(r.Body) | |
if err != nil { | |
writeError(w, 400, errors.New("Error reading request body")) | |
return | |
} | |
log.Printf("Async write of %d bytes to topic %s", len(body), topic) | |
ap.Input() <- &sarama.ProducerMessage{ | |
Topic: topic, | |
Key: sarama.StringEncoder(strconv.Itoa(int(time.Now().Unix()))), | |
Value: sarama.ByteEncoder(body), | |
} | |
w.Header().Set("Content-Type", "application/json; charset=utf-8") | |
w.Write(okMsg) | |
} | |
} | |
func main() { | |
kingpin.Parse() | |
var logOut io.Writer | |
if *verbose { | |
logOut = os.Stderr | |
} else { | |
logOut = ioutil.Discard | |
} | |
logger = log.New(logOut, "[http-kafka] ", log.LstdFlags) | |
r := goat.New() | |
v1 := r.Subrouter("/v1/") | |
replicaVals := []sarama.RequiredAcks{sarama.WaitForLocal, sarama.WaitForAll, sarama.NoResponse} | |
for _, topic := range *topicList { | |
tr := v1.Subrouter(topic) | |
log.Printf("Creating routes for topic %s", topic) | |
for _, raq := range replicaVals { | |
tr.Post(fmt.Sprintf("/send/sync/%d", raq), "send_sync", newSyncProducer(topic, raq)) | |
} | |
tr.Post("/send/async", "send_async", newAsyncProducer(topic)) | |
} | |
if err := r.Run(*listenAddr); err != nil { | |
panic(err) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
go run http-kafka.go --topic topic1 --topic topic2 -v &
curl -X POST http://localhost:8080/v1/topic1/send/sync/1 -d "DOHOI"