Created
March 20, 2026 13:20
-
-
Save snichme/cb23deb8e06363deb6d7b759c6eaff09 to your computer and use it in GitHub Desktop.
Tests for PR 1721 in lavinmq
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby
# frozen_string_literal: true

# Test script to exercise the unified cleanup_message_channel code paths
# in LavinMQ's queue implementation.
#
# Triggers both CleanupReason::Overflow and CleanupReason::TTLChange
# through various AMQP operations.
#
# Requires: gem install amqp-client
require "amqp-client"

# Broker URL used by every scenario; override it with the AMQP_URL
# environment variable.
AMQP_URL = ENV.fetch("AMQP_URL", "amqp://guest:guest@localhost")
# Builds an AMQP::Client for AMQP_URL and forwards the caller's block to
# its +start+ call.
#
# NOTE(review): whether +start+ yields a connection to the block and closes
# it afterwards depends on the installed amqp-client version — confirm
# against the gem's documentation.
#
# @yield forwarded unchanged to AMQP::Client#start
# @raise [ArgumentError] when called without a block
# @return whatever AMQP::Client#start returns
def with_connection(&block)
  raise ArgumentError, "with_connection requires a block" unless block

  AMQP::Client.new(AMQP_URL).start(&block)
end
# Prints a section banner to stdout: a blank line, a 60-character "=" rule,
# the title prefixed with one space, and a closing rule.
#
# @param title [String] text shown between the two rules
# @return [nil]
def separator(title)
  rule = "=" * 60
  puts "", rule, " #{title}", rule
end
# ---------------------------------------------------------------------------
# 1) Overflow via publish — max-length drops oldest messages
# ---------------------------------------------------------------------------
separator "1. Overflow on publish (max-length)"
with_connection do |conn|
  ch = conn.channel
  q = ch.queue("test.overflow.maxlen", arguments: {"x-max-length" => 3})
  q.purge # start from an empty queue in case an earlier run left messages

  # Publish more messages than the limit allows.
  5.times { |i| q.publish("msg-#{i}") }
  sleep 0.2 # pause before reading the count; no publisher confirms are used here

  count = q.message_count
  puts " Published 5, max-length=3 → message_count=#{count}"
  raise "expected <= 3, got #{count}" unless count <= 3

  q.delete
  puts " PASS"
end
# ---------------------------------------------------------------------------
# 2) Overflow via publish — max-length-bytes drops oldest messages
# ---------------------------------------------------------------------------
separator "2. Overflow on publish (max-length-bytes)"
with_connection do |conn|
  ch = conn.channel
  # Each message body is 100 bytes and the limit is 250 bytes, so at most
  # two bodies (200 bytes) fit; a third would already exceed the limit.
  q = ch.queue("test.overflow.maxlenbytes", arguments: {"x-max-length-bytes" => 250})
  q.purge # start from an empty queue in case an earlier run left messages

  5.times { q.publish("x" * 100) }
  sleep 0.2 # pause before reading the count; no publisher confirms are used here

  count = q.message_count
  puts " Published 5x100B, max-length-bytes=250 → message_count=#{count}"
  raise "expected <= 2, got #{count}" unless count <= 2

  q.delete
  puts " PASS"
end
# ---------------------------------------------------------------------------
# 3) Overflow via reject/nack with requeue
# ---------------------------------------------------------------------------
separator "3. Overflow on requeue (basic.nack requeue=true)"
with_connection do |conn|
  ch = conn.channel
  q = ch.queue("test.overflow.requeue", arguments: {"x-max-length" => 3})
  q.purge # start from an empty queue in case an earlier run left messages

  # Fill the queue exactly up to its limit.
  3.times { |i| q.publish("msg-#{i}") }
  sleep 0.1

  # Consume one, then nack with requeue — triggers Overflow on requeue path
  ch.prefetch(1)
  msg = q.get(no_ack: false)
  # Without a fetched message the requeue path is never exercised, so that
  # is a failure rather than a silent pass.
  raise "expected a message to nack, got none" unless msg

  puts " Got message: #{msg.body}"
  ch.basic_nack(msg.delivery_tag, requeue: true)
  sleep 0.2

  count = q.message_count
  puts " After nack+requeue → message_count=#{count}"
  # The requeued message must not push the queue past its max-length.
  raise "expected <= 3, got #{count}" unless count <= 3

  q.delete
  puts " PASS"
end
# ---------------------------------------------------------------------------
# 4) TTLChange via message-ttl — messages expire after TTL
# ---------------------------------------------------------------------------
separator "4. TTLChange via message-ttl (messages expire)"
with_connection do |conn|
  ch = conn.channel
  q = ch.queue("test.ttl.expire", arguments: {"x-message-ttl" => 500})
  q.purge # start from an empty queue in case an earlier run left messages

  3.times { |i| q.publish("ttl-msg-#{i}") }
  puts " Published 3 messages with x-message-ttl=500ms"

  # The pre-expiry count is printed for information only; it is not asserted.
  before = q.message_count
  puts " Before expiry: message_count=#{before}"

  sleep 1.0 # twice the 500 ms TTL, so every message should have expired

  after = q.message_count
  puts " After 1s: message_count=#{after}"
  raise "expected 0, got #{after}" unless after == 0

  q.delete
  puts " PASS"
end
# ---------------------------------------------------------------------------
# 5) TTLChange via per-message TTL expiration
# ---------------------------------------------------------------------------
separator "5. TTLChange via per-message TTL (expiration property)"
with_connection do |conn|
  ch = conn.channel
  # No queue-level TTL here: expiry comes from each message's own property.
  q = ch.queue("test.ttl.per_message")
  q.purge # start from an empty queue in case an earlier run left messages

  # Publish with short per-message TTL
  3.times { |i| q.publish("per-ttl-#{i}", properties: {expiration: "500"}) }
  puts " Published 3 messages with expiration=500ms"

  sleep 1.0 # twice the 500 ms expiration, so every message should be gone

  count = q.message_count
  puts " After 1s: message_count=#{count}"
  raise "expected 0, got #{count}" unless count == 0

  q.delete
  puts " PASS"
end
# ---------------------------------------------------------------------------
# 6) TTLChange via purge (signals channel to recalculate)
# ---------------------------------------------------------------------------
separator "6. TTLChange via purge"
with_connection do |conn|
  ch = conn.channel
  # A 60 s TTL is far longer than this scenario runs, so any message that
  # disappears was removed by the purge rather than by expiry.
  q = ch.queue("test.ttl.purge", arguments: {"x-message-ttl" => 60_000})
  q.purge # start from an empty queue in case an earlier run left messages

  5.times { |i| q.publish("purge-msg-#{i}") }
  sleep 0.1

  # The pre-purge count is printed for information only; it is not asserted.
  before = q.message_count
  puts " Before purge: message_count=#{before}"

  q.purge
  sleep 0.1

  after = q.message_count
  puts " After purge: message_count=#{after}"
  raise "expected 0, got #{after}" unless after == 0

  q.delete
  puts " PASS"
end
# ---------------------------------------------------------------------------
# 7) Overflow with x-overflow=reject-publish
#    The limit is set through queue arguments (no policy / management API
#    call is made here); publishes beyond max-length should be rejected.
# ---------------------------------------------------------------------------
separator "7. Overflow with x-overflow=reject-publish"
with_connection do |conn|
  ch = conn.channel
  q = ch.queue("test.overflow.reject",
               arguments: {"x-max-length" => 2, "x-overflow" => "reject-publish"})
  q.purge # start from an empty queue in case an earlier run left messages
  ch.confirm_select

  # One entry per publish: :confirmed when no error was raised, :nacked
  # when the publish or the confirm wait raised.
  # NOTE(review): if wait_for_confirm reports a nack through its return
  # value rather than by raising, rejected publishes are recorded as
  # :confirmed here — confirm against the amqp-client documentation.
  results = []
  5.times do |i|
    begin
      q.publish("reject-msg-#{i}")
      ch.wait_for_confirm
      results << :confirmed
    rescue StandardError
      results << :nacked
    end
  end

  count = q.message_count
  puts " Published 5, max-length=2, overflow=reject-publish"
  puts " Results: #{results.inspect}"
  puts " message_count=#{count}"
  # Rejected publishes must never leave the queue above its max-length.
  raise "expected <= 2, got #{count}" unless count <= 2

  q.delete
  puts " PASS"
end
# ---------------------------------------------------------------------------
# 8) Dead-letter on overflow (max-length with DLX)
# ---------------------------------------------------------------------------
separator "8. Overflow with dead-letter exchange"
with_connection do |conn|
  ch = conn.channel

  # Set up DLX: a fanout exchange with a single bound queue that collects
  # whatever the source queue dead-letters.
  ch.exchange_declare("test.dlx", "fanout")
  dlq = ch.queue("test.dlq")
  dlq.bind("test.dlx", "")
  dlq.purge

  q = ch.queue("test.overflow.dlx",
               arguments: {
                 "x-max-length" => 2,
                 "x-dead-letter-exchange" => "test.dlx",
               })
  q.purge # start from an empty queue in case an earlier run left messages

  5.times { |i| q.publish("dlx-msg-#{i}") }
  sleep 0.3

  main_count = q.message_count
  dlq_count = dlq.message_count
  puts " Published 5, max-length=2, DLX configured"
  puts " Main queue: #{main_count}, DLQ: #{dlq_count}"
  # 5 published with a limit of 2: at least 3 must have been dead-lettered.
  raise "expected main<=2, got #{main_count}" unless main_count <= 2
  raise "expected dlq>=3, got #{dlq_count}" unless dlq_count >= 3

  q.delete
  dlq.delete
  ch.exchange_delete("test.dlx")
  puts " PASS"
end
# ---------------------------------------------------------------------------
# 9) TTLChange + DLX (message expires and lands in DLQ)
# ---------------------------------------------------------------------------
separator "9. Message TTL expiry with dead-letter exchange"
with_connection do |conn|
  ch = conn.channel

  # Fanout exchange plus one bound queue to collect the expired messages.
  ch.exchange_declare("test.ttl.dlx", "fanout")
  dlq = ch.queue("test.ttl.dlq")
  dlq.bind("test.ttl.dlx", "")
  dlq.purge

  q = ch.queue("test.ttl.dlx.source",
               arguments: {
                 "x-message-ttl" => 500,
                 "x-dead-letter-exchange" => "test.ttl.dlx",
               })
  q.purge # start from an empty queue in case an earlier run left messages

  3.times { |i| q.publish("ttl-dlx-#{i}") }
  puts " Published 3 messages, TTL=500ms, DLX configured"

  sleep 1.0 # twice the 500 ms TTL, so every message should have expired

  main_count = q.message_count
  dlq_count = dlq.message_count
  puts " Main queue: #{main_count}, DLQ: #{dlq_count}"
  raise "expected main=0, got #{main_count}" unless main_count == 0
  raise "expected dlq=3, got #{dlq_count}" unless dlq_count == 3

  q.delete
  dlq.delete
  ch.exchange_delete("test.ttl.dlx")
  puts " PASS"
end
# Reached only when no scenario above raised.
separator "All tests passed!"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment