Skip to content

Instantly share code, notes, and snippets.

@lukecampbell
Created August 6, 2012 14:38
Show Gist options
  • Save lukecampbell/3274911 to your computer and use it in GitHub Desktop.
Save lukecampbell/3274911 to your computer and use it in GitHub Desktop.
from pyon.util.log import log
from pyon.ion.stream import SimpleStreamSubscriber
from pyon.core.exception import BadRequest
import random
import gevent
class BastardSubscriber(object):
def __init__(self, container, exchange_name='bastard'):
self._sub = SimpleStreamSubscriber.new_subscriber(container,exchange_name,self.consume)
self.done = gevent.event.Event()
self.greenlet = None
def __del__(self):
self.stop()
self.close()
super(BastardSubscriber,self).__del__()
def consume(self,msg,header):
log.error('you should not be using consume')
def parse_message(self, message):
try:
if random.randint(0,4) == 4:
raise Exception("I'm a bastard, what'd you expect?")
if message.body == 'correct':
log.debug('Correct message')
message.ack()
else:
log.debug('Incorrect Message')
message.reject()
except (AttributeError,KeyError):
log.error('Message is crap')
log.error('Attempting to reject message')
try:
message.reject()
except:
log.critical('Failed to reject message: %s', message)
def start(self):
if self.greenlet is not None:
raise BadRequest('%s is already activated, please stop it first.'% self)
self._sub.initialize()
self.greenlet = gevent.spawn(self.activate)
def stop(self):
if self.greenlet is not None:
self.done.set()
self.greenlet.join(5)
self.greenlet = None
def close(self):
log.debug('Bastard closing')
self._sub.close()
def activate(self):
#--------------------------------------------------------------------------------
# CRITICAL SECTION
#--------------------------------------------------------------------------------
self.done.clear()
try:
while not self.done.is_set():
try:
message_pool = self._sub.get_n_msgs(1,0.2)
for msg in message_pool:
self.parse_message(msg)
except gevent.Timeout:
pass
except:
self.done.set()
log.exception('Bad shit happened')
self._sub.close()
self._sub._chan.close_impl()
#--------------------------------------------------------------------------------
# END CRITICAL SECTION
#--------------------------------------------------------------------------------
# IPython log file
from pyon.ion.stream import SimpleStreamSubscriber, SimpleStreamPublisher
def nop(*args):
pass
sub = SimpleStreamSubscriber.new_subscriber(cc,'sub',nop)
xn = cc.ex_manager.create_xn_queue('sub')
pub = SimpleStreamPublisher.new_publisher(cc,'test','nop')
xn.bind('nop.data', pub.exchange_point)
pub.publish('test_message')
sub.initialize()
sub.get_one_msg()
sub.close()
><> def nop(*args):
pass
...
><> from pyon.ion.stream import SimpleStreamSubscriber, SimpleStreamPublisher
><> sub = SimpleStreamSubscriber.new_subscriber(cc,'sub',nop)
2012-08-06 11:24:25,033 DEBUG pyon.ion.exchange ExchangeManager._create_xn: type: queue, name=sub, xs=NP (lukesys.ion.xs.ioncore,None,B: None), kwargs={}
2012-08-06 11:24:25,034 DEBUG pyon.datastore.couchdb.couchdb_datastore find_res_by_name(name=exchange_management, restype=Service)
2012-08-06 11:24:25,242 DEBUG pyon.datastore.couchdb.couchdb_datastore find_res_by_name() found 0 objects
2012-08-06 11:24:25,242 INFO pyon.ion.exchange ExchangeManager.declare_queue (queue lukesys.ion.xs.ioncore.sub, durable False, AD False)
2012-08-06 11:24:25,242 DEBUG pyon.ion.exchange ExchangeManager._ensure_default_declared, declaring default xs
2012-08-06 11:24:25,243 INFO pyon.ion.exchange ExchangeManager.declare_exchange lukesys.ion.xs.ioncore
2012-08-06 11:24:25,247 DEBUG pyon.net.messaging _next_channel_number: 6 (of 32763 possible, 5 used, 0 bad)
2012-08-06 11:24:25,248 DEBUG pyon.net.transport AMQPTransport.declare_exchange_impl(6): lukesys.ion.xs.ioncore, T topic, D False, AD True
2012-08-06 11:24:25,249 DEBUG pyon.net.transport AMQPTransport.declare_queue_impl(6): lukesys.ion.xs.ioncore.sub, D False, AD False
2012-08-06 11:24:25,250 WARNING pyon.net.endpoint ListeningBaseEndpoint: name param is deprecated, please use from_name instead
><> sub.initialize()
2012-08-06 11:24:27,350 DEBUG pyon.net.messaging NodeB.channel
2012-08-06 11:24:27,355 DEBUG pyon.net.messaging _next_channel_number: 7 (of 32762 possible, 6 used, 0 bad)
2012-08-06 11:24:27,359 DEBUG pyon.util.fsm FSM ATTACH -> ACTIVE (None)
2012-08-06 11:24:27,360 DEBUG pyon.ion.exchange ExchangeQueue.setup_listener: passing on binding
><> sub.close()
2012-08-06 11:24:33,756 DEBUG pyon.net.endpoint SimpleStreamSubscriber closed()
2012-08-06 11:24:33,757 DEBUG pyon.net.endpoint BaseEndpoint closed()
2012-08-06 11:24:33,757 DEBUG pyon.net.channel Channel close()
2012-08-06 11:24:33,757 DEBUG pyon.util.fsm FSM CLOSE -> CLOSED (<bound method SubscriberChannel._on_close of <pyon.net.channel.SubscriberChannel object at 0x105a0c650>>)
File "/Users/luke/Documents/Dev/virtenvs/sci/bin/ipython", line 8, in <module>
load_entry_point('ipython==0.13', 'console_scripts', 'ipython')()
File "/Users/luke/Documents/Dev/virtenvs/sci/lib/python2.7/site-packages/IPython/frontend/terminal/ipapp.py", line 389, in launch_new_instance
app.start()
File "/Users/luke/Documents/Dev/virtenvs/sci/lib/python2.7/site-packages/IPython/frontend/terminal/ipapp.py", line 363, in start
self.shell.mainloop()
File "/Users/luke/Documents/Dev/virtenvs/sci/lib/python2.7/site-packages/IPython/frontend/terminal/interactiveshell.py", line 467, in mainloop
self.interact(display_banner=display_banner)
File "/Users/luke/Documents/Dev/virtenvs/sci/lib/python2.7/site-packages/IPython/frontend/terminal/interactiveshell.py", line 586, in interact
self.run_cell(source_raw, store_history=True)
File "/Users/luke/Documents/Dev/virtenvs/sci/lib/python2.7/site-packages/IPython/core/interactiveshell.py", line 2617, in run_cell
interactivity=interactivity)
File "/Users/luke/Documents/Dev/virtenvs/sci/lib/python2.7/site-packages/IPython/core/interactiveshell.py", line 2690, in run_ast_nodes
if self.run_code(code):
File "/Users/luke/Documents/Dev/virtenvs/sci/lib/python2.7/site-packages/IPython/core/interactiveshell.py", line 2746, in run_code
exec code_obj in self.user_global_ns, self.user_ns
File "<ipython-input-5-a3dfbdb43827>", line 2, in <module>
scripts.pycc.entry()
File "/Users/luke/Documents/Dev/code/pyon/scripts/pycc.py", line 72, in entry
main(opts, *args, **kwargs)
File "/Users/luke/Documents/Dev/code/pyon/scripts/pycc.py", line 297, in main
do_work(container)
File "/Users/luke/Documents/Dev/code/pyon/scripts/pycc.py", line 263, in do_work
setup_ipython(get_shell_api(container))
File "/Users/luke/Documents/Dev/code/pyon/scripts/pycc.py", line 365, in setup_ipython
ipshell('Pyon - ION R2 CC interactive IPython shell. Type ionhelp() for help')
File "/Users/luke/Documents/Dev/virtenvs/sci/lib/python2.7/site-packages/IPython/frontend/terminal/embed.py", line 157, in __call__
self.mainloop(local_ns, module, stack_depth=stack_depth, global_ns=global_ns)
File "/Users/luke/Documents/Dev/virtenvs/sci/lib/python2.7/site-packages/IPython/frontend/terminal/embed.py", line 240, in mainloop
self.interact(display_banner=display_banner)
File "/Users/luke/Documents/Dev/virtenvs/sci/lib/python2.7/site-packages/IPython/frontend/terminal/interactiveshell.py", line 586, in interact
self.run_cell(source_raw, store_history=True)
File "/Users/luke/Documents/Dev/virtenvs/sci/lib/python2.7/site-packages/IPython/core/interactiveshell.py", line 2617, in run_cell
interactivity=interactivity)
File "/Users/luke/Documents/Dev/virtenvs/sci/lib/python2.7/site-packages/IPython/core/interactiveshell.py", line 2696, in run_ast_nodes
if self.run_code(code):
File "/Users/luke/Documents/Dev/virtenvs/sci/lib/python2.7/site-packages/IPython/core/interactiveshell.py", line 2746, in run_code
exec code_obj in self.user_global_ns, self.user_ns
File "<ipython-input-5-b1d1ff52cdec>", line 1, in <module>
sub.close()
File "/Users/luke/Documents/Dev/code/pyon/pyon/net/endpoint.py", line 630, in close
self._chan.close()
File "/Users/luke/Documents/Dev/code/pyon/pyon/net/channel.py", line 187, in close
self._fsm.process(self.I_CLOSE)
File "/Users/luke/Documents/Dev/code/pyon/pyon/util/fsm.py", line 195, in process
res = self.action(self)
File "/Users/luke/Documents/Dev/code/pyon/pyon/net/channel.py", line 191, in _on_close
traceback.print_stack()
2012-08-06 11:24:33,762 DEBUG pyon.net.channel Channel _on_close()
2012-08-06 11:24:33,763 DEBUG pyon.net.channel SubscriberChannel close_impl()
2012-08-06 11:24:33,763 DEBUG pyon.net.channel RecvChannel.close_impl (7)
2012-08-06 11:24:33,763 DEBUG pyon.net.channel BaseChannel.close_impl (7)
2012-08-06 11:24:33,763 DEBUG pyon.net.channel YAY!
2012-08-06 11:24:33,763 DEBUG pyon.net.channel BaseChannel.on_channel_close
channel number: 7
code: 0
text: Normal Shutdown
><> sub.initialize()
2012-08-06 11:24:51,319 DEBUG pyon.net.messaging NodeB.channel
2012-08-06 11:24:51,323 DEBUG pyon.net.messaging _next_channel_number: 7 (of 32762 possible, 6 used, 0 bad)
2012-08-06 11:24:51,329 DEBUG pyon.util.fsm FSM ATTACH -> ACTIVE (None)
2012-08-06 11:24:51,329 DEBUG pyon.ion.exchange ExchangeQueue.setup_listener: passing on binding
><> sub.get_one_msg()
2012-08-06 11:24:55,495 INFO pyon.ion.exchange ExchangeManager.qos
2012-08-06 11:24:55,495 DEBUG pyon.net.transport AMQPTransport.qos_impl(7): pf_size 0, pf_count 1, global_ False
2012-08-06 11:24:55,500 DEBUG pyon.net.channel accept: waiting for 1 msgs, timeout=None
2012-08-06 11:24:55,502 DEBUG pyon.util.fsm FSM ATTACH -> ACTIVE (None)
2012-08-06 11:24:55,503 DEBUG pyon.util.fsm FSM ENTER_ACCEPT -> ACCEPTED (None)
2012-08-06 11:24:55,503 DEBUG pyon.core.interceptor.encode EncodeInterceptor.incoming: <pyon.core.interceptor.interceptor.Invocation object at 0x105abf8d0>
2012-08-06 11:24:55,503 DEBUG pyon.core.interceptor.encode Pre-transform: ?test_message
2012-08-06 11:24:55,503 DEBUG pyon.core.interceptor.encode Post-transform: test_message
2012-08-06 11:24:55,503 DEBUG pyon.core.interceptor.codec CodecInterceptor.incoming: <pyon.core.interceptor.interceptor.Invocation object at 0x105abf8d0>
2012-08-06 11:24:55,504 DEBUG pyon.core.interceptor.codec Payload, pre-transform: test_message
2012-08-06 11:24:55,504 DEBUG pyon.core.interceptor.codec Payload, post-transform: test_message
2012-08-06 11:24:55,504 INFO pyon.net.endpoint MESSAGE RECV >>> RPC-request: ?() -> ###?### status=:
HEADERS: {'ts': '1344264172736', 'routing_key': 'nop.data'}
CONTENT: test_message
DELIVERY: tag=1
--> <pyon.net.endpoint.MessageObject at 0x105abf510>
><> sub.close()
2012-08-06 11:25:06,353 DEBUG pyon.net.endpoint SimpleStreamSubscriber closed()
2012-08-06 11:25:06,353 DEBUG pyon.net.endpoint BaseEndpoint closed()
2012-08-06 11:25:06,354 DEBUG pyon.net.channel Channel close()
2012-08-06 11:25:06,354 DEBUG pyon.util.fsm FSM CLOSE -> CLOSING (None)
><>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment