Skip to content

Instantly share code, notes, and snippets.

@inlinestyle
Last active August 29, 2015 14:06
Show Gist options
  • Select an option

  • Save inlinestyle/76c698151c02de895eff to your computer and use it in GitHub Desktop.

Select an option

Save inlinestyle/76c698151c02de895eff to your computer and use it in GitHub Desktop.
HTTP/JSON API that consumes stream/event POSTs and publishes them to a local Kafka cluster
package main
import (
"encoding/json"
"fmt"
"github.com/codegangsta/martini-contrib/binding"
"github.com/go-martini/martini"
"github.com/shopify/sarama"
"net/http"
)
// Event is the JSON payload bound on the event route and republished by
// eventHandler. Only the identifier and the type discriminator are modelled.
type Event struct {
	Id   string `json:"_id"`   // encoded under the "_id" key
	Type string `json:"_type"` // encoded under the "_type" key
}
// EventRef is a reference entry carried in Stream.Events. It is encoded with
// "$id" and "$ref" keys rather than embedding a full Event.
type EventRef struct {
	Id   string `json:"$id"`  // encoded under the "$id" key
	Type string `json:"$ref"` // encoded under the "$ref" key
}
// Stream is the JSON payload bound on the stream route and republished by
// streamHandler. It carries its own identifier and type, the references to
// its events, and a device identifier.
type Stream struct {
	Id       string     `json:"_id"`       // encoded under the "_id" key
	Type     string     `json:"_type"`     // encoded under the "_type" key
	Events   []EventRef `json:"events"`    // event references, encoded under "events"
	DeviceId string     `json:"device_id"` // encoded under the "device_id" key
}
// publish encodes data as JSON and queues it on topic through producer, using
// id as the message key.
//
// If encoding fails, the json.Marshal error is returned and nothing is
// queued; otherwise the result of producer.QueueMessage is returned as-is.
func publish(producer *sarama.Producer, topic string, id string, data interface{}) error {
	payload, marshalErr := json.Marshal(data)
	if marshalErr != nil {
		return marshalErr
	}

	key := sarama.StringEncoder(id)
	value := sarama.ByteEncoder(payload)
	return producer.QueueMessage(topic, key, value)
}
// eventHandler publishes the bound Event to the "event" topic, keyed by the
// "id" route parameter.
//
// A publish failure is escalated with panic instead of being written to the
// response here.
// NOTE(review): presumably martini's recovery middleware turns the panic into
// an error response — confirm.
func eventHandler(event Event, params martini.Params, producer *sarama.Producer) {
	id := params["id"]
	if err := publish(producer, "event", id, event); err != nil {
		panic(err)
	}
}
// streamHandler publishes the bound Stream to the "stream" topic, keyed by
// the "id" route parameter.
//
// A publish failure is escalated with panic instead of being written to the
// response here.
// NOTE(review): presumably martini's recovery middleware turns the panic into
// an error response — confirm.
func streamHandler(stream Stream, params martini.Params, producer *sarama.Producer) {
	id := params["id"]
	if err := publish(producer, "stream", id, stream); err != nil {
		panic(err)
	}
}
// main connects to the Kafka broker at localhost:9092, creates a producer,
// and serves the stream and event POST routes on port 8011.
//
// Every startup failure, including a failure to start or keep running the
// HTTP server, ends the program with a panic.
func main() {
	client, err := sarama.NewClient("test_id", []string{"localhost:9092"}, sarama.NewClientConfig())
	if err != nil {
		panic(err)
	}
	defer client.Close()

	producer, err := sarama.NewProducer(client, nil)
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	// Print whatever arrives on the producer's error channel in the
	// background; the loop ends once that channel is closed.
	go func() {
		for err := range producer.Errors() {
			if err != nil {
				fmt.Println(err)
			}
		}
	}()

	m := martini.Classic()
	// Register the producer so the handlers can receive it as an argument.
	m.Map(producer)
	// The id segment is matched by [A-Fa-f0-9]{24}: 24 hexadecimal characters.
	m.Post("/stream/(?P<id>[A-Fa-f0-9]{24})", binding.Json(Stream{}), streamHandler)
	m.Post("/event/(?P<id>[A-Fa-f0-9]{24})", binding.Json(Event{}), eventHandler)

	// ListenAndServe only returns when serving failed (for example, the port
	// is already in use). Treat that like the other startup errors instead of
	// discarding it, so the process never exits silently with status 0.
	if err := http.ListenAndServe(":8011", m); err != nil {
		panic(err)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment