Last active
August 29, 2015 14:03
-
-
Save chancez/960fdae4bc34ecfff96e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
panic: runtime error: send on closed channel | |
goroutine 26 [running]: | |
runtime.panic(0x854940, 0x1017efe) | |
/usr/local/go/src/pkg/runtime/panic.c:266 +0xb6 | |
github.com/thoj/go-ircevent.(*Connection).Privmsg(0xc210050640, 0x0, 0x0, 0x0, 0x0) | |
/heka/build/heka/src/github.com/thoj/go-ircevent/irc.go:226 +0x13f | |
github.com/mozilla-services/heka/plugins/irc.(*IRCOutput).Privmsg(0xc210090320, 0x0, 0x0, 0x0, 0x0, ...) | |
/heka/build/heka/src/github.com/mozilla-services/heka/plugins/irc/irc_output.go:102 +0xab | |
github.com/mozilla-services/heka/plugins/irc.SendFromOutQueue(0xc210090320, 0x7f1195bf5060, 0xc21000fbe0, 0x0, 0x0, ...) | |
/heka/build/heka/src/github.com/mozilla-services/heka/plugins/irc/irc_output.go:144 +0x49 | |
github.com/mozilla-services/heka/plugins/irc.ProcessOutQueue(0xc210090320, 0x7f1195bf5060, 0xc21000fbe0) | |
/heka/build/heka/src/github.com/mozilla-services/heka/plugins/irc/irc_output.go:205 +0xae | |
created by github.com/mozilla-services/heka/plugins/irc.(*IRCOutput).Run | |
/heka/build/heka/src/github.com/mozilla-services/heka/plugins/irc/irc_output.go:312 +0x2e6 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package irc | |
import ( | |
"errors" | |
"fmt" | |
"sync/atomic" | |
"time" | |
"github.com/mozilla-services/heka/pipeline" | |
"github.com/thoj/go-ircevent" | |
) | |
// Privmsg wraps the irc.Privmsg by accepting an ircMsg struct, and checking if | |
// we've joined a channel before trying to send a message to it. Returns whether | |
// or not the message was successfully sent. | |
func (output *IrcOutput) Privmsg(ircMsg *IrcMsg) bool { | |
idx := ircMsg.Idx | |
if atomic.LoadInt32(&output.JoinedChannels[idx]) == JOINED { | |
output.Conn.Privmsg(ircMsg.IrcChannel, string(ircMsg.Output)) | |
} else { | |
return false | |
} | |
if output.JoinAndPart { | |
// Leave the channel if we're configured to part after sending messages. | |
output.Conn.Part(ircMsg.IrcChannel) | |
} | |
return true | |
} | |
// updateJoinList atomically updates our global slice of joined channels for a | |
// particular irc channel. It sets the irc channel's joined status to 'status'. | |
// Returns whether or not it found the irc channel in our slice. | |
func updateJoinList(output *IrcOutput, ircChan string, status int32) bool { | |
for i, channel := range output.Channels { | |
if ircChan == channel { | |
// Update if we have or haven't joined the channel | |
atomic.StoreInt32(&output.JoinedChannels[i], status) | |
return true | |
} | |
} | |
return false | |
} | |
// updateJoinListAll sets the status of all irc channels in our config to | |
// 'status' | |
func updateJoinListAll(output *IrcOutput, status int32) { | |
for channel := range output.Channels { | |
atomic.StoreInt32(&output.JoinedChannels[channel], status) | |
} | |
} | |
// sendFromOutQueue attempts to send a message to the irc channel specified in | |
// the ircMsg struct. If sending fails due to not being in the irc channel, it | |
// will put the message into that irc channel's backlog queue. If the queue is | |
// full it will drop the message and log an error. | |
// It returns whether or not a message was successfully delivered to an | |
// irc channel. | |
func sendFromOutQueue(output *IrcOutput, runner pipeline.OutputRunner, | |
ircMsg *IrcMsg) bool { | |
if output.Privmsg(ircMsg) { | |
return true | |
} else { | |
// We haven't joined this channel yet, so we need to send | |
// the message to the backlog queue of messages | |
// Get the proper Channel for the backlog | |
idx := ircMsg.Idx | |
backlogQueue := output.BacklogQueues[idx] | |
select { | |
// try to put the message into the backlog queue | |
case backlogQueue <- *ircMsg: | |
// Just putting | |
default: | |
// Failed to put, which means the backlog for this Irc | |
// channel is full. So drop it and log a message. | |
runner.LogError(fmt.Errorf("backlog queue for "+ | |
"Irc Channel %s, full. Dropping message.", | |
ircMsg.IrcChannel)) | |
} | |
return false | |
} | |
return false | |
} | |
// sendFromBacklogQueue attempts to send a message from the first backlog queue | |
// which has a message in it. It returns whether or not a message was | |
// successfully delivered to an irc channel. | |
func sendFromBacklogQueue(output *IrcOutput, runner pipeline.OutputRunner) bool { | |
var ircMsg IrcMsg | |
// No messages in the out queue, so lets try the backlog queue | |
for i, queue := range output.BacklogQueues { | |
if atomic.LoadInt32(&output.JoinedChannels[i]) != JOINED { | |
continue | |
} | |
select { | |
case ircMsg = <-queue: | |
if output.Privmsg(&ircMsg) { | |
return true | |
} | |
default: | |
// No backed up messages for this irc channel | |
} | |
} | |
return false | |
} | |
// processOutQueue attempts to send an Irc message from the OutQueue, or the | |
// BacklogQueue if nothing is in the OutQueue. It is throttled by a ticker to | |
// prevent flooding the Irc server. | |
func processOutQueue(output *IrcOutput, runner pipeline.OutputRunner) { | |
var delivered bool | |
var ircMsg IrcMsg | |
ok := true | |
ticker := runner.Ticker() | |
for ok { | |
delivered = false | |
<-ticker | |
select { | |
case ircMsg, ok = <-output.OutQueue: | |
if !ok { | |
// We havent actually delivered but we want to escape that | |
// loop. | |
delivered = true | |
// Time to cleanup, and close our chans | |
break | |
} | |
delivered = sendFromOutQueue(output, runner, &ircMsg) | |
default: | |
// Just here to prevent blocking | |
} | |
if !delivered { | |
sendFromBacklogQueue(output, runner) | |
} | |
} | |
// Cleanup heka | |
for _, queue := range output.BacklogQueues { | |
close(queue) | |
} | |
// Try to send the rest of our msgs in the backlog before quitting. | |
for _, queue := range output.BacklogQueues { | |
for msg := range queue { | |
output.Privmsg(&msg) | |
} | |
} | |
// Once we have no messages left, we can quit | |
output.Conn.Quit() | |
output.Conn.Disconnect() | |
} | |
// registerCallbacks sets up all the event handler callbacks for recieving | |
// particular irc events. | |
func registerCallbacks(output *IrcOutput, runner pipeline.OutputRunner) { | |
// add a callback to check if we've gotten successfully connected | |
output.Conn.AddCallback(CONNECTED, func(event *irc.Event) { | |
for _, ircChan := range output.Channels { | |
// Only join on connect if we aren't going to join whenever we send | |
// a message | |
if !output.JoinAndPart { | |
output.Conn.Join(ircChan) | |
} | |
} | |
}) | |
// Once we've recieved the names list, we've successfully joined the channel | |
// And should begin processing Heka messages | |
output.Conn.AddCallback(IRC_RPL_ENDOFNAMES, func(event *irc.Event) { | |
// This is the actual irc channel name (ie: #heka) | |
ircChan := event.Arguments[1] | |
updateJoinList(output, ircChan, JOINED) | |
}) | |
// We want to handle errors (disconnects) ourself. | |
output.Conn.ClearCallback(ERROR) | |
output.Conn.AddCallback(ERROR, func(event *irc.Event) { | |
updateJoinListAll(output, NOTJOINED) | |
runner.LogMessage("Disconnected from Irc. Retrying to connect in 3 seconds..") | |
time.Sleep(3 * time.Second) | |
err := output.Conn.Reconnect() | |
if err != nil { | |
runner.LogError(fmt.Errorf("Error reconnecting:", err)) | |
output.Conn.Quit() | |
} | |
runner.LogMessage("Reconnected to Irc!") | |
}) | |
output.Conn.AddCallback(KICK, func(event *irc.Event) { | |
ircChan := event.Arguments[0] | |
updateJoinList(output, ircChan, NOTJOINED) | |
if output.RejoinOnKick { | |
output.Conn.Join(ircChan) | |
} | |
}) | |
// These next 2 events shouldn't really matter much, but we should update | |
// the JoinList anyways. | |
output.Conn.AddCallback(QUIT, func(event *irc.Event) { | |
updateJoinListAll(output, NOTJOINED) | |
}) | |
output.Conn.AddCallback(PART, func(event *irc.Event) { | |
ircChan := event.Arguments[1] | |
updateJoinList(output, ircChan, NOTJOINED) | |
}) | |
} | |
func (output *IrcOutput) Init(config interface{}) error { | |
conf := config.(*IrcOutputConfig) | |
output.IrcOutputConfig = conf | |
conn, err := output.InitIrcCon(conf) | |
if err != nil { | |
return fmt.Errorf("Error setting up Irc Connection: %s", err) | |
} | |
output.Conn = conn | |
// Create our chans for passing messages from the main runner InChan to | |
// the irc channels | |
numChannels := len(output.Channels) | |
output.JoinedChannels = make([]int32, numChannels) | |
output.OutQueue = make(IrcMsgQueue, output.QueueSize) | |
output.BacklogQueues = make([]IrcMsgQueue, numChannels) | |
for queue := range output.BacklogQueues { | |
output.BacklogQueues[queue] = make(IrcMsgQueue, output.QueueSize) | |
} | |
return nil | |
} | |
func (output *IrcOutput) Run(runner pipeline.OutputRunner, | |
helper pipeline.PluginHelper) error { | |
if runner.Encoder() == nil { | |
return errors.New("Encoder required.") | |
} | |
// Register callbacks to handle events | |
registerCallbacks(output, runner) | |
var err error | |
// Connect to the Irc Server | |
err = output.Conn.Connect(output.Server) | |
if err != nil { | |
return fmt.Errorf("Unable to connect to irc server %s: %s", | |
output.Server, err) | |
} | |
// Start a goroutine for recieving messages, and throttling before sending | |
// to the Irc Server | |
go processOutQueue(output, runner) | |
var outgoing []byte | |
for pack := range runner.InChan() { | |
outgoing, err = runner.Encode(pack) | |
if err != nil { | |
runner.LogError(err) | |
} | |
// Send the message to each irc channel. | |
// If the out queue is full, then we need to drop the message and log | |
// an error. | |
for i, ircChannel := range output.Channels { | |
ircMsg := IrcMsg{outgoing, ircChannel, i} | |
select { | |
case output.OutQueue <- ircMsg: | |
if output.JoinAndPart { | |
// We wont have joined on connect in this case. | |
output.Conn.Join(ircChannel) | |
} | |
default: | |
runner.LogError(errors.New("Dropped message. " + | |
"irc_output OutQueue is full.")) | |
} | |
} | |
pack.Recycle() | |
} | |
close(output.OutQueue) | |
output.Conn.ClearCallback(ERROR) | |
return nil | |
} | |
func init() { | |
pipeline.RegisterPlugin("IrcOutput", func() interface{} { | |
output := new(IrcOutput) | |
output.InitIrcCon = NewIrcConn | |
return output | |
}) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
panic: runtime error: send on closed channel | |
goroutine 24 [running]: | |
runtime.panic(0x854940, 0x1017efe) | |
/usr/local/go/src/pkg/runtime/panic.c:266 +0xb6 | |
github.com/mozilla-services/heka/plugins/irc.SendFromOutQueue(0xc2100a4410, 0x7fb519a51340, 0xc21000f0a0, 0x0, 0x0, ...) | |
/heka/build/heka/src/github.com/mozilla-services/heka/plugins/irc/irc_output.go:155 +0xbf | |
github.com/mozilla-services/heka/plugins/irc.ProcessOutQueue(0xc2100a4410, 0x7fb519a51340, 0xc21000f0a0) | |
/heka/build/heka/src/github.com/mozilla-services/heka/plugins/irc/irc_output.go:205 +0xae | |
created by github.com/mozilla-services/heka/plugins/irc.(*IRCOutput).Run | |
/heka/build/heka/src/github.com/mozilla-services/heka/plugins/irc/irc_output.go:312 +0x2e6 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[100%] Built target flood | |
root@bcb1a1938d83:/heka/build# hekad -config /heka/config.toml | |
2014/07/08 22:32:43 Pre-loading: [Dashboard] | |
2014/07/08 22:32:43 Pre-loading: [CounterFilter] | |
2014/07/08 22:32:43 Pre-loading: [PayloadEncoder] | |
2014/07/08 22:32:43 Pre-loading: [test] | |
2014/07/08 22:32:43 Pre-loading: [TcpInput] | |
2014/07/08 22:32:43 Pre-loading: [StatAccumInput] | |
2014/07/08 22:32:43 Pre-loading: [LogOutput] | |
2014/07/08 22:32:43 Pre-loading: [IRCOutput] | |
2014/07/08 22:32:43 Pre-loading: [ProtobufDecoder] | |
2014/07/08 22:32:43 Pre-loading: [ProtobufEncoder] | |
2014/07/08 22:32:43 Loading: [test] | |
2014/07/08 22:32:43 Loading: [ProtobufDecoder] | |
2014/07/08 22:32:43 Loading: [PayloadEncoder] | |
2014/07/08 22:32:43 Loading: [ProtobufEncoder] | |
2014/07/08 22:32:43 Loading: [TcpInput] | |
2014/07/08 22:32:43 Loading: [StatAccumInput] | |
2014/07/08 22:32:43 Loading: [CounterFilter] | |
2014/07/08 22:32:43 Loading: [Dashboard] | |
2014/07/08 22:32:44 Loading: [LogOutput] | |
2014/07/08 22:32:44 Loading: [IRCOutput] | |
2014/07/08 22:32:44 Starting hekad... | |
2014/07/08 22:32:44 Output started: Dashboard | |
2014/07/08 22:32:44 Output started: LogOutput | |
2014/07/08 22:32:44 Output started: IRCOutput | |
2014/07/08 22:32:44 Filter started: CounterFilter | |
2014/07/08 22:32:44 MessageRouter started. | |
2014/07/08 22:32:44 Input started: TcpInput | |
2014/07/08 22:32:44 Input started: StatAccumInput | |
2014/07/08 22:32:44 Connected to irc.mozilla.org:6667 (63.245.216.214:6667) | |
Before okay: true | |
Okay1: false | |
Okay2: false | |
Sending from Backlog queue | |
Okay3: false | |
Out of loop | |
2014/07/08 22:32:46 Got 3 messages. 1.51 msg/sec | |
2014/07/08 22:32:47 Got 6 messages. 2.99 msg/sec | |
2014/07/08 22:32:48 Got 9 messages. 3.00 msg/sec | |
^C2014/07/08 22:32:49 Shutdown initiated. | |
2014/07/08 22:32:49 Stop message sent to input 'TcpInput' | |
2014/07/08 22:32:49 Stop message sent to input 'StatAccumInput' | |
2014/07/08 22:32:49 Input 'TcpInput': stopped | |
2014/07/08 22:32:49 Input 'StatAccumInput': stopped | |
2014/07/08 22:32:49 Waiting for decoders shutdown | |
2014/07/08 22:32:49 Decoders shutdown complete | |
2014/07/08 22:32:49 Stop message sent to filter 'CounterFilter' | |
2014/07/08 22:32:49 Plugin 'CounterFilter': stopped | |
2014/07/08 22:32:49 Stop message sent to output 'Dashboard' | |
2014/07/08 22:32:49 Plugin 'Dashboard': stopped | |
2014/07/08 22:32:49 Stop message sent to output 'LogOutput' | |
2014/07/08 22:32:49 Plugin 'LogOutput': stopped | |
2014/07/08 22:32:49 Stop message sent to output 'IRCOutput' | |
here | |
2014/07/08 22:32:49 Plugin 'IRCOutput': stopped | |
2014/07/08 22:32:49 Shutdown complete. | |
root@bcb1a1938d83:/heka/build# |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment