Created
October 21, 2014 17:45
-
-
Save ryanuber/f91217afbbd1f743076f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"sort" | |
"time" | |
"github.com/armon/relay" | |
) | |
// WeightedQueue represents a single message queue and its corresponding weight. | |
// The weight is used when sorting priority, highest weight = highest priority. | |
type WeightedQueue struct { | |
Name string | |
Queue *relay.Relay | |
Weight int | |
} | |
type PriorityQueue struct { | |
ConsumeWait time.Duration | |
ConsumeInterval time.Duration | |
Queues []*WeightedQueue | |
} | |
type Descending []*WeightedQueue | |
// Implement the sort interface | |
func (p Descending) Len() int { | |
return len(p) | |
} | |
func (p Descending) Less(i, j int) bool { | |
return p[i].Weight < p[j].Weight | |
} | |
func (p Descending) Swap(i, j int) { | |
p[i], p[j] = p[j], p[i] | |
} | |
// GetQueue retrieves a queue by name from the pool | |
func (p *PriorityQueue) GetQueue(name string) (*relay.Relay, error) { | |
for _, q := range p.Queues { | |
if q.Name == name { | |
return q.Queue, nil | |
} | |
} | |
return nil, fmt.Errorf("No queue named %s", name) | |
} | |
// Publish writes a message with a given queue priority and queue name | |
func (p *PriorityQueue) Publish(priority, qname string, in interface{}) error { | |
q, err := p.GetQueue(qname) | |
if err != nil { | |
return err | |
} | |
pub, err := q.Publisher(qname) | |
if err != nil { | |
return err | |
} | |
return pub.Publish(in) | |
} | |
// Consume consumes a single message from a priority queue. It will try each | |
// queue in the set from highest to lowest priority with a given timeout, and | |
// return the first message received. | |
func (p *PriorityQueue) Consume(qname string, out interface{}) error { | |
// Sort by descending weight | |
sort.Sort(Descending(p.Queues)) | |
for _, q := range p.Queues { | |
consumer, err := q.Queue.Consumer(qname) | |
if err != nil { | |
return err | |
} | |
if err := consumer.ConsumeTimeout(&out, p.ConsumeWait); err != nil { | |
if err.Error() == "Timeout" { | |
time.Sleep(p.ConsumeInterval) | |
continue | |
} else { | |
return err | |
} | |
} | |
return nil | |
} | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment