Skip to content

Instantly share code, notes, and snippets.

@gmallard
Created July 22, 2019 11:35
Show Gist options
  • Save gmallard/0b1b6cc27d18c4c71b3649f5a3db409d to your computer and use it in GitHub Desktop.
Save gmallard/0b1b6cc27d18c4c71b3649f5a3db409d to your computer and use it in GitHub Desktop.
Client Program for Testing / Recreating Issue #142
# -*- encoding: utf-8 -*-
#
require 'stomp'
if ENV['STOMP_NOSSL']
use_ssl = false
$stderr.write "#{Time.now()} use_ssl=false\n"
else
urc = ENV['STOMP_RUBYCIPHERS'] ? true : false
# Certs
if ENV['STOMP_OSX']
pkey = "/Users/guyallard/gitwork/sslwork/2017-01/client.key"
pcert = "/Users/guyallard/gitwork/sslwork/2017-01/client.crt"
else # Linux
pkey = "/ad3/gma/ad3/sslwork/2017-01/client.key"
pcert = "/ad3/gma/ad3/sslwork/2017-01/client.crt"
end
use_ssl = Stomp::SSLParams.new(:key_file => pkey,
:cert_file => pcert,
:use_ruby_ciphers => urc,
:fsck => true)
$stderr.write "#{Time.now()} use_ssl=yes\n"
end
# Ports are:
# AMQ 61611 is SSL, noCLientAuth
# AMQ 61612 is SSL, ckientAuthRequired
# Apollo 62610 - no client auth
# Artemis 50612 clientAuth
# Artemis 50611 no clientAuth
use_port = ENV['STOMP_PORT'] ? ENV['STOMP_PORT'] : 61611
$stderr.write "PORT:#{use_port}\n"
reliable = ENV['STOMP_RELIABLE'] ? true : false
$stderr.write "#{Time.now()} RELIABLE:#{reliable}\n"
proto = ENV['STOMP_PROTOCOL'] ? ENV['STOMP_PROTOCOL'] : '1.1'
$stderr.write "#{Time.now()} PROTOCOL:#{proto}\n"
vhost = ENV['STOMP_HOST'] ? ENV['STOMP_HOST'] : 'localhost'
$stderr.write "#{Time.now()} VHOST:#{vhost}\n"
hash = { :hosts => [
{:login => "guest", :passcode => "guest", :host => vhost,
:port => use_port,
:ssl => use_ssl}, #
],
:connect_headers => {:host => "localhost", :"accept-version" => proto},
:parse_timeout => 60,
:sslctx_newparm => :TLSv1_2,
:reliable => reliable,
## :logger => mta_slogger,
## :nto_cmd_read => true,
}
tname = ENV['STOMP_DEST'] ? ENV['STOMP_DEST'] : '/topic/mtackv2'
$stderr.write "#{Time.now()} DEST:#{tname}\n"
num_threads = ENV['STOMP_NTH'] ? ENV['STOMP_NTH'].to_i : 1
num_threads = 1 if num_threads == 0
$stderr.write "#{Time.now()} num threads:#{num_threads}\n"
amode = ENV['STOMP_ACKMODE'] ? ENV['STOMP_ACKMODE'] : 'client-individual'
$stderr.write "#{Time.now()} ACKMODE:#{amode}\n"
subid = "unique.sub.id-#{355.0/113.0}-#{Time.now.to_f}"
sl_factor = 1.0
$stderr.write "#{Time.now()} sleep factor:#{sl_factor}\n"
main_thread = Thread::current()
$stderr.write "#{Time.now()} main thread:#{main_thread}\n"
tstart = Time.now.to_f
$stderr.write "#{Time.now()} Main start\n"
c = Stomp::Connection.new(hash)
$stderr.write "#{Time.now()} Connect done\n"
$stderr.write "Connection frame: #{c.connection_frame}\n"
sub_headers = {:id => subid, :ack => amode}
pre_fetch = ENV['STOMP_AMQPF'] ? ENV['STOMP_AMQPF'].to_i : 0
if pre_fetch > 0
sub_headers['activemq.prefetchSize'] = pre_fetch
end
$stderr.write "#{Time.now()} SUBSCRIBE Headers:#{sub_headers}\n"
ctrs = []
0.upto(num_threads-1) do |i|
ctrs << 0
end
Thread.abort_on_exception = true
#------------------------------------------------------------------------------#
c.subscribe tname, sub_headers
mlslt= 30.0
## =begin
tstart = Time.now.to_f
getters = 0.upto(num_threads-1).map do |i|
Thread.new("consumer_#{i}", i) do |name, getter_num|
$stderr.write "#{Time.now()} #{Thread::current()} number #{i} START\n"
tms = Time.now.to_f
tme = tms
loop do
telapsed = tms - tstart
tmelapsed = tme - tms
$stderr.write "#{Time.now()} #{Thread::current()} number #{i} RECEIVE #{telapsed} #{tmelapsed}\n"
tms = Time.now.to_f
m = c.receive
# Do work ......
if m.command == Stomp::CMD_ERROR
$stderr.write "#{Time.now()} ERROR - #{m}\n"
Thread::exit()
end
$stderr.write "#{Time.now()} #{Thread::current()} number #{i} got message: #{m.body}\n"
if c.protocol == "1.1"
c.ack(m.headers['message-id'],
{"subscription" => subid}) # 1.1
else
c.ack(m.headers['ack']) # 1.2
end
$stderr.write "#{Time.now()} #{Thread::current()} number #{i} ACK Done\n"
smdi = sprintf("%05d", m.headers['sng_msgnum'])
$stderr.write "ACKMID #{smdi}\n"
ctrs[i] += 1
Thread::pass()
nsl = sl_factor * (rand(100.0)/100.0) # Arbitrary
$stderr.write "#{Time.now()} #{Thread::current()} Sleep: #{name} - #{nsl}\n"
sleep nsl
tme = Time.now.to_f
end
end
end
### =end
#------------------------------------------------------------------------------#
loop do
sleep mlslt
$stderr.write "#{Time.now()} ... main loop start~#{Time.now}\n"
totmc = 0
0.upto(num_threads-1) do |i|
$stderr.write "#{Time.now()} ... Thread:#{i}, message count:#{ctrs[i]}\n"
totmc += ctrs[i]
end
$stderr.write "#{Time.now()} ... main loop end~#{Time.now}~#{totmc}\n"
end
## =end
#------------------------------------------------------------------------------#
c.disconnect
$stderr.write "#{Time.now()} Main end"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment