Created
August 6, 2012 14:38
-
-
Save lukecampbell/3274911 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyon.util.log import log | |
from pyon.ion.stream import SimpleStreamSubscriber | |
from pyon.core.exception import BadRequest | |
import random | |
import gevent | |
class BastardSubscriber(object): | |
def __init__(self, container, exchange_name='bastard'): | |
self._sub = SimpleStreamSubscriber.new_subscriber(container,exchange_name,self.consume) | |
self.done = gevent.event.Event() | |
self.greenlet = None | |
def __del__(self): | |
self.stop() | |
self.close() | |
super(BastardSubscriber,self).__del__() | |
def consume(self,msg,header): | |
log.error('you should not be using consume') | |
def parse_message(self, message): | |
try: | |
if random.randint(0,4) == 4: | |
raise Exception("I'm a bastard, what'd you expect?") | |
if message.body == 'correct': | |
log.debug('Correct message') | |
message.ack() | |
else: | |
log.debug('Incorrect Message') | |
message.reject() | |
except (AttributeError,KeyError): | |
log.error('Message is crap') | |
log.error('Attempting to reject message') | |
try: | |
message.reject() | |
except: | |
log.critical('Failed to reject message: %s', message) | |
def start(self): | |
if self.greenlet is not None: | |
raise BadRequest('%s is already activated, please stop it first.'% self) | |
self._sub.initialize() | |
self.greenlet = gevent.spawn(self.activate) | |
def stop(self): | |
if self.greenlet is not None: | |
self.done.set() | |
self.greenlet.join(5) | |
self.greenlet = None | |
def close(self): | |
log.debug('Bastard closing') | |
self._sub.close() | |
def activate(self): | |
#-------------------------------------------------------------------------------- | |
# CRITICAL SECTION | |
#-------------------------------------------------------------------------------- | |
self.done.clear() | |
try: | |
while not self.done.is_set(): | |
try: | |
message_pool = self._sub.get_n_msgs(1,0.2) | |
for msg in message_pool: | |
self.parse_message(msg) | |
except gevent.Timeout: | |
pass | |
except: | |
self.done.set() | |
log.exception('Bad shit happened') | |
self._sub.close() | |
self._sub._chan.close_impl() | |
#-------------------------------------------------------------------------------- | |
# END CRITICAL SECTION | |
#-------------------------------------------------------------------------------- | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# IPython log file | |
from pyon.ion.stream import SimpleStreamSubscriber, SimpleStreamPublisher | |
def nop(*args): | |
pass | |
sub = SimpleStreamSubscriber.new_subscriber(cc,'sub',nop) | |
xn = cc.ex_manager.create_xn_queue('sub') | |
pub = SimpleStreamPublisher.new_publisher(cc,'test','nop') | |
xn.bind('nop.data', pub.exchange_point) | |
pub.publish('test_message') | |
sub.initialize() | |
sub.get_one_msg() | |
sub.close() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
><> def nop(*args): | |
pass | |
... | |
><> from pyon.ion.stream import SimpleStreamSubscriber, SimpleStreamPublisher | |
><> sub = SimpleStreamSubscriber.new_subscriber(cc,'sub',nop) | |
2012-08-06 11:24:25,033 DEBUG pyon.ion.exchange ExchangeManager._create_xn: type: queue, name=sub, xs=NP (lukesys.ion.xs.ioncore,None,B: None), kwargs={} | |
2012-08-06 11:24:25,034 DEBUG pyon.datastore.couchdb.couchdb_datastore find_res_by_name(name=exchange_management, restype=Service) | |
2012-08-06 11:24:25,242 DEBUG pyon.datastore.couchdb.couchdb_datastore find_res_by_name() found 0 objects | |
2012-08-06 11:24:25,242 INFO pyon.ion.exchange ExchangeManager.declare_queue (queue lukesys.ion.xs.ioncore.sub, durable False, AD False) | |
2012-08-06 11:24:25,242 DEBUG pyon.ion.exchange ExchangeManager._ensure_default_declared, declaring default xs | |
2012-08-06 11:24:25,243 INFO pyon.ion.exchange ExchangeManager.declare_exchange lukesys.ion.xs.ioncore | |
2012-08-06 11:24:25,247 DEBUG pyon.net.messaging _next_channel_number: 6 (of 32763 possible, 5 used, 0 bad) | |
2012-08-06 11:24:25,248 DEBUG pyon.net.transport AMQPTransport.declare_exchange_impl(6): lukesys.ion.xs.ioncore, T topic, D False, AD True | |
2012-08-06 11:24:25,249 DEBUG pyon.net.transport AMQPTransport.declare_queue_impl(6): lukesys.ion.xs.ioncore.sub, D False, AD False | |
2012-08-06 11:24:25,250 WARNING pyon.net.endpoint ListeningBaseEndpoint: name param is deprecated, please use from_name instead | |
><> sub.initialize() | |
2012-08-06 11:24:27,350 DEBUG pyon.net.messaging NodeB.channel | |
2012-08-06 11:24:27,355 DEBUG pyon.net.messaging _next_channel_number: 7 (of 32762 possible, 6 used, 0 bad) | |
2012-08-06 11:24:27,359 DEBUG pyon.util.fsm FSM ATTACH -> ACTIVE (None) | |
2012-08-06 11:24:27,360 DEBUG pyon.ion.exchange ExchangeQueue.setup_listener: passing on binding | |
><> sub.close() | |
2012-08-06 11:24:33,756 DEBUG pyon.net.endpoint SimpleStreamSubscriber closed() | |
2012-08-06 11:24:33,757 DEBUG pyon.net.endpoint BaseEndpoint closed() | |
2012-08-06 11:24:33,757 DEBUG pyon.net.channel Channel close() | |
2012-08-06 11:24:33,757 DEBUG pyon.util.fsm FSM CLOSE -> CLOSED (<bound method SubscriberChannel._on_close of <pyon.net.channel.SubscriberChannel object at 0x105a0c650>>) | |
File "/Users/luke/Documents/Dev/virtenvs/sci/bin/ipython", line 8, in <module> | |
load_entry_point('ipython==0.13', 'console_scripts', 'ipython')() | |
File "/Users/luke/Documents/Dev/virtenvs/sci/lib/python2.7/site-packages/IPython/frontend/terminal/ipapp.py", line 389, in launch_new_instance | |
app.start() | |
File "/Users/luke/Documents/Dev/virtenvs/sci/lib/python2.7/site-packages/IPython/frontend/terminal/ipapp.py", line 363, in start | |
self.shell.mainloop() | |
File "/Users/luke/Documents/Dev/virtenvs/sci/lib/python2.7/site-packages/IPython/frontend/terminal/interactiveshell.py", line 467, in mainloop | |
self.interact(display_banner=display_banner) | |
File "/Users/luke/Documents/Dev/virtenvs/sci/lib/python2.7/site-packages/IPython/frontend/terminal/interactiveshell.py", line 586, in interact | |
self.run_cell(source_raw, store_history=True) | |
File "/Users/luke/Documents/Dev/virtenvs/sci/lib/python2.7/site-packages/IPython/core/interactiveshell.py", line 2617, in run_cell | |
interactivity=interactivity) | |
File "/Users/luke/Documents/Dev/virtenvs/sci/lib/python2.7/site-packages/IPython/core/interactiveshell.py", line 2690, in run_ast_nodes | |
if self.run_code(code): | |
File "/Users/luke/Documents/Dev/virtenvs/sci/lib/python2.7/site-packages/IPython/core/interactiveshell.py", line 2746, in run_code | |
exec code_obj in self.user_global_ns, self.user_ns | |
File "<ipython-input-5-a3dfbdb43827>", line 2, in <module> | |
scripts.pycc.entry() | |
File "/Users/luke/Documents/Dev/code/pyon/scripts/pycc.py", line 72, in entry | |
main(opts, *args, **kwargs) | |
File "/Users/luke/Documents/Dev/code/pyon/scripts/pycc.py", line 297, in main | |
do_work(container) | |
File "/Users/luke/Documents/Dev/code/pyon/scripts/pycc.py", line 263, in do_work | |
setup_ipython(get_shell_api(container)) | |
File "/Users/luke/Documents/Dev/code/pyon/scripts/pycc.py", line 365, in setup_ipython | |
ipshell('Pyon - ION R2 CC interactive IPython shell. Type ionhelp() for help') | |
File "/Users/luke/Documents/Dev/virtenvs/sci/lib/python2.7/site-packages/IPython/frontend/terminal/embed.py", line 157, in __call__ | |
self.mainloop(local_ns, module, stack_depth=stack_depth, global_ns=global_ns) | |
File "/Users/luke/Documents/Dev/virtenvs/sci/lib/python2.7/site-packages/IPython/frontend/terminal/embed.py", line 240, in mainloop | |
self.interact(display_banner=display_banner) | |
File "/Users/luke/Documents/Dev/virtenvs/sci/lib/python2.7/site-packages/IPython/frontend/terminal/interactiveshell.py", line 586, in interact | |
self.run_cell(source_raw, store_history=True) | |
File "/Users/luke/Documents/Dev/virtenvs/sci/lib/python2.7/site-packages/IPython/core/interactiveshell.py", line 2617, in run_cell | |
interactivity=interactivity) | |
File "/Users/luke/Documents/Dev/virtenvs/sci/lib/python2.7/site-packages/IPython/core/interactiveshell.py", line 2696, in run_ast_nodes | |
if self.run_code(code): | |
File "/Users/luke/Documents/Dev/virtenvs/sci/lib/python2.7/site-packages/IPython/core/interactiveshell.py", line 2746, in run_code | |
exec code_obj in self.user_global_ns, self.user_ns | |
File "<ipython-input-5-b1d1ff52cdec>", line 1, in <module> | |
sub.close() | |
File "/Users/luke/Documents/Dev/code/pyon/pyon/net/endpoint.py", line 630, in close | |
self._chan.close() | |
File "/Users/luke/Documents/Dev/code/pyon/pyon/net/channel.py", line 187, in close | |
self._fsm.process(self.I_CLOSE) | |
File "/Users/luke/Documents/Dev/code/pyon/pyon/util/fsm.py", line 195, in process | |
res = self.action(self) | |
File "/Users/luke/Documents/Dev/code/pyon/pyon/net/channel.py", line 191, in _on_close | |
traceback.print_stack() | |
2012-08-06 11:24:33,762 DEBUG pyon.net.channel Channel _on_close() | |
2012-08-06 11:24:33,763 DEBUG pyon.net.channel SubscriberChannel close_impl() | |
2012-08-06 11:24:33,763 DEBUG pyon.net.channel RecvChannel.close_impl (7) | |
2012-08-06 11:24:33,763 DEBUG pyon.net.channel BaseChannel.close_impl (7) | |
2012-08-06 11:24:33,763 DEBUG pyon.net.channel YAY! | |
2012-08-06 11:24:33,763 DEBUG pyon.net.channel BaseChannel.on_channel_close | |
channel number: 7 | |
code: 0 | |
text: Normal Shutdown | |
><> sub.initialize() | |
2012-08-06 11:24:51,319 DEBUG pyon.net.messaging NodeB.channel | |
2012-08-06 11:24:51,323 DEBUG pyon.net.messaging _next_channel_number: 7 (of 32762 possible, 6 used, 0 bad) | |
2012-08-06 11:24:51,329 DEBUG pyon.util.fsm FSM ATTACH -> ACTIVE (None) | |
2012-08-06 11:24:51,329 DEBUG pyon.ion.exchange ExchangeQueue.setup_listener: passing on binding | |
><> sub.get_one_msg() | |
2012-08-06 11:24:55,495 INFO pyon.ion.exchange ExchangeManager.qos | |
2012-08-06 11:24:55,495 DEBUG pyon.net.transport AMQPTransport.qos_impl(7): pf_size 0, pf_count 1, global_ False | |
2012-08-06 11:24:55,500 DEBUG pyon.net.channel accept: waiting for 1 msgs, timeout=None | |
2012-08-06 11:24:55,502 DEBUG pyon.util.fsm FSM ATTACH -> ACTIVE (None) | |
2012-08-06 11:24:55,503 DEBUG pyon.util.fsm FSM ENTER_ACCEPT -> ACCEPTED (None) | |
2012-08-06 11:24:55,503 DEBUG pyon.core.interceptor.encode EncodeInterceptor.incoming: <pyon.core.interceptor.interceptor.Invocation object at 0x105abf8d0> | |
2012-08-06 11:24:55,503 DEBUG pyon.core.interceptor.encode Pre-transform: ?test_message | |
2012-08-06 11:24:55,503 DEBUG pyon.core.interceptor.encode Post-transform: test_message | |
2012-08-06 11:24:55,503 DEBUG pyon.core.interceptor.codec CodecInterceptor.incoming: <pyon.core.interceptor.interceptor.Invocation object at 0x105abf8d0> | |
2012-08-06 11:24:55,504 DEBUG pyon.core.interceptor.codec Payload, pre-transform: test_message | |
2012-08-06 11:24:55,504 DEBUG pyon.core.interceptor.codec Payload, post-transform: test_message | |
2012-08-06 11:24:55,504 INFO pyon.net.endpoint MESSAGE RECV >>> RPC-request: ?() -> ###?### status=: | |
HEADERS: {'ts': '1344264172736', 'routing_key': 'nop.data'} | |
CONTENT: test_message | |
DELIVERY: tag=1 | |
--> <pyon.net.endpoint.MessageObject at 0x105abf510> | |
><> sub.close() | |
2012-08-06 11:25:06,353 DEBUG pyon.net.endpoint SimpleStreamSubscriber closed() | |
2012-08-06 11:25:06,353 DEBUG pyon.net.endpoint BaseEndpoint closed() | |
2012-08-06 11:25:06,354 DEBUG pyon.net.channel Channel close() | |
2012-08-06 11:25:06,354 DEBUG pyon.util.fsm FSM CLOSE -> CLOSING (None) | |
><> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment