Skip to content

Instantly share code, notes, and snippets.

@ryanuber
Created October 21, 2014 17:45
Show Gist options
  • Save ryanuber/f91217afbbd1f743076f to your computer and use it in GitHub Desktop.
Save ryanuber/f91217afbbd1f743076f to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"sort"
"time"
"github.com/armon/relay"
)
// WeightedQueue represents a single message queue and its corresponding weight.
// The weight is used when sorting priority, highest weight = highest priority.
type WeightedQueue struct {
Name string
Queue *relay.Relay
Weight int
}
type PriorityQueue struct {
ConsumeWait time.Duration
ConsumeInterval time.Duration
Queues []*WeightedQueue
}
type Descending []*WeightedQueue
// Implement the sort interface
func (p Descending) Len() int {
return len(p)
}
func (p Descending) Less(i, j int) bool {
return p[i].Weight < p[j].Weight
}
func (p Descending) Swap(i, j int) {
p[i], p[j] = p[j], p[i]
}
// GetQueue retrieves a queue by name from the pool
func (p *PriorityQueue) GetQueue(name string) (*relay.Relay, error) {
for _, q := range p.Queues {
if q.Name == name {
return q.Queue, nil
}
}
return nil, fmt.Errorf("No queue named %s", name)
}
// Publish writes a message with a given queue priority and queue name
func (p *PriorityQueue) Publish(priority, qname string, in interface{}) error {
q, err := p.GetQueue(qname)
if err != nil {
return err
}
pub, err := q.Publisher(qname)
if err != nil {
return err
}
return pub.Publish(in)
}
// Consume consumes a single message from a priority queue. It will try each
// queue in the set from highest to lowest priority with a given timeout, and
// return the first message received.
func (p *PriorityQueue) Consume(qname string, out interface{}) error {
// Sort by descending weight
sort.Sort(Descending(p.Queues))
for _, q := range p.Queues {
consumer, err := q.Queue.Consumer(qname)
if err != nil {
return err
}
if err := consumer.ConsumeTimeout(&out, p.ConsumeWait); err != nil {
if err.Error() == "Timeout" {
time.Sleep(p.ConsumeInterval)
continue
} else {
return err
}
}
return nil
}
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment