-
-
Save rafrombrc/52d610be2ab1bd10841c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
panic: runtime error: send on closed channel | |
goroutine 26 [running]: | |
runtime.panic(0x854940, 0x1017efe) | |
/usr/local/go/src/pkg/runtime/panic.c:266 +0xb6 | |
github.com/thoj/go-ircevent.(*Connection).Privmsg(0xc210050640, 0x0, 0x0, 0x0, 0x0) | |
/heka/build/heka/src/github.com/thoj/go-ircevent/irc.go:226 +0x13f | |
github.com/mozilla-services/heka/plugins/irc.(*IRCOutput).Privmsg(0xc210090320, 0x0, 0x0, 0x0, 0x0, ...) | |
/heka/build/heka/src/github.com/mozilla-services/heka/plugins/irc/irc_output.go:102 +0xab | |
github.com/mozilla-services/heka/plugins/irc.SendFromOutQueue(0xc210090320, 0x7f1195bf5060, 0xc21000fbe0, 0x0, 0x0, ...) | |
/heka/build/heka/src/github.com/mozilla-services/heka/plugins/irc/irc_output.go:144 +0x49 | |
github.com/mozilla-services/heka/plugins/irc.ProcessOutQueue(0xc210090320, 0x7f1195bf5060, 0xc21000fbe0) | |
/heka/build/heka/src/github.com/mozilla-services/heka/plugins/irc/irc_output.go:205 +0xae | |
created by github.com/mozilla-services/heka/plugins/irc.(*IRCOutput).Run | |
/heka/build/heka/src/github.com/mozilla-services/heka/plugins/irc/irc_output.go:312 +0x2e6 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package irc | |
import ( | |
"crypto/tls" | |
"errors" | |
"fmt" | |
"github.com/mozilla-services/heka/pipeline" | |
"github.com/mozilla-services/heka/plugins/tcp" | |
"github.com/thoj/go-ircevent" | |
"sync/atomic" | |
"time" | |
) | |
type IRCOutputConfig struct { | |
Server string `toml:"server"` | |
Nick string `toml:"nick"` | |
Ident string `toml:"ident"` | |
Password string `toml:"password"` | |
Channels []string `toml:"channels"` | |
UseTLS bool `toml:"use_tls"` | |
// Subsection for TLS configuration. | |
Tls tcp.TlsConfig | |
// Should we join and part an irc channel between sending messages? | |
JoinAndPart bool `toml:"join_and_part"` | |
// This controls the size of the OutQueue and Backlog queue for messages. | |
QueueSize int `toml:"queue_size"` | |
RejoinOnKick bool `toml:"rejoin_on_kick"` | |
// Default interval at which IRC messages will be sent is minimum of 2 | |
// seconds between messages. | |
TickerInterval uint `toml:"ticker_interval"` | |
} | |
type IrcMsgQueue chan IrcMsg | |
type IRCOutput struct { | |
*IRCOutputConfig | |
Conn *irc.Connection | |
OutQueue IrcMsgQueue | |
BacklogQueues []IrcMsgQueue | |
JoinedChannels []int32 | |
} | |
type IrcMsg struct { | |
Output []byte | |
IrcChannel string | |
Idx int | |
} | |
const ( | |
// These are replies from the IRC Server | |
CONNECTED = "001" | |
ERROR = "ERROR" // This is what we get on a disconnect | |
QUIT = "QUIT" | |
PART = "PART" | |
KICK = "KICK" | |
IRC_RPL_ENDOFNAMES = "366" | |
// These are to track our JoinedChannels slice of joined/not joined | |
NOTJOINED = 0 | |
JOINED = 1 | |
) | |
func (output *IRCOutput) ConfigStruct() interface{} { | |
return &IRCOutputConfig{ | |
Server: "irc.freenode.net", | |
Nick: "heka_bot", | |
Ident: "heka", | |
Channels: []string{"#heka_bot"}, | |
QueueSize: 100, | |
TickerInterval: uint(2), | |
} | |
} | |
// NewIRCConn creates an *irc.Connection. It handles using Heka's tcp plugin to | |
// create a cryto/tls config | |
func NewIRCConn(config *IRCOutputConfig) (*irc.Connection, error) { | |
conn := irc.IRC(config.Nick, config.Ident) | |
if conn == nil { | |
return nil, errors.New("Nick or Ident cannot be blank") | |
} | |
if config.Server == "" { | |
return nil, errors.New("IRC server cannot be blank.") | |
} | |
if len(config.Channels) < 1 { | |
return nil, errors.New("Need at least 1 channel to join.") | |
} | |
var tlsConf *tls.Config = nil | |
var err error = nil | |
if tlsConf, err = tcp.CreateGoTlsConfig(&config.Tls); err != nil { | |
return nil, fmt.Errorf("TLS init error: %s", err) | |
} | |
conn.UseTLS = config.UseTLS | |
conn.TLSConfig = tlsConf | |
return conn, nil | |
} | |
// Privmsg wraps the irc.Privmsg by accepting an ircMsg struct, and checking if | |
// we've joined a channel before trying to send a message to it. Returns whether | |
// or not the message was successfully sent. | |
func (output *IRCOutput) Privmsg(ircMsg IrcMsg) bool { | |
idx := ircMsg.Idx | |
if atomic.LoadInt32(&output.JoinedChannels[idx]) == JOINED { | |
output.Conn.Privmsg(ircMsg.IrcChannel, string(ircMsg.Output)) | |
} else { | |
return false | |
} | |
if output.JoinAndPart { | |
// Leave the channel if we're configured to part after sending messages. | |
output.Conn.Part(ircMsg.IrcChannel) | |
} | |
return true | |
} | |
// UpdateJoinList atomically updates our global slice of joined channels for a | |
// particular irc channel. It sets the irc channel's joined status to 'status'. | |
// Returns whether or not it found the IRC Channel in our slice. | |
func UpdateJoinList(output *IRCOutput, ircChan string, status int32) bool { | |
for i, channel := range output.Channels { | |
if ircChan == channel { | |
// Update if we have or haven't joined the channel | |
atomic.StoreInt32(&output.JoinedChannels[i], status) | |
return true | |
} | |
} | |
return false | |
} | |
// UpdateJoinListAll sets the status of all IRC Channels in our config to | |
// 'status' | |
func UpdateJoinListAll(output *IRCOutput, status int32) { | |
for channel := range output.Channels { | |
atomic.StoreInt32(&output.JoinedChannels[channel], status) | |
} | |
} | |
// SendFromOutQueue attempts to send a message to the IRC Channel specified in | |
// the ircMsg struct. If sending fails due to not being in the IRC channel, it | |
// will put the message into that IRC Channel's backlog queue. If the queue is | |
// full it will drop the message and log an error. | |
// It returns whether or not a message was successfully delivered to an | |
// IRC channel. | |
func SendFromOutQueue(output *IRCOutput, runner pipeline.OutputRunner, | |
ircMsg IrcMsg) bool { | |
if output.Privmsg(ircMsg) { | |
return true | |
} else { | |
// We haven't joined this channel yet, so we need to send | |
// the message to the backlog queue of messages | |
// Get the proper Channel for the backlog | |
idx := ircMsg.Idx | |
backlogQueue := output.BacklogQueues[idx] | |
select { | |
// try to put the message into the backlog queue | |
case backlogQueue <- ircMsg: | |
// Just putting | |
default: | |
// Failed to put, which means the backlog for this IRC | |
// channel is full. So drop it and log a message. | |
runner.LogError(fmt.Errorf("backlog queue for "+ | |
"IRC Channel %s, full. Dropping message.", | |
ircMsg.IrcChannel)) | |
} | |
return false | |
} | |
return false | |
} | |
// SendFromBacklogQueue attempts to send a message from the first backlog queue | |
// which has a message in it. It returns whether or not a message was | |
// successfully delivered to an IRC channel. | |
func SendFromBacklogQueue(output *IRCOutput, runner pipeline.OutputRunner, | |
ircMsg IrcMsg) bool { | |
// No messages in the out queue, so lets try the backlog queue | |
for i, queue := range output.BacklogQueues { | |
if atomic.LoadInt32(&output.JoinedChannels[i]) != JOINED { | |
continue | |
} | |
select { | |
case ircMsg = <-queue: | |
if output.Privmsg(ircMsg) { | |
return true | |
} | |
default: | |
// No backed up messages for this IRC Channel | |
} | |
} | |
return false | |
} | |
// ProcessOutQueue attempts to send an IRC message from the OutQueue, or the | |
// BacklogQueue if nothing is in the OutQueue. It is throttled by a ticker to | |
// prevent flooding the IRC server. | |
func ProcessOutQueue(output *IRCOutput, runner pipeline.OutputRunner) { | |
var delivered bool | |
var ircMsg ircMsg | |
ok := true | |
// ticker := runner.Ticker() | |
for ok { | |
delivered = false | |
// <-ticker | |
select { | |
case ircMsg, ok = <-output.OutQueue: | |
if !ok { | |
// We havent actually delivered but we want to escape that | |
// loop | |
delivered = true | |
break | |
} | |
delivered = SendFromOutQueue(output, runner, ircMsg) | |
default: | |
// Nothing | |
} | |
if !delivered { | |
SendFromBacklogQueue(...) | |
} | |
} | |
for _, queue := range output.BacklogQueues { | |
close(queue) | |
} | |
for _, queue := range output.BacklogQueues { | |
// drain | |
} | |
} | |
// RegisterCallbacks sets up all the event handler callbacks for recieving | |
// particular irc events. | |
func RegisterCallbacks(output *IRCOutput, runner pipeline.OutputRunner) { | |
// add a callback to check if we've gotten successfully connected | |
output.Conn.AddCallback(CONNECTED, func(event *irc.Event) { | |
for _, ircChan := range output.Channels { | |
// Only join on connect if we aren't going to join whenever we send | |
// a message | |
if !output.JoinAndPart { | |
output.Conn.Join(ircChan) | |
} | |
} | |
}) | |
// Once we've recieved the names list, we've successfully joined the channel | |
// And should begin processing Heka messages | |
output.Conn.AddCallback(IRC_RPL_ENDOFNAMES, func(event *irc.Event) { | |
// This is the actual IRC Channel name (ie: #heka) | |
ircChan := event.Arguments[1] | |
UpdateJoinList(output, ircChan, JOINED) | |
}) | |
// We want to handle errors (disconnects) ourself. | |
output.Conn.ClearCallback(ERROR) | |
output.Conn.AddCallback(ERROR, func(event *irc.Event) { | |
UpdateJoinListAll(output, NOTJOINED) | |
runner.LogMessage("Disconnected from IRC. Retrying to connect in 3 seconds..") | |
time.Sleep(3 * time.Second) | |
err := output.Conn.Reconnect() | |
if err != nil { | |
runner.LogError(fmt.Errorf("Error reconnecting:", err)) | |
output.Conn.Quit() | |
} | |
runner.LogMessage("Reconnected to IRC!") | |
}) | |
output.Conn.AddCallback(KICK, func(event *irc.Event) { | |
ircChan := event.Arguments[0] | |
UpdateJoinList(output, ircChan, NOTJOINED) | |
if output.RejoinOnKick { | |
output.Conn.Join(ircChan) | |
} | |
}) | |
// These next 2 events shouldn't really matter much, but we should update | |
// the JoinList anyways. | |
output.Conn.AddCallback(QUIT, func(event *irc.Event) { | |
UpdateJoinListAll(output, NOTJOINED) | |
}) | |
output.Conn.AddCallback(PART, func(event *irc.Event) { | |
ircChan := event.Arguments[1] | |
UpdateJoinList(output, ircChan, NOTJOINED) | |
}) | |
} | |
func (output *IRCOutput) Init(config interface{}) error { | |
conf := config.(*IRCOutputConfig) | |
output.IRCOutputConfig = conf | |
conn, err := NewIRCConn(conf) | |
if err != nil { | |
return fmt.Errorf("Error setting up IRC Connection: %s", err) | |
} | |
output.Conn = conn | |
// Create our chans for passing messages from the main runner InChan to | |
// the irc channels | |
numChannels := len(output.Channels) | |
output.JoinedChannels = make([]int32, numChannels) | |
output.OutQueue = make(IrcMsgQueue, output.QueueSize) | |
output.BacklogQueues = make([]IrcMsgQueue, numChannels) | |
for queue := range output.BacklogQueues { | |
output.BacklogQueues[queue] = make(IrcMsgQueue, output.QueueSize) | |
} | |
return nil | |
} | |
func (output *IRCOutput) Run(runner pipeline.OutputRunner, | |
helper pipeline.PluginHelper) error { | |
if runner.Encoder() == nil { | |
return errors.New("Encoder required.") | |
} | |
// Register callbacks to handle events | |
RegisterCallbacks(output, runner) | |
var err error | |
// Connect to the IRC Server | |
err = output.Conn.Connect(output.Server) | |
if err != nil { | |
return fmt.Errorf("Unable to connect to irc server %s: %s", | |
output.Server, err) | |
} | |
// Start a goroutine for recieving messages, and throttling before sending | |
// to the IRC Server | |
go ProcessOutQueue(output, runner) | |
var outgoing []byte | |
for pack := range runner.InChan() { | |
outgoing, err = runner.Encode(pack) | |
if err != nil { | |
runner.LogError(err) | |
} | |
// Send the message to each IRC Channel. | |
// If the out queue is full, then we need to drop the message and log | |
// an error. | |
for i, ircChannel := range output.Channels { | |
ircMsg := IrcMsg{outgoing, ircChannel, i} | |
select { | |
case output.OutQueue <- ircMsg: | |
if output.JoinAndPart { | |
// We wont have joined on connect in this case. | |
output.Conn.Join(ircChannel) | |
} | |
default: | |
runner.LogError(errors.New("Dropped message. " + | |
"irc_output OutQueue is full.")) | |
} | |
} | |
pack.Recycle() | |
} | |
close(output.OutQueue) | |
output.Conn.ClearCallback(ERROR) | |
output.Conn.Quit() | |
output.Conn.Disconnect() | |
return nil | |
} | |
func init() { | |
pipeline.RegisterPlugin("IRCOutput", func() interface{} { | |
return new(IRCOutput) | |
}) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
panic: runtime error: send on closed channel | |
goroutine 24 [running]: | |
runtime.panic(0x854940, 0x1017efe) | |
/usr/local/go/src/pkg/runtime/panic.c:266 +0xb6 | |
github.com/mozilla-services/heka/plugins/irc.SendFromOutQueue(0xc2100a4410, 0x7fb519a51340, 0xc21000f0a0, 0x0, 0x0, ...) | |
/heka/build/heka/src/github.com/mozilla-services/heka/plugins/irc/irc_output.go:155 +0xbf | |
github.com/mozilla-services/heka/plugins/irc.ProcessOutQueue(0xc2100a4410, 0x7fb519a51340, 0xc21000f0a0) | |
/heka/build/heka/src/github.com/mozilla-services/heka/plugins/irc/irc_output.go:205 +0xae | |
created by github.com/mozilla-services/heka/plugins/irc.(*IRCOutput).Run | |
/heka/build/heka/src/github.com/mozilla-services/heka/plugins/irc/irc_output.go:312 +0x2e6 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment