Skip to content

Instantly share code, notes, and snippets.

@snichme
Created March 20, 2026 13:20
Show Gist options
  • Select an option

  • Save snichme/cb23deb8e06363deb6d7b759c6eaff09 to your computer and use it in GitHub Desktop.

Select an option

Save snichme/cb23deb8e06363deb6d7b759c6eaff09 to your computer and use it in GitHub Desktop.
Tests for PR 1721 in lavinmq
#!/usr/bin/env ruby
# frozen_string_literal: true
# Test script to exercise the unified cleanup_message_channel code paths
# in LavinMQ's queue implementation.
#
# Triggers both CleanupReason::Overflow and CleanupReason::TTLChange
# through various AMQP operations.
#
# Requires: gem install amqp-client
require "amqp-client"
AMQP_URL = ENV.fetch("AMQP_URL", "amqp://guest:guest@localhost")
def with_connection(&block)
AMQP::Client.new(AMQP_URL).start(&block)
end
def separator(title)
puts "\n#{"=" * 60}"
puts " #{title}"
puts "=" * 60
end
# ---------------------------------------------------------------------------
# 1) Overflow via publish — max-length drops oldest messages
# ---------------------------------------------------------------------------
separator "1. Overflow on publish (max-length)"
with_connection do |conn|
ch = conn.channel
q = ch.queue("test.overflow.maxlen", arguments: {"x-max-length" => 3})
q.purge
5.times { |i| q.publish("msg-#{i}") }
sleep 0.2
count = q.message_count
puts " Published 5, max-length=3 → message_count=#{count}"
raise "expected <= 3, got #{count}" unless count <= 3
q.delete
puts " PASS"
end
# ---------------------------------------------------------------------------
# 2) Overflow via publish — max-length-bytes drops oldest messages
# ---------------------------------------------------------------------------
separator "2. Overflow on publish (max-length-bytes)"
with_connection do |conn|
ch = conn.channel
# Each message body is 100 bytes; allow ~250 bytes worth
q = ch.queue("test.overflow.maxlenbytes", arguments: {"x-max-length-bytes" => 250})
q.purge
5.times { |i| q.publish("x" * 100) }
sleep 0.2
count = q.message_count
puts " Published 5x100B, max-length-bytes=250 → message_count=#{count}"
raise "expected <= 3, got #{count}" unless count <= 3
q.delete
puts " PASS"
end
# ---------------------------------------------------------------------------
# 3) Overflow via reject/nack with requeue
# ---------------------------------------------------------------------------
separator "3. Overflow on requeue (basic.nack requeue=true)"
with_connection do |conn|
ch = conn.channel
q = ch.queue("test.overflow.requeue", arguments: {"x-max-length" => 3})
q.purge
3.times { |i| q.publish("msg-#{i}") }
sleep 0.1
# Consume one, then nack with requeue — triggers Overflow on requeue path
ch.prefetch(1)
msg = q.get(no_ack: false)
if msg
puts " Got message: #{msg.body}"
ch.basic_nack(msg.delivery_tag, requeue: true)
sleep 0.2
count = q.message_count
puts " After nack+requeue → message_count=#{count}"
else
puts " No message to nack"
end
q.delete
puts " PASS"
end
# ---------------------------------------------------------------------------
# 4) TTLChange via message-ttl — messages expire after TTL
# ---------------------------------------------------------------------------
separator "4. TTLChange via message-ttl (messages expire)"
with_connection do |conn|
ch = conn.channel
q = ch.queue("test.ttl.expire", arguments: {"x-message-ttl" => 500})
q.purge
3.times { |i| q.publish("ttl-msg-#{i}") }
puts " Published 3 messages with x-message-ttl=500ms"
before = q.message_count
puts " Before expiry: message_count=#{before}"
sleep 1.0
after = q.message_count
puts " After 1s: message_count=#{after}"
raise "expected 0, got #{after}" unless after == 0
q.delete
puts " PASS"
end
# ---------------------------------------------------------------------------
# 5) TTLChange via per-message TTL expiration
# ---------------------------------------------------------------------------
separator "5. TTLChange via per-message TTL (expiration property)"
with_connection do |conn|
ch = conn.channel
q = ch.queue("test.ttl.per_message")
q.purge
# Publish with short per-message TTL
3.times { |i| q.publish("per-ttl-#{i}", properties: {expiration: "500"}) }
puts " Published 3 messages with expiration=500ms"
sleep 1.0
count = q.message_count
puts " After 1s: message_count=#{count}"
raise "expected 0, got #{count}" unless count == 0
q.delete
puts " PASS"
end
# ---------------------------------------------------------------------------
# 6) TTLChange via purge (signals channel to recalculate)
# ---------------------------------------------------------------------------
separator "6. TTLChange via purge"
with_connection do |conn|
ch = conn.channel
q = ch.queue("test.ttl.purge", arguments: {"x-message-ttl" => 60_000})
q.purge
5.times { |i| q.publish("purge-msg-#{i}") }
sleep 0.1
before = q.message_count
puts " Before purge: message_count=#{before}"
q.purge
sleep 0.1
after = q.message_count
puts " After purge: message_count=#{after}"
raise "expected 0, got #{after}" unless after == 0
q.delete
puts " PASS"
end
# ---------------------------------------------------------------------------
# 7) Overflow via policy change (apply max-length policy via management API)
# Uses x-overflow=reject-publish to test the reject path too
# ---------------------------------------------------------------------------
separator "7. Overflow with x-overflow=reject-publish"
with_connection do |conn|
ch = conn.channel
q = ch.queue("test.overflow.reject",
arguments: {"x-max-length" => 2, "x-overflow" => "reject-publish"})
q.purge
ch.confirm_select
results = []
5.times do |i|
begin
q.publish("reject-msg-#{i}")
ch.wait_for_confirm
results << :confirmed
rescue => e
results << :nacked
end
end
count = q.message_count
puts " Published 5, max-length=2, overflow=reject-publish"
puts " Results: #{results.inspect}"
puts " message_count=#{count}"
q.delete
puts " PASS"
end
# ---------------------------------------------------------------------------
# 8) Dead-letter on overflow (max-length with DLX)
# ---------------------------------------------------------------------------
separator "8. Overflow with dead-letter exchange"
with_connection do |conn|
ch = conn.channel
# Set up DLX
ch.exchange_declare("test.dlx", "fanout")
dlq = ch.queue("test.dlq")
dlq.bind("test.dlx", "")
dlq.purge
q = ch.queue("test.overflow.dlx",
arguments: {
"x-max-length" => 2,
"x-dead-letter-exchange" => "test.dlx",
})
q.purge
5.times { |i| q.publish("dlx-msg-#{i}") }
sleep 0.3
main_count = q.message_count
dlq_count = dlq.message_count
puts " Published 5, max-length=2, DLX configured"
puts " Main queue: #{main_count}, DLQ: #{dlq_count}"
raise "expected main<=2, got #{main_count}" unless main_count <= 2
raise "expected dlq>=3, got #{dlq_count}" unless dlq_count >= 3
q.delete
dlq.delete
ch.exchange_delete("test.dlx")
puts " PASS"
end
# ---------------------------------------------------------------------------
# 9) TTLChange + DLX (message expires and lands in DLQ)
# ---------------------------------------------------------------------------
separator "9. Message TTL expiry with dead-letter exchange"
with_connection do |conn|
ch = conn.channel
ch.exchange_declare("test.ttl.dlx", "fanout")
dlq = ch.queue("test.ttl.dlq")
dlq.bind("test.ttl.dlx", "")
dlq.purge
q = ch.queue("test.ttl.dlx.source",
arguments: {
"x-message-ttl" => 500,
"x-dead-letter-exchange" => "test.ttl.dlx",
})
q.purge
3.times { |i| q.publish("ttl-dlx-#{i}") }
puts " Published 3 messages, TTL=500ms, DLX configured"
sleep 1.0
main_count = q.message_count
dlq_count = dlq.message_count
puts " Main queue: #{main_count}, DLQ: #{dlq_count}"
raise "expected main=0, got #{main_count}" unless main_count == 0
raise "expected dlq=3, got #{dlq_count}" unless dlq_count == 3
q.delete
dlq.delete
ch.exchange_delete("test.ttl.dlx")
puts " PASS"
end
separator "All tests passed!"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment