Last active
June 30, 2020 08:13
-
-
Save jehiah/45af9135b6ecf5537646 to your computer and use it in GitHub Desktop.
NSQ Producer testing abstraction
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package nsqutils | |
import ( | |
"sync" | |
"time" | |
nsq "github.com/nsqio/go-nsq" | |
) | |
// Producer is an interface that nsq.Producer fulfills | |
type Producer interface { | |
Publish(topic string, body []byte) error | |
PublishAsync(topic string, body []byte, doneChan chan *nsq.ProducerTransaction, args ...interface{}) error | |
} | |
// TestProducer implements Producer and just counts the number of messages | |
// optionally if it includes a nsq.Producer it will also send messages | |
type TestProducer struct { | |
Counters map[string]int32 | |
Producer *nsq.Producer | |
LastMessage []byte | |
LastTopic string | |
Messages [][]byte | |
sync.Mutex | |
} | |
// Reset the counters | |
func (p *TestProducer) Reset() { | |
p.Lock() | |
p.Counters = nil | |
p.LastMessage = make([]byte, 0) | |
p.LastTopic = "" | |
p.Messages = make([][]byte, 0) | |
p.Unlock() | |
} | |
// Count the total number of events | |
func (p *TestProducer) Count() int32 { | |
p.Lock() | |
defer p.Unlock() | |
var i int32 | |
for _, c := range p.Counters { | |
i += c | |
} | |
return i | |
} | |
// Publish tracks publishing to a topic | |
func (p *TestProducer) Publish(topic string, body []byte) error { | |
p.Lock() | |
defer p.Unlock() | |
if p.Counters == nil { | |
p.Counters = make(map[string]int32) | |
} | |
p.Counters[topic]++ | |
p.LastMessage = body | |
p.LastTopic = topic | |
p.Messages = append(p.Messages, body) | |
if p.Producer != nil { | |
return p.Producer.Publish(topic, body) | |
} | |
return nil | |
} | |
// PublishAsync is a wrapper on Publish | |
func (p *TestProducer) PublishAsync(topic string, body []byte, doneChan chan *nsq.ProducerTransaction, args ...interface{}) error { | |
return p.Publish(topic, body) | |
} | |
// NoOpMessageDelegate can be used when constructing a test nsq.Message to allow Finish/Requeue | |
// messages to be a NoOp | |
type NoOpMessageDelegate struct{} | |
func (d *NoOpMessageDelegate) OnFinish(m *nsq.Message) {} | |
func (d *NoOpMessageDelegate) OnRequeue(m *nsq.Message, delay time.Duration, backoff bool) {} | |
func (d *NoOpMessageDelegate) OnTouch(m *nsq.Message) {} | |
// NoOpMessage makes a message using NoOpMessageDelegate | |
func NoOpMessage(body string) *nsq.Message { | |
return &nsq.Message{Body: []byte(body), Delegate: &NoOpMessageDelegate{}} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment