Skip to content

Instantly share code, notes, and snippets.

@eode
Created November 2, 2018 04:54
Show Gist options
  • Save eode/b513888edaae903580da6fb3bbd87191 to your computer and use it in GitHub Desktop.
Save eode/b513888edaae903580da6fb3bbd87191 to your computer and use it in GitHub Desktop.
example execnet shared storage
from collections import MutableMapping, namedtuple
Message = namedtuple("Message", 'word, obj')
class ExecNetSharedStorageClient(MutableMapping):
def __init__(self, channel):
super().__init__()
self.ch = channel
def _call(self, word, *args, **kwargs):
self.ch.send((word, (args, kwargs)))
response = Message(*self.ch.receive())
if response.word == word:
return response.obj
elif response.word == 'KeyError':
raise KeyError(*response.obj)
elif response.word == 'ValueError':
raise ValueError(*response.obj)
elif response.word == 'Exception':
raise Exception(*response.obj)
raise RuntimeError("Unexpected response: {!r}".format(response))
def __delitem__(self, key):
return self._call('__delitem__', key)
def __getitem__(self, key):
return self._call('__getitem__', key)
def __iter__(self):
return iter(self._call('__iter__'))
def __len__(self):
return self._call('__len__')
def __setitem__(self, key, value):
return self._call('__setitem__', key, value)
class ExecNetSharedStorageServer(dict):
permitted_methods = {'__delitem__', '__getitem__', '__iter__', '__len__', '__setitem__'}
def __init__(self, channel, *args, **kwargs):
super().__init__(*args, **kwargs)
self.ch = channel
def listen(self):
self.ch.setcallback(self._callback)
def check(self, timeout=0):
message = self.ch.receive(timeout)
return self._callback(message)
def _callback(self, message):
word, obj = message
if word in self.permitted_methods:
func = getattr(self, word)
args, kwargs = obj
try:
value = func(*args, **kwargs)
if word == '__iter__':
value = tuple(value)
self.ch.send((word, value))
except (KeyError, ValueError) as err:
self.ch.send((type(err).__name__, err.args))
else:
ch.send(('Exception', ('not permitted', (word, obj))))
def cycle(self, forever=False):
# continue indefinitely
if forever:
while forever:
self.check(timeout=None)
# if not forever, only finish pending items
while True:
try:
self.check()
except self.ch.TimeoutError:
break
if __name__ == "__channelexec__":
# this module was imported and then run via gw.remote_exec(module)
# this is only done for testing. We'll act as the Mapping server.
while True:
servers = []
message = Message(*channel.receive())
if message.word == 'exit':
exit()
if message.word == 'new':
server = ExecNetSharedStorageServer(message.obj)
server.listen()
servers.append(server)
def test_client_server(gw, channel):
client = ExecNetSharedStorageClient(gw.newchannel())
channel.send(('new', client.ch))
return client
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment