Created
July 22, 2019 11:35
-
-
Save gmallard/0b1b6cc27d18c4c71b3649f5a3db409d to your computer and use it in GitHub Desktop.
Client Program for Testing / Recreating Issue #142
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- encoding: utf-8 -*-
#
# STOMP test client. Every setting is taken from a STOMP_* environment
# variable, falling back to a built-in default.
require 'stomp'

# SSL selection: any value in STOMP_NOSSL requests a plain connection;
# otherwise SSL parameters are built from a client key / certificate pair.
if ENV['STOMP_NOSSL']
  use_ssl = false
  $stderr.write "#{Time.now} use_ssl=false\n"
else
  # Any value in STOMP_RUBYCIPHERS turns on :use_ruby_ciphers.
  ruby_ciphers = !ENV['STOMP_RUBYCIPHERS'].nil?
  # Certs: location depends on the platform flag STOMP_OSX.
  key_path, cert_path =
    if ENV['STOMP_OSX']
      ["/Users/guyallard/gitwork/sslwork/2017-01/client.key",
       "/Users/guyallard/gitwork/sslwork/2017-01/client.crt"]
    else # Linux
      ["/ad3/gma/ad3/sslwork/2017-01/client.key",
       "/ad3/gma/ad3/sslwork/2017-01/client.crt"]
    end
  use_ssl = Stomp::SSLParams.new(:key_file => key_path,
                                 :cert_file => cert_path,
                                 :use_ruby_ciphers => ruby_ciphers,
                                 :fsck => true)
  $stderr.write "#{Time.now} use_ssl=yes\n"
end
# Broker endpoint and protocol settings, each overridable from the environment.
# Ports are:
# AMQ 61611 is SSL, no clientAuth
# AMQ 61612 is SSL, clientAuth required
# Apollo 62610 - no clientAuth
# Artemis 50612 clientAuth
# Artemis 50611 no clientAuth
#
# STOMP_PORT arrives as a String. Convert it so use_port is always an Integer,
# the same type as the 61611 default. Base 10 is forced so a leading zero is
# not read as octal; a non-numeric value raises ArgumentError right here.
port_env = ENV['STOMP_PORT']
use_port = port_env ? Integer(port_env, 10) : 61611
$stderr.write "PORT:#{use_port}\n"

# Any value in STOMP_RELIABLE switches the connection's :reliable option on.
reliable = ENV['STOMP_RELIABLE'] ? true : false
$stderr.write "#{Time.now()} RELIABLE:#{reliable}\n"

# Protocol level offered in the accept-version connect header.
proto = ENV.fetch('STOMP_PROTOCOL', '1.1')
$stderr.write "#{Time.now()} PROTOCOL:#{proto}\n"

# Host name of the broker to connect to.
vhost = ENV.fetch('STOMP_HOST', 'localhost')
$stderr.write "#{Time.now()} VHOST:#{vhost}\n"
# Parameters for Stomp::Connection.new.
# A single broker entry, using the fixed guest/guest credentials.
broker = {
  :login => "guest",
  :passcode => "guest",
  :host => vhost,
  :port => use_port,
  :ssl => use_ssl,
}
# NOTE(review): the connect header :host is the literal "localhost", not
# vhost - confirm this is intended.
connect_headers = {:host => "localhost", :"accept-version" => proto}
hash = {
  :hosts => [broker],
  :connect_headers => connect_headers,
  :parse_timeout => 60,
  :sslctx_newparm => :TLSv1_2,
  :reliable => reliable,
  ## :logger => mta_slogger,
  ## :nto_cmd_read => true,
}
# Destination, consumer count and ack mode, each overridable from the environment.
tname = ENV.fetch('STOMP_DEST', '/topic/mtackv2')
$stderr.write "#{Time.now()} DEST:#{tname}\n"

# Number of consumer threads. An unset, non-numeric, zero or negative
# STOMP_NTH falls back to one thread, so at least one consumer always runs.
num_threads = [ENV['STOMP_NTH'].to_i, 1].max
$stderr.write "#{Time.now()} num threads:#{num_threads}\n"

# Value sent as the :ack header on SUBSCRIBE.
amode = ENV.fetch('STOMP_ACKMODE', 'client-individual')
$stderr.write "#{Time.now()} ACKMODE:#{amode}\n"

# Subscription id; the timestamp suffix makes it differ from run to run.
subid = "unique.sub.id-#{355.0/113.0}-#{Time.now.to_f}"

# Multiplier applied to each consumer's random sleep between messages.
sl_factor = 1.0
$stderr.write "#{Time.now()} sleep factor:#{sl_factor}\n"

main_thread = Thread.current
$stderr.write "#{Time.now()} main thread:#{main_thread}\n"

# Start-of-run timestamp (assigned again just before the consumers start).
tstart = Time.now.to_f
$stderr.write "#{Time.now()} Main start\n"
# Open the broker connection and show the frame the broker answered with.
c = Stomp::Connection.new(hash)
$stderr.write "#{Time.now} Connect done\n"
$stderr.write "Connection frame: #{c.connection_frame}\n"

# SUBSCRIBE headers: subscription id and ack mode, plus an optional
# activemq.prefetchSize when STOMP_AMQPF holds a positive number.
sub_headers = {:id => subid, :ack => amode}
pre_fetch = ENV['STOMP_AMQPF'].to_i # unset => nil.to_i => 0
sub_headers['activemq.prefetchSize'] = pre_fetch if pre_fetch > 0
$stderr.write "#{Time.now} SUBSCRIBE Headers:#{sub_headers}\n"

# One message counter per consumer thread, all starting at zero.
ctrs = (0...num_threads).map { 0 }

# An exception in any consumer thread is re-raised in the main thread.
Thread.abort_on_exception = true
#------------------------------------------------------------------------------#
# Subscribe, then start the consumer threads. All consumers read from the one
# shared connection `c`; each receives a message, ACKs it, bumps its own slot
# in `ctrs`, and sleeps a random interval scaled by `sl_factor`.
c.subscribe tname, sub_headers
mlslt = 30.0 # seconds the main loop sleeps between progress reports
## =begin
tstart = Time.now.to_f
getters = 0.upto(num_threads-1).map do |i|
  # The index is passed through Thread.new so each thread works on its own copy.
  Thread.new("consumer_#{i}", i) do |name, getter_num|
    $stderr.write "#{Time.now()} #{Thread::current()} number #{getter_num} START\n"
    tms = Time.now.to_f # time the most recent receive was started
    tme = tms           # time the most recent iteration finished
    loop do
      telapsed = tms - tstart
      tmelapsed = tme - tms
      $stderr.write "#{Time.now()} #{Thread::current()} number #{getter_num} RECEIVE #{telapsed} #{tmelapsed}\n"
      tms = Time.now.to_f
      m = c.receive
      # Do work ......
      if m.command == Stomp::CMD_ERROR
        # An ERROR frame ends this consumer only; the other threads keep running.
        $stderr.write "#{Time.now()} ERROR - #{m}\n"
        Thread::exit()
      end
      $stderr.write "#{Time.now()} #{Thread::current()} number #{getter_num} got message: #{m.body}\n"
      # Only protocol 1.2 ACKs by the MESSAGE's 'ack' header. Protocols 1.0 and
      # 1.1 ACK by 'message-id'; 1.1 also needs the subscription id. Branching
      # on "1.2" keeps a 1.0 session from passing a nil ack id.
      if c.protocol == "1.2"
        c.ack(m.headers['ack']) # 1.2
      else
        c.ack(m.headers['message-id'],
              {"subscription" => subid}) # 1.0 / 1.1
      end
      $stderr.write "#{Time.now()} #{Thread::current()} number #{getter_num} ACK Done\n"
      # to_i so a message without the sng_msgnum header prints 00000 instead
      # of raising TypeError (which would abort the whole process).
      smdi = sprintf("%05d", m.headers['sng_msgnum'].to_i)
      $stderr.write "ACKMID #{smdi}\n"
      ctrs[getter_num] += 1
      Thread::pass()
      nsl = sl_factor * (rand(100.0)/100.0) # Arbitrary
      $stderr.write "#{Time.now()} #{Thread::current()} Sleep: #{name} - #{nsl}\n"
      sleep nsl
      tme = Time.now.to_f
    end
  end
end
### =end
#------------------------------------------------------------------------------#
# Main thread: every `mlslt` seconds print each consumer's message count and
# the running total. The loop ends once no consumer thread is alive (a consumer
# exits after receiving an ERROR frame), which lets the disconnect below run
# instead of reporting forever with nothing left to observe.
loop do
  sleep mlslt
  $stderr.write "#{Time.now()} ... main loop start~#{Time.now}\n"
  totmc = 0
  ctrs.each_with_index do |count, i|
    $stderr.write "#{Time.now()} ... Thread:#{i}, message count:#{count}\n"
    totmc += count
  end
  $stderr.write "#{Time.now()} ... main loop end~#{Time.now}~#{totmc}\n"
  break if getters.none?(&:alive?)
end
## =end
#------------------------------------------------------------------------------#
c.disconnect
$stderr.write "#{Time.now()} Main end"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment