Skip to content

Instantly share code, notes, and snippets.

@vsmelov
Last active October 23, 2018 08:37
Show Gist options
  • Save vsmelov/eb79516097ca040a14dc4e0eec74a9be to your computer and use it in GitHub Desktop.
Save vsmelov/eb79516097ca040a14dc4e0eec74a9be to your computer and use it in GitHub Desktop.
How to close ZMQ Context and fail program on exception
import asyncio
import zmq.asyncio
import logging
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
sh = logging.StreamHandler()
sh.setLevel(logging.DEBUG)
logger.addHandler(sh)
CHANNEL = "tcp://localhost:10001"
class A:
async def __aenter__(self):
self.id = 'ee'
self.url = CHANNEL
self.context = zmq.asyncio.Context().instance()
self.socket = self.context.socket(zmq.DEALER)
self.socket.setsockopt(zmq.IDENTITY, self.id.encode())
logger.info(f'START: connect to {self.url}')
self.socket.connect(self.url)
logger.info(f'OK: connect to {self.url}')
return self
async def run(self):
await asyncio.gather(self.wait_and_fail(), self.listen())
async def listen(self):
while True:
logger.info(f'send request')
await self.socket.send(b'some request')
logger.info(f'r = await self.socket.recv_multipart()')
r = await self.socket.recv_multipart()
logger.info(f'r = {r}')
async def wait_and_fail(self):
await asyncio.sleep(1)
logger.info('raise ValueError')
raise ValueError
async def __aexit__(self, exc_type, exc_val, exc_tb):
logger.info('__aexit__')
# different approaches
if 0: # doesn't work
logger.info('self.socket.close()')
self.socket.close()
elif 0: # doesn't work
logger.info('self.socket.close()')
self.socket.close()
logger.info('self.context.destroy()')
self.context.destroy()
elif 0: # it works
logger.info('self.socket.close(linger=0)')
self.socket.close(linger=0)
logger.info('self.context.destroy()')
self.context.destroy()
elif 0: # it works
logger.info('self.socket.close(linger=0)')
self.socket.close(linger=0)
elif 1: # it works
logger.info('self.context.destroy(linger=0)')
self.context.destroy(linger=0)
elif 0: # doesn't work
logger.info('self.context.destroy()')
self.context.destroy()
elif 0: # doesn't work
logger.info('pass')
pass
async def main():
async with A() as a:
await a.run()
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment