Skip to content

Instantly share code, notes, and snippets.

@rafrombrc
Last active August 29, 2015 14:04
Show Gist options
  • Save rafrombrc/bfdac4c319c68fae21a8 to your computer and use it in GitHub Desktop.
Save rafrombrc/bfdac4c319c68fae21a8 to your computer and use it in GitHub Desktop.
/***** BEGIN LICENSE BLOCK *****
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# The Initial Developer of the Original Code is the Mozilla Foundation.
# Portions created by the Initial Developer are Copyright (C) 2014
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
# Christian Vozar ([email protected])
#
# ***** END LICENSE BLOCK *****/
package http
import (
"code.google.com/p/go-uuid/uuid"
"fmt"
"github.com/mozilla-services/heka/message"
. "github.com/mozilla-services/heka/pipeline"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"time"
)
// HttpListenInput is an input plugin that accepts HTTP requests on a TCP
// listener and turns each request into a pipeline pack (see
// handler.serveHTTP).
type HttpListenInput struct {
// conf is the plugin configuration stored by Init.
conf *HttpListenInputConfig
// listener is the TCP listener opened by Run and closed by Stop.
listener net.Listener
// stopChan is created by Init and closed by Stop.
stopChan chan bool
// ir is the input runner handed to Run; it supplies packs and logging.
ir InputRunner
// dRunner is the decoder runner looked up in Run. It stays nil when no
// decoder name is configured.
dRunner DecoderRunner
// pConfig is the pipeline configuration obtained from the plugin helper
// in Run.
pConfig *PipelineConfig
// decoderName is the decoder name copied from conf.Decoder by Init.
decoderName string
}
// HttpListenInputConfig is the configuration struct for HttpListenInput.
type HttpListenInputConfig struct {
// TCP address to listen on for incoming HTTP requests.
// Defaults to "127.0.0.1:8325" (see ConfigStruct).
Address string
// Name of configured decoder instance used to decode the messages.
// When empty, packs are injected without decoding, carrying the
// URL-unescaped request body as the payload.
Decoder string
}
// ConfigStruct returns a freshly allocated HttpListenInputConfig carrying the
// plugin defaults: the loopback address on port 8325 and no decoder.
func (hli *HttpListenInput) ConfigStruct() interface{} {
	defaults := new(HttpListenInputConfig)
	defaults.Address = "127.0.0.1:8325"
	return defaults
}
// handler is the http.Handler that Run passes to http.Serve on behalf of an
// HttpListenInput.
type handler struct {
// active gates request processing: ServeHTTP does nothing unless it is
// true. Run sets it to true before serving and to false once http.Serve
// has returned.
// NOTE(review): it is read in ServeHTTP and written in Run with no visible
// synchronization — confirm whether that is acceptable.
active bool
// hli is the input whose runner and decoder the handler feeds.
hli *HttpListenInput
}
// ServeHTTP implements http.Handler. Requests are processed only while the
// handler is marked active; otherwise the call returns without touching the
// response writer.
func (h *handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	if !h.active {
		return
	}
	h.serveHTTP(w, req)
}
// serveHTTP converts one HTTP request into a pipeline pack and either sends
// it to the configured decoder runner or, when no decoder is configured,
// injects it straight into the pipeline through the input runner.
//
// The pack's payload is the URL-unescaped request body. The protocol, user
// agent and content type are added as message fields, followed by one field
// per URL query parameter value.
//
// If the request body cannot be read, the error is logged through the input
// runner, the client receives a 400 response and no pack is produced.
func (h *handler) serveHTTP(w http.ResponseWriter, req *http.Request) {
	hli := h.hli
	defer req.Body.Close()

	body, err := ioutil.ReadAll(req.Body)
	if err != nil {
		hli.ir.LogError(fmt.Errorf("[HttpListenInput] Read HTTP request body fail: %s",
			err.Error()))
		http.Error(w, "unable to read request body", http.StatusBadRequest)
		return
	}

	// QueryUnescape returns an empty string on malformed input (e.g. a lone
	// '%' in a JSON body); fall back to the raw body rather than silently
	// dropping the payload.
	payload, err := url.QueryUnescape(string(body))
	if err != nil {
		payload = string(body)
	}

	// Blocks until the input runner has a free pack available.
	pack := <-hli.ir.InChan()
	pack.Message.SetUuid(uuid.NewRandom())
	pack.Message.SetTimestamp(time.Now().UnixNano())
	pack.Message.SetType("heka.httpdata.request")
	pack.Message.SetLogger(hli.ir.Name())
	pack.Message.SetHostname(req.RemoteAddr)
	pack.Message.SetPid(int32(os.Getpid()))
	pack.Message.SetSeverity(int32(6))
	pack.Message.SetPayload(payload)

	h.addField(pack, "Protocol", req.Proto)
	h.addField(pack, "UserAgent", req.UserAgent())
	h.addField(pack, "ContentType", req.Header.Get("Content-Type"))
	for key, values := range req.URL.Query() {
		for _, value := range values {
			h.addField(pack, key, value)
		}
	}

	if hli.dRunner == nil {
		hli.ir.Inject(pack)
		return
	}
	hli.dRunner.InChan() <- pack
}

// addField attaches a string-valued field to the pack's message, logging
// through the input runner when the field cannot be created.
func (h *handler) addField(pack *PipelinePack, name, value string) {
	field, err := message.NewField(name, value, "")
	if err != nil {
		h.hli.ir.LogError(fmt.Errorf("can't add field: %s", err))
		return
	}
	pack.Message.AddField(field)
}
// Init prepares the input for Run: it allocates the stop channel and stores
// the supplied configuration together with its decoder name. config must be a
// *HttpListenInputConfig; any other type makes the type assertion panic.
func (hli *HttpListenInput) Init(config interface{}) (err error) {
	hli.stopChan = make(chan bool, 1)

	conf := config.(*HttpListenInputConfig)
	hli.conf, hli.decoderName = conf, conf.Decoder

	return nil
}
// Run starts the input: it records the runner and pipeline configuration,
// resolves the configured decoder (if any), opens a TCP listener on the
// configured address and serves HTTP requests on it until http.Serve returns.
//
// It returns an error when the named decoder cannot be found, when the
// listener cannot be opened, or when http.Serve returns an error.
func (hli *HttpListenInput) Run(ir InputRunner, h PluginHelper) (err error) {
	hli.ir = ir
	hli.pConfig = h.PipelineConfig()

	if hli.decoderName != "" {
		var ok bool
		// The decoder runner is registered under "<input name>-<decoder name>".
		fullName := fmt.Sprintf("%s-%s", ir.Name(), hli.decoderName)
		if hli.dRunner, ok = h.DecoderRunner(hli.decoderName, fullName); !ok {
			return fmt.Errorf("Decoder not found: %s", hli.decoderName)
		}
	}

	hli.listener, err = net.Listen("tcp", hli.conf.Address)
	if err != nil {
		return fmt.Errorf("[HttpListenInput] Listener [%s] start fail: %s\n",
			hli.conf.Address, err.Error())
	}
	hli.ir.LogMessage(fmt.Sprintf("[HttpListenInput (%s)] Listening.",
		hli.conf.Address))

	hdlr := &handler{
		active: true,
		hli:    hli,
	}
	err = http.Serve(hli.listener, hdlr)
	// Stop any request still reaching the handler from being processed.
	hdlr.active = false
	if err != nil {
		// NOTE(review): net/http documents Serve as always returning a
		// non-nil error, so a Stop-initiated shutdown also takes this branch
		// and the stopChan wait below looks unreachable — confirm the
		// intended shutdown contract before changing it; the spec in this
		// package reads the returned error after calling Stop.
		return fmt.Errorf("[HttpListenInput] Serve fail: %s\n", err.Error())
	}
	<-hli.stopChan
	return nil
}
// Stop shuts the input down by closing the listener opened in Run and then
// closing stopChan.
func (hli *HttpListenInput) Stop() {
	// Run may have returned before the listener was opened (for example when
	// the decoder lookup fails); calling Close on a nil net.Listener panics.
	if hli.listener != nil {
		// The Close error is deliberately ignored: the input is going away
		// and there is nothing further to do with the listener.
		hli.listener.Close()
	}
	close(hli.stopChan)
}
func init() {
RegisterPlugin("HttpListenInput", func() interface{} {
return new(HttpListenInput)
})
}
/***** BEGIN LICENSE BLOCK *****
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# The Initial Developer of the Original Code is the Mozilla Foundation.
# Portions created by the Initial Developer are Copyright (C) 2014
# the Initial Developer. All Rights Reserved.
#
# ***** END LICENSE BLOCK *****/
package http
import (
"fmt"
. "github.com/mozilla-services/heka/pipeline"
pipeline_ts "github.com/mozilla-services/heka/pipeline/testsupport"
"github.com/mozilla-services/heka/pipelinemock"
plugins_ts "github.com/mozilla-services/heka/plugins/testsupport"
"github.com/rafrombrc/gomock/gomock"
gs "github.com/rafrombrc/gospec/src/gospec"
"net/http"
)
// i counts how many times HttpListenInputSpec has been entered; it is only
// incremented and printed there, as debugging output.
var i = 0
// HttpListenInputSpec is a gospec specification for HttpListenInput. It wires
// the input to gomock-generated plugin helper, input runner and decoder runner
// mocks, configures it for 127.0.0.1:8325 with the "PayloadJsonDecoder"
// decoder, and checks that a URL query parameter of an incoming request shows
// up as a message field on the pack delivered to the decoder runner.
//
// The many fmt.Println calls are progress tracing for debugging.
func HttpListenInputSpec(c gs.Context) {
t := &pipeline_ts.SimpleT{}
// Debug trace of how often this spec function is entered.
i++
fmt.Println("i: ", i)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
c.Specify("A HttpListenInput", func() {
pConfig := NewPipelineConfig(nil)
fmt.Println("creating input")
httpListenInput := HttpListenInput{}
ith := new(plugins_ts.InputTestHelper)
ith.MockHelper = pipelinemock.NewMockPluginHelper(ctrl)
ith.MockInputRunner = pipelinemock.NewMockInputRunner(ctrl)
// errChan carries Run's return value back from the goroutine below.
errChan := make(chan error)
// startInput runs the input in the background so the spec can issue
// requests against it.
startInput := func() {
go func() {
errChan <- httpListenInput.Run(ith.MockInputRunner, ith.MockHelper)
}()
}
// A single pack is placed on the supply channel; the handler takes it via
// the mocked InChan() below.
ith.PackSupply = make(chan *PipelinePack, 1)
ith.Pack = NewPipelinePack(ith.PackSupply)
ith.PackSupply <- ith.Pack
//defer ith.Pack.Recycle()
config := httpListenInput.ConfigStruct().(*HttpListenInputConfig)
config.Address = "127.0.0.1:8325"
config.Decoder = "PayloadJsonDecoder"
// Mock expectations for the calls Run and the request handler make.
ith.MockHelper.EXPECT().PipelineConfig().Return(pConfig)
ith.MockInputRunner.EXPECT().InChan().Return(ith.PackSupply)
// Name() is called once in Run (decoder full name) and once in the handler
// (message logger).
ith.MockInputRunner.EXPECT().Name().Return("HttpListenInput").Times(2)
ith.MockInputRunner.EXPECT().LogMessage(gomock.Any())
// Buffered so the handler's send to the decoder does not block before the
// spec receives from it.
dRunnerInChan := make(chan *PipelinePack, 1)
mockDecoderRunner := pipelinemock.NewMockDecoderRunner(ctrl)
mockDecoderRunner.EXPECT().InChan().Return(dRunnerInChan)
ith.MockHelper.EXPECT().DecoderRunner("PayloadJsonDecoder",
"HttpListenInput-PayloadJsonDecoder").Return(mockDecoderRunner, true)
err := httpListenInput.Init(config)
c.Assume(err, gs.IsNil)
c.Specify("Adds query parameters to the message pack as fields", func() {
fmt.Println("test1")
startInput()
// NOTE(review): nothing here waits for the listener to be open before the
// request is sent — confirm this cannot race with Run's net.Listen.
resp, err := http.Get("http://127.0.0.1:8325/?test=Hello%20World")
fmt.Println("after resp 1")
c.Assume(err, gs.IsNil)
resp.Body.Close()
c.Assume(resp.StatusCode, gs.Equals, 200)
fmt.Println("waiting for pack on drunner 1")
pack := <-dRunnerInChan
fmt.Println("rec'd pack on drunner 1")
fieldValue, ok := pack.Message.GetFieldValue("test")
c.Assume(ok, gs.IsTrue)
c.Expect(fieldValue, gs.Equals, "Hello World")
fmt.Println("before stop 1")
httpListenInput.Stop()
fmt.Println("after stop 1")
// Run's result is read back after Stop.
// NOTE(review): err.Error() panics if Run ever returns nil here.
err = <-errChan
fmt.Println("err: ", err.Error())
})
// c.Specify("Adds custom http headers to the request", func() {
// fmt.Println("test2")
// startInput()
// // httpListenInput.conf.Headers = http.Header{
// // "One": []string{"two", "three"},
// // "Four": []string{"five", "six", "seven"},
// // }
// // startInput()
// resp, err := http.Get("http://127.0.0.1:8325/?test=Hello%20World")
// fmt.Println("after resp 2")
// c.Assume(err, gs.IsNil)
// resp.Body.Close()
// c.Assume(resp.StatusCode, gs.Equals, 200)
// fmt.Println("waiting for pack on drunner 2")
// pack := <-dRunnerInChan
// fmt.Println("rec'd pack on drunner 2")
// fieldValue, ok := pack.Message.GetFieldValue("test")
// c.Assume(ok, gs.IsTrue)
// c.Expect(fieldValue, gs.Equals, "Hello World")
// // fmt.Println("header:", resp.Header)
// fmt.Println("before stop 2")
// httpListenInput.Stop()
// fmt.Println("after stop 2")
// err = <-errChan
// fmt.Println("err: ", err.Error())
// })
// NOTE(review): this spec never calls startInput, so nothing started by this
// function is listening when the GET below is issued, and the mock
// expectations set above are not exercised — confirm whether it is a
// placeholder. Also note that %30 is the escape for "0", not a space (%20).
c.Specify("test", func() {
resp, err := http.Get("http://127.0.0.1:8325/?test=Hello%30World")
c.Assume(err, gs.IsNil)
fmt.Println("after resp 2")
resp.Body.Close()
c.Assume(resp.StatusCode, gs.Equals, 200)
})
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment