Last active
August 29, 2015 14:06
-
-
Save inlinestyle/76c698151c02de895eff to your computer and use it in GitHub Desktop.
HTTP/JSON API that consumes stream/event POSTs and publishes them to a local Kafka cluser
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import ( | |
| "encoding/json" | |
| "fmt" | |
| "github.com/codegangsta/martini-contrib/binding" | |
| "github.com/go-martini/martini" | |
| "github.com/shopify/sarama" | |
| "net/http" | |
| ) | |
| type Event struct { | |
| Id string `json:"_id"` | |
| Type string `json:"_type"` | |
| } | |
| type EventRef struct { | |
| Id string `json:"$id"` | |
| Type string `json:"$ref"` | |
| } | |
| type Stream struct { | |
| Id string `json:"_id"` | |
| Type string `json:"_type"` | |
| Events []EventRef `json:"events"` | |
| DeviceId string `json:"device_id"` | |
| } | |
| func publish(producer *sarama.Producer, topic string, id string, data interface{}) error { | |
| toPublish, err := json.Marshal(data) | |
| if err != nil { | |
| return err | |
| } | |
| return producer.QueueMessage(topic, sarama.StringEncoder(id), sarama.ByteEncoder(toPublish)) | |
| } | |
| func eventHandler(event Event, params martini.Params, producer *sarama.Producer) { | |
| err := publish(producer, "event", params["id"], event) | |
| if err != nil { | |
| panic(err) | |
| } | |
| } | |
| func streamHandler(stream Stream, params martini.Params, producer *sarama.Producer) { | |
| err := publish(producer, "stream", params["id"], stream) | |
| if err != nil { | |
| panic(err) | |
| } | |
| } | |
| func main() { | |
| client, err := sarama.NewClient("test_id", []string{"localhost:9092"}, sarama.NewClientConfig()) | |
| if err != nil { | |
| panic(err) | |
| } | |
| defer client.Close() | |
| producer, err := sarama.NewProducer(client, nil) | |
| if err != nil { | |
| panic(err) | |
| } | |
| defer producer.Close() | |
| go func() { | |
| for err := range producer.Errors() { | |
| if err != nil { | |
| fmt.Println(err) | |
| } | |
| } | |
| }() | |
| m := martini.Classic() | |
| m.Map(producer) | |
| m.Post("/stream/(?P<id>[A-Fa-f0-9]{24})", binding.Json(Stream{}), streamHandler) | |
| m.Post("/event/(?P<id>[A-Fa-f0-9]{24})", binding.Json(Event{}), eventHandler) | |
| http.ListenAndServe(":8011", m) | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment