Created
January 28, 2017 19:49
-
-
Save ik5/d6486f9c21380d4bfe1a842d655aa3a9 to your computer and use it in GitHub Desktop.
how to re-queue a scheduled item in amqp
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"net/url" | |
"github.com/streadway/amqp" | |
) | |
// InitByConfigServices loads configuration from a conf file | |
func InitByConfigServices() (*amqp.Connection, *amqp.Channel, string) { | |
name := "queue" | |
var err error | |
amqpConn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d%s", | |
url.QueryEscape("guest"), | |
url.QueryEscape(""), | |
"127.0.0.1", | |
5672, | |
"/")) | |
if err != nil { | |
panic(err) | |
} | |
amqpChan, err := amqpConn.Channel() | |
if err != nil { | |
panic(err) | |
} | |
err = amqpChan.Qos( | |
1, // prefetch count | |
0, // prefetch size | |
false, // global | |
) | |
if err != nil { | |
panic(err) | |
} | |
return amqpConn, amqpChan, name | |
} | |
// REQueueDelay srt a delay queue, and when the timeout arrives, it moves it to the worker queue | |
func REQueueDelay(channel *amqp.Channel, name, data string, waitTime uint64, autoDelete bool) { | |
queueName := fmt.Sprintf("%s-%d", name, waitTime) | |
tbl := make(amqp.Table) | |
tbl["x-dead-letter-exchange"] = "" | |
tbl["x-dead-letter-routing-key"] = name | |
tbl["x-message-ttl"] = int64(waitTime * 1000) | |
if autoDelete { | |
tbl["x-expires"] = int64(waitTime + 3600000) | |
} | |
amqpQueue, err := channel.QueueDeclare( | |
queueName, // name | |
false, // durable | |
autoDelete, // delete when unused | |
false, // exclusive | |
false, // no-wait | |
tbl, // arguments | |
) | |
if err != nil { | |
panic(err) | |
} | |
err = channel.Qos( | |
1, // prefetch count | |
0, // prefetch size | |
false, // global | |
) | |
if err != nil { | |
panic(err) | |
} | |
err = channel.Publish( | |
"", // exchange | |
amqpQueue.Name, // routing key | |
false, // mandatory | |
false, | |
amqp.Publishing{ | |
DeliveryMode: amqp.Persistent, | |
ContentType: "application/json", | |
Body: []byte(data), | |
}) | |
if err != nil { | |
panic(err) | |
} | |
} | |
func main() { | |
conn, channel, tbl := InitByConfigServices() | |
defer func() { | |
_ = channel.Close() | |
_ = conn.Close() | |
}() | |
REQueueDelay(channel, tbl, "{\"id\": 1}", 15, true) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment