Skip to content

Instantly share code, notes, and snippets.

@paddor
Created December 24, 2015 10:25
Show Gist options
  • Select an option

  • Save paddor/4ad4e2452dd489791cf7 to your computer and use it in GitHub Desktop.

Select an option

Save paddor/4ad4e2452dd489791cf7 to your computer and use it in GitHub Desktop.
CZMQ: Hang when destroying Zactors
$ ./zactor_issue.rb
CZMQ::FFI::Zactor
new Zactor
main thread: #<Thread:0x007fb5208bc3b0 run>
zactor thread: #<Thread:0x007fb52087e678 run>
nil: processing messages
nil: waiting for next message ...
#<FFI::Pointer address=0x007fb52059e860>: zactor created.
instanciates
Finished in 0.00142 seconds (files took 0.13493 seconds to load)
1 example, 0 failures
#<FFI::Pointer address=0x007fb52059e860>: terminated!
finalizer: destroying zactor #<FFI::Pointer address=0x007fb52059e860> ...
# hangs until SIGTERM
^C^C^Cfinalizer: zactor #<FFI::Pointer address=0x007fb52059e860> destroyed.
E: 15-12-24 11:23:35 dangling 'PAIR' socket created at src/zsys.c:379
E: 15-12-24 11:23:35 dangling sockets: cannot terminate ZMQ safely
$ ./zactor_issue.rb
CZMQ::FFI::Zactor
new Zactor
main thread: #<Thread:0x007fa7508bc3c0 run>
zactor thread: #<Thread:0x007fa750824520 run>
nil: processing messages
nil: waiting for next message ...
#<FFI::Pointer address=0x007fa75241a190>: zactor created.
instanciates
#<<
main thread: #<Thread:0x007fa7508bc3c0 run>
zactor thread: #<Thread:0x007fa7518d8740 run>
nil: processing messages
nil: waiting for next message ...
#<FFI::Pointer address=0x007fa75241ef60>: zactor created.
#<FFI::Pointer address=0x007fa75241ef60>: in #<<: sending message: ["VERBOSE"]
#<FFI::Pointer address=0x007fa75241ef60>: in #<<: sending message: ["CMD", "arg1", "arg2"]
#<FFI::Pointer address=0x007fa75241ef60>: in #<<: sending message: ["SILENCE"]
#<FFI::Pointer address=0x007fa75241ef60>: got message: ["VERBOSE"]
#<FFI::Pointer address=0x007fa75241ef60>: waiting for next message ...
#<FFI::Pointer address=0x007fa75241ef60>: in #<<: sending message: ["$TERM"]
#<FFI::Pointer address=0x007fa75241ef60>: in #terminate: sent $TERM. waiting for death singal ...
#<FFI::Pointer address=0x007fa75241ef60>: got message: ["CMD", "arg1", "arg2"]
#<FFI::Pointer address=0x007fa75241ef60>: waiting for next message ...
#<FFI::Pointer address=0x007fa75241ef60>: got message: ["SILENCE"]
#<FFI::Pointer address=0x007fa75241ef60>: waiting for next message ...
#<FFI::Pointer address=0x007fa75241ef60>: got message: ["$TERM"]
#<FFI::Pointer address=0x007fa75241ef60>: got $TERM. terminating ...
#<FFI::Pointer address=0x007fa75241ef60>: terminated!
#<FFI::Pointer address=0x007fa75241ef60>: in #terminate: death singal received.
sends messages
main thread: #<Thread:0x007fa7508bc3c0 run>
zactor thread: #<Thread:0x007fa751a63ab0 run>
nil: processing messages
nil: waiting for next message ...
#<FFI::Pointer address=0x007fa75057e8a0>: zactor created.
#<FFI::Pointer address=0x007fa75057e8a0>: in #<<: sending message: ["foo"]
returns self
#terminate
main thread: #<Thread:0x007fa7508bc3c0 run>
#<FFI::Pointer address=0x007fa75057e8a0>: got message: ["foo"]
#<FFI::Pointer address=0x007fa75057e8a0>: waiting for next message ...
zactor thread: #<Thread:0x007fa751a60ea0 run>
nil: processing messages
nil: waiting for next message ...
#<FFI::Pointer address=0x007fa7504ccf60>: zactor created.
#<FFI::Pointer address=0x007fa7504ccf60>: in #<<: sending message: ["$TERM"]
#<FFI::Pointer address=0x007fa7504ccf60>: in #terminate: sent $TERM. waiting for death singal ...
#<FFI::Pointer address=0x007fa7504ccf60>: got message: ["$TERM"]
#<FFI::Pointer address=0x007fa7504ccf60>: got $TERM. terminating ...
#<FFI::Pointer address=0x007fa7504ccf60>: terminated!
#<FFI::Pointer address=0x007fa7504ccf60>: in #terminate: death singal received.
waits for death signal
Finished in 0.00445 seconds (files took 0.14009 seconds to load)
4 examples, 0 failures
#<FFI::Pointer address=0x007fa75057e8a0>: terminated!
#<FFI::Pointer address=0x007fa75241a190>: terminated!
finalizer: destroying zactor #<FFI::Pointer address=0x007fa7504ccf60> ...
finalizer: zactor #<FFI::Pointer address=0x007fa7504ccf60> destroyed.
finalizer: destroying zactor #<FFI::Pointer address=0x007fa75057e8a0> ...
# hangs until SIGTERM
^C^C^Cfinalizer: zactor #<FFI::Pointer address=0x007fa75057e8a0> destroyed.
finalizer: destroying zactor #<FFI::Pointer address=0x007fa75241ef60> ...
finalizer: zactor #<FFI::Pointer address=0x007fa75241ef60> destroyed.
finalizer: destroying zactor #<FFI::Pointer address=0x007fa75241a190> ...
# hangs until SIGTERM
^C^C^Cfinalizer: zactor #<FFI::Pointer address=0x007fa75241a190> destroyed.
E: 15-12-24 11:22:41 dangling 'PAIR' socket created at src/zsys.c:379
E: 15-12-24 11:22:41 dangling 'PAIR' socket created at src/zsys.c:379
E: 15-12-24 11:22:41 dangling sockets: cannot terminate ZMQ safely
#!/usr/bin/env rspec
# Requirements:
#
# * libzmq (tested with 4.1.3)
# * libczmq (tested with 3.0.2)
# * Ruby (tested with 2.2.4)
# * FFI (tested with 1.9.10)
# * RSpec (tested with 3.4.0)
# * Minitest (tested with 5.8.3)
require 'ffi'
require 'rspec'
module CZMQ
module FFI
module LibC
extend ::FFI::Library
ffi_lib ::FFI::Platform::LIBC
attach_function :free, [ :pointer ], :void, blocking: true
end
extend ::FFI::Library
lib_name = 'libczmq'
lib_paths = ['/usr/local/lib', '/opt/local/lib', '/usr/lib64']
.map { |path| "#{path}/#{lib_name}.#{::FFI::Platform::LIBSUFFIX}" }
ffi_lib lib_paths + [lib_name]
opts = {
blocking: true # only necessary on MRI to deal with the GIL.
}
attach_function :zsock_new, [:int], :pointer, **opts
attach_function :zsock_signal, [:pointer, :char], :int, **opts
attach_function :zsock_destroy, [:pointer], :void, **opts
attach_function :zsock_wait, [:pointer], :int, **opts
attach_function :zmsg_new, [], :pointer, **opts
attach_function :zmsg_recv, [:pointer], :pointer, **opts
attach_function :zmsg_destroy, [:pointer], :void, **opts
attach_function :zmsg_send, [:pointer, :pointer], :int, **opts
attach_function :zmsg_addstr, [:pointer, :string], :int, **opts
attach_function :zmsg_popstr, [:pointer], :pointer, **opts
attach_function :zactor_new, [:pointer, :pointer], :pointer, **opts
attach_function :zactor_destroy, [:pointer], :void, **opts
class Zsock
# Raised when one tries to use an instance of {Zsock} after
# the internal pointer to the native object has been nullified.
class DestroyedError < RuntimeError; end
# Boilerplate for self pointer, initializer, and finalizer
class << self
alias :__new :new
end
# Attaches the pointer _ptr_ to this instance and defines a finalizer for
# it if necessary.
# @param ptr [::FFI::Pointer]
# @param finalize [Boolean]
def initialize(ptr, finalize = true)
@ptr = ptr
if @ptr.null?
@ptr = nil # Remove null pointers so we don't have to test for them.
elsif finalize
@finalizer = self.class.create_finalizer_for @ptr
ObjectSpace.define_finalizer self, @finalizer
end
end
# @param ptr [::FFI::Pointer]
# @return [Proc]
def self.create_finalizer_for(ptr)
Proc.new do
ptr_ptr = ::FFI::MemoryPointer.new :pointer
ptr_ptr.write_pointer ptr
::CZMQ::FFI.zsock_destroy ptr_ptr
end
end
# @return [Boolean]
def null?
!@ptr or @ptr.null?
end
# Return internal pointer
# @return [::FFI::Pointer]
def __ptr
raise DestroyedError unless @ptr
@ptr
end
# So external Libraries can just pass the Object to a FFI function which expects a :pointer
alias_method :to_ptr, :__ptr
# Nullify internal pointer and return pointer pointer.
# @note This detaches the current instance from the native object
# and thus makes it unusable.
# @return [::FFI::MemoryPointer] the pointer pointing to a pointer
# pointing to the native object
def __ptr_give_ref
raise DestroyedError unless @ptr
ptr_ptr = ::FFI::MemoryPointer.new :pointer
ptr_ptr.write_pointer @ptr
__undef_finalizer if @finalizer
@ptr = nil
ptr_ptr
end
# Undefines the finalizer for this object.
# @note Only use this if you need to and can guarantee that the native
# object will be freed by other means.
# @return [void]
def __undef_finalizer
ObjectSpace.undefine_finalizer self
@finalizer = nil
end
# Create a new socket. Returns the new socket, or NULL if the new socket
# could not be created. Note that the symbol zsock_new (and other
# constructors/destructors for zsock) are redirected to the *_checked
# variant, enabling intelligent socket leak detection. This can have
# performance implications if you use a LOT of sockets. To turn off this
# redirection behaviour, define ZSOCK_NOCHECK.
# @param type [Integer, #to_int, #to_i]
# @return [CZMQ::Zsock]
def self.new(type)
type = Integer(type)
ptr = ::CZMQ::FFI.zsock_new(type)
__new ptr
end
# Send a signal over a socket. A signal is a short message carrying a
# success/failure code (by convention, 0 means OK). Signals are encoded
# to be distinguishable from "normal" messages. Accepts a zsock_t or a
# zactor_t argument, and returns 0 if successful, -1 if the signal could
# not be sent. Takes a polymorphic socket reference.
#
# @param status [Integer, #to_int, #to_i]
# @return [Integer]
def signal(status)
raise DestroyedError unless @ptr
self_p = @ptr
status = Integer(status)
result = ::CZMQ::FFI.zsock_signal(self_p, status)
result
end
end
class Zmsg
# Raised when one tries to use an instance of {Zmsg} after
# the internal pointer to the native object has been nullified.
class DestroyedError < RuntimeError; end
# Boilerplate for self pointer, initializer, and finalizer
class << self
alias :__new :new
end
# Attaches the pointer _ptr_ to this instance and defines a finalizer for
# it if necessary.
# @param ptr [::FFI::Pointer]
# @param finalize [Boolean]
def initialize(ptr, finalize = true)
@ptr = ptr
if @ptr.null?
@ptr = nil # Remove null pointers so we don't have to test for them.
elsif finalize
@finalizer = self.class.create_finalizer_for @ptr
ObjectSpace.define_finalizer self, @finalizer
end
end
# @param ptr [::FFI::Pointer]
# @return [Proc]
def self.create_finalizer_for(ptr)
Proc.new do
ptr_ptr = ::FFI::MemoryPointer.new :pointer
ptr_ptr.write_pointer ptr
::CZMQ::FFI.zmsg_destroy ptr_ptr
end
end
# @return [Boolean]
def null?
!@ptr or @ptr.null?
end
# Return internal pointer
# @return [::FFI::Pointer]
def __ptr
raise DestroyedError unless @ptr
@ptr
end
# So external Libraries can just pass the Object to a FFI function which expects a :pointer
alias_method :to_ptr, :__ptr
# Nullify internal pointer and return pointer pointer.
# @note This detaches the current instance from the native object
# and thus makes it unusable.
# @return [::FFI::MemoryPointer] the pointer pointing to a pointer
# pointing to the native object
def __ptr_give_ref
raise DestroyedError unless @ptr
ptr_ptr = ::FFI::MemoryPointer.new :pointer
ptr_ptr.write_pointer @ptr
__undef_finalizer if @finalizer
@ptr = nil
ptr_ptr
end
# Undefines the finalizer for this object.
# @note Only use this if you need to and can guarantee that the native
# object will be freed by other means.
# @return [void]
def __undef_finalizer
ObjectSpace.undefine_finalizer self
@finalizer = nil
end
# Create a new empty message object
# @return [CZMQ::Zmsg]
def self.new()
ptr = ::CZMQ::FFI.zmsg_new()
__new ptr
end
# Receive message from socket, returns zmsg_t object or NULL if the recv
# was interrupted. Does a blocking recv. If you want to not block then use
# the zloop class or zmsg_recv_nowait or zmq_poll to check for socket input
# before receiving.
# @param source [::FFI::Pointer, #to_ptr]
# @return [CZMQ::Zmsg]
def self.recv(source)
ptr = ::CZMQ::FFI.zmsg_recv(source)
__new ptr
end
# Send message to destination socket, and destroy the message after sending
# it successfully. If the message has no frames, sends nothing but destroys
# the message anyhow. Nullifies the caller's reference to the message (as
# it is a destructor).
#
# @param self_p [#__ptr_give_ref]
# @param dest [::FFI::Pointer, #to_ptr]
# @return [Integer]
def self.send(self_p, dest)
self_p = self_p.__ptr_give_ref
result = ::CZMQ::FFI.zmsg_send(self_p, dest)
result
end
# Push string as new frame to end of message.
# Returns 0 on success, -1 on error.
#
# @param string [String, #to_s, nil]
# @return [Integer]
def addstr(string)
raise DestroyedError unless @ptr
self_p = @ptr
result = ::CZMQ::FFI.zmsg_addstr(self_p, string)
result
end
# Pop frame off front of message.
# @return [String]
# @return [nil] if no more frames left
def popstr()
raise DestroyedError unless @ptr
str_p = ::CZMQ::FFI.zmsg_popstr(@ptr) # NOTE: fresh string
return nil if str_p.null?
str = str_p.read_string
::CZMQ::FFI::LibC.free(str_p)
return str
end
# @return [Array<String>] message parts
# @note This empties the message.
def parts
parts = []
while str = popstr
parts << str
end
return parts
end
end
class Zactor
# Raised when one tries to use an instance of {Zactor} after
# the internal pointer to the native object has been nullified.
class DestroyedError < RuntimeError; end
# Create a new callback of the following type:
# Actors get a pipe and arguments from caller
# typedef void (zactor_fn) (
# zsock_t *pipe, void *args);
#
# @note WARNING: If your Ruby code doesn't retain a reference to the
# FFI::Function object after passing it to a C function call,
# it may be garbage collected while C still holds the pointer,
# potentially resulting in a segmentation fault.
def self.fn
::FFI::Function.new :void, [:pointer, :pointer], blocking: true do |pipe, args|
pipe = Zsock.__new pipe, false
yield pipe, args
end
end
# @yieldparam msg_parts [Array<String>] all parts of received message
# @yieldparam pipe [CZMQ::FFI::Zsock] PAIR socket
def initialize
@running = true
@ptr_mtx = Mutex.new # mutex for zactor_t resource
@state_mtx = Mutex.new # mutex for state of this object, like @running
@callback = Zactor.fn do |pipe, _args|
puts "zactor thread: %p" % Thread.current
begin
pipe.signal(0) # handshake, so zactor_new() returns
warn "#{@ptr.inspect}: processing messages"
while true
warn "#{@ptr.inspect}: waiting for next message ..."
msg = Zmsg.recv(pipe)
if msg.null? # interrupted
warn "#{@ptr.inspect}: got interrupt. terminating ..."
break
end
msg_parts = msg.parts
warn "#{@ptr.inspect}: got message: #{msg_parts.inspect}"
if "$TERM" == msg_parts[0]
warn "#{@ptr.inspect}: got $TERM. terminating ..."
# NOTE: A dying actor implicitly sends signal 0.
break
end
yield msg_parts, pipe
end
rescue
puts "Zactor callback raised exception:"
p $!
ensure
warn "#{@ptr.inspect}: terminated!"
@state_mtx.synchronize do
@running = false
end
end
end
puts "main thread: %p" % Thread.current
@ptr = ::CZMQ::FFI.zactor_new(@callback, _args = nil)
raise "couldn't create actor" if @ptr.null?
warn "#{__ptr}: zactor created."
@finalizer = self.class.create_finalizer_for @ptr
ObjectSpace.define_finalizer self, @finalizer
end
# @param ptr [::FFI::Pointer]
# @return [Proc]
def self.create_finalizer_for(ptr)
Proc.new do
warn "finalizer: destroying zactor #{ptr} ... "
ptr_ptr = ::FFI::MemoryPointer.new :pointer
ptr_ptr.write_pointer ptr
::CZMQ::FFI.zactor_destroy ptr_ptr
warn "finalizer: zactor #{ptr} destroyed."
end
end
# @return [Boolean]
def null?
!@ptr or @ptr.null?
end
# Return internal pointer
# @return [::FFI::Pointer]
def __ptr
raise DestroyedError unless @ptr
@ptr
end
# So external Libraries can just pass the Object to a FFI function which
# expects a :pointer
alias_method :to_ptr, :__ptr
# Nullify internal pointer and return pointer pointer.
# @note This detaches the current instance from the native object
# and thus makes it unusable.
# @return [::FFI::MemoryPointer] the pointer pointing to a pointer
# pointing to the native object
def __ptr_give_ref
raise DestroyedError unless @ptr
ptr_ptr = ::FFI::MemoryPointer.new :pointer
ptr_ptr.write_pointer @ptr
__undef_finalizer if @finalizer
@ptr = nil
ptr_ptr
end
# Undefines the finalizer for this object.
# @note Only use this if you need to and can guarantee that the native
# object will be freed by other means.
# @return [void]
def __undef_finalizer
ObjectSpace.undefine_finalizer self
@finalizer = nil
end
# @param message_parts [Array<String>]
def <<(message_parts)
raise DestroyedError unless @ptr
warn "#{__ptr}: in #<<: sending message: #{message_parts.inspect}"
msg = Zmsg.new
message_parts.each { |part| msg.addstr(part) }
@ptr_mtx.synchronize do
Zmsg.send(msg, self)
end
self
end
# Tells the actor to terminate and waits for it.
def terminate
@state_mtx.synchronize do
raise "actor dead" if !@running
self << ["$TERM"]
@running = false
end
warn "#{__ptr}: in #terminate: sent $TERM. waiting for death singal ..."
wait
warn "#{__ptr}: in #terminate: death singal received."
end
# @return [Boolean] whether this actor has been terminated
def terminated?
@state_mtx.synchronize do
!@running
end
end
private
# Wait on a signal. Use this to coordinate between threads, over pipe
# pairs. Blocks until the signal is received. Returns -1 on error, 0 or
# greater on success. Accepts a zsock_t or a zactor_t as argument.
# Takes a polymorphic socket reference.
#
# @return [Integer]
def wait()
@ptr_mtx.synchronize do
raise DestroyedError unless @ptr
::CZMQ::FFI.zsock_wait(@ptr)
end
end
end
end
end
RSpec.configure do |config|
config.expect_with :minitest
end
describe CZMQ::FFI::Zactor do
context "new Zactor" do
subject do
CZMQ::FFI::Zactor.new do
# this block won't be called
end
end
it "instanciates" do
refute_nil subject
end
end
# describe "#<<" do
# subject do
# CZMQ::FFI::Zactor.new do |msg_parts, pipe|
# received_messages << msg_parts
# end
# end
# let(:received_messages) { [] }
# let(:messages) do
# [ %w[ VERBOSE ], %w[ CMD arg1 arg2 ], %w[ SILENCE ] ]
# end
#
# it "sends messages" do
# messages.each { |msg| subject << msg }
# subject.terminate
# assert_equal messages, received_messages
# end
#
# it "returns self" do # so it can be chained
# assert_same subject, subject << ["foo"]
# end
# end
#
# describe "#terminate" do
# subject do
# CZMQ::FFI::Zactor.new do
# # this block won't be called
# end
# end
#
# it "waits for death signal" do
# subject.terminate
# assert_operator subject, :terminated?
# end
# end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment