Last active
April 12, 2016 04:00
-
-
Save curzona/7881941c222529e3058f2bf3bdd9a641 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
curzona@curzona-desktop:~/Desktop/hello_pytest_xdist$ py.test -v -d --tx socket=xxx.xxx.xxx.xxx:8888 --rsyncdir=. | |
========================================== test session starts =========================================== | |
platform linux2 -- Python 2.7.9, pytest-2.9.0, py-1.4.31, pluggy-0.3.1 -- /usr/bin/python | |
cachedir: .cache | |
rootdir: /home/curzona/Desktop/hello_pytest_xdist, inifile: pytest.ini | |
plugins: repeater-0.1.0, cov-2.1.0, xdist-1.12, bdd-2.16.0 | |
[gw0] win32 Python 2.7.11 cwd: C:\Users\temp\Desktop\pyexecnetcache | |
gw0 CINTERNALERROR> Traceback (most recent call last): | |
INTERNALERROR> File "/usr/local/lib/python2.7/dist-packages/_pytest/main.py", line 92, in wrap_session | |
INTERNALERROR> config.hook.pytest_sessionstart(session=session) | |
INTERNALERROR> File "/usr/local/lib/python2.7/dist-packages/_pytest/vendored_packages/pluggy.py", line 724, in __call__ | |
INTERNALERROR> return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs) | |
INTERNALERROR> File "/usr/local/lib/python2.7/dist-packages/_pytest/vendored_packages/pluggy.py", line 338, in _hookexec | |
INTERNALERROR> return self._inner_hookexec(hook, methods, kwargs) | |
INTERNALERROR> File "/usr/local/lib/python2.7/dist-packages/_pytest/vendored_packages/pluggy.py", line 333, in <lambda> | |
INTERNALERROR> _MultiCall(methods, kwargs, hook.spec_opts).execute() | |
INTERNALERROR> File "/usr/local/lib/python2.7/dist-packages/_pytest/vendored_packages/pluggy.py", line 596, in execute | |
INTERNALERROR> res = hook_impl.function(*args) | |
INTERNALERROR> File "/usr/local/lib/python2.7/dist-packages/xdist/dsession.py", line 489, in pytest_sessionstart | |
INTERNALERROR> nodes = self.nodemanager.setup_nodes(putevent=self.queue.put) | |
INTERNALERROR> File "/usr/local/lib/python2.7/dist-packages/xdist/slavemanage.py", line 45, in setup_nodes | |
INTERNALERROR> nodes.append(self.setup_node(spec, putevent)) | |
INTERNALERROR> File "/usr/local/lib/python2.7/dist-packages/xdist/slavemanage.py", line 51, in setup_node | |
INTERNALERROR> self.rsync_roots(gw) | |
INTERNALERROR> File "/usr/local/lib/python2.7/dist-packages/xdist/slavemanage.py", line 37, in rsync_roots | |
INTERNALERROR> self.rsync(gateway, root, **self.rsyncoptions) | |
INTERNALERROR> File "/usr/local/lib/python2.7/dist-packages/xdist/slavemanage.py", line 136, in rsync | |
INTERNALERROR> rsync.send() | |
INTERNALERROR> File "/usr/local/lib/python2.7/dist-packages/execnet/rsync.py", line 127, in send | |
INTERNALERROR> self._end_of_channel(channel) | |
INTERNALERROR> File "/usr/local/lib/python2.7/dist-packages/execnet/rsync.py", line 44, in _end_of_channel | |
INTERNALERROR> channel.waitclose() | |
INTERNALERROR> File "/usr/local/lib/python2.7/dist-packages/execnet/gateway_base.py", line 670, in waitclose | |
INTERNALERROR> raise error | |
INTERNALERROR> RemoteError: Traceback (most recent call last): | |
INTERNALERROR> File """" | |
INTERNALERROR> base execnet gateway code send to the other side for bootstrapping. | |
INTERNALERROR> | |
INTERNALERROR> NOTE: aims to be compatible to Python 2.5-3.X, Jython and IronPython | |
INTERNALERROR> | |
INTERNALERROR> (C) 2004-2013 Holger Krekel, Armin Rigo, Benjamin Peterson, Ronny Pfannschmidt and others | |
INTERNALERROR> """ | |
INTERNALERROR> from __future__ import with_statement | |
INTERNALERROR> import sys, os, weakref | |
INTERNALERROR> import traceback, struct | |
INTERNALERROR> | |
INTERNALERROR> # NOTE that we want to avoid try/except style importing | |
INTERNALERROR> # to avoid setting sys.exc_info() during import | |
INTERNALERROR> # | |
INTERNALERROR> | |
INTERNALERROR> ISPY3 = sys.version_info >= (3, 0) | |
INTERNALERROR> if ISPY3: | |
INTERNALERROR> from io import BytesIO | |
INTERNALERROR> exec("def do_exec(co, loc): exec(co, loc)\n" | |
INTERNALERROR> "def reraise(cls, val, tb): raise val\n") | |
INTERNALERROR> unicode = str | |
INTERNALERROR> _long_type = int | |
INTERNALERROR> from _thread import interrupt_main | |
INTERNALERROR> else: | |
INTERNALERROR> from StringIO import StringIO as BytesIO | |
INTERNALERROR> exec("def do_exec(co, loc): exec co in loc\n" | |
INTERNALERROR> "def reraise(cls, val, tb): raise cls, val, tb\n") | |
INTERNALERROR> bytes = str | |
INTERNALERROR> _long_type = long | |
INTERNALERROR> try: | |
INTERNALERROR> from thread import interrupt_main | |
INTERNALERROR> except ImportError: | |
INTERNALERROR> interrupt_main = None | |
INTERNALERROR> | |
INTERNALERROR> #f = open("/tmp/execnet-%s" % os.getpid(), "w") | |
INTERNALERROR> #def log_extra(*msg): | |
INTERNALERROR> # f.write(" ".join([str(x) for x in msg]) + "\n") | |
INTERNALERROR> | |
INTERNALERROR> | |
INTERNALERROR> class EmptySemaphore: | |
INTERNALERROR> acquire = release = lambda self: None | |
INTERNALERROR> | |
INTERNALERROR> def get_execmodel(backend): | |
INTERNALERROR> if hasattr(backend, "backend"): | |
INTERNALERROR> return backend | |
INTERNALERROR> if backend == "thread": | |
INTERNALERROR> importdef = { | |
INTERNALERROR> 'get_ident': ['thread::get_ident', '_thread::get_ident'], | |
INTERNALERROR> '_start_new_thread': ['thread::start_new_thread', | |
INTERNALERROR> '_thread::start_new_thread'], | |
INTERNALERROR> 'threading': ["threading",], | |
INTERNALERROR> 'queue': ["queue", "Queue"], | |
INTERNALERROR> 'sleep': ['time::sleep'], | |
INTERNALERROR> 'subprocess': ['subprocess'], | |
INTERNALERROR> 'socket': ['socket'], | |
INTERNALERROR> '_fdopen': ['os::fdopen'], | |
INTERNALERROR> '_lock': ['threading'], | |
INTERNALERROR> '_event': ['threading'], | |
INTERNALERROR> } | |
INTERNALERROR> def exec_start(self, func, args=()): | |
INTERNALERROR> self._start_new_thread(func, args) | |
INTERNALERROR> | |
INTERNALERROR> elif backend == "eventlet": | |
INTERNALERROR> importdef = { | |
INTERNALERROR> 'get_ident': ['eventlet.green.thread::get_ident'], | |
INTERNALERROR> '_spawn_n': ['eventlet::spawn_n'], | |
INTERNALERROR> 'threading': ['eventlet.green.threading'], | |
INTERNALERROR> 'queue': ["eventlet.queue"], | |
INTERNALERROR> 'sleep': ['eventlet::sleep'], | |
INTERNALERROR> 'subprocess': ['eventlet.green.subprocess'], | |
INTERNALERROR> 'socket': ['eventlet.green.socket'], | |
INTERNALERROR> '_fdopen': ['eventlet.green.os::fdopen'], | |
INTERNALERROR> '_lock': ['eventlet.green.threading'], | |
INTERNALERROR> '_event': ['eventlet.green.threading'], | |
INTERNALERROR> } | |
INTERNALERROR> def exec_start(self, func, args=()): | |
INTERNALERROR> self._spawn_n(func, *args) | |
INTERNALERROR> elif backend == "gevent": | |
INTERNALERROR> importdef = { | |
INTERNALERROR> 'get_ident': ['gevent.thread::get_ident'], | |
INTERNALERROR> '_spawn_n': ['gevent::spawn'], | |
INTERNALERROR> 'threading': ['threading'], | |
INTERNALERROR> 'queue': ["gevent.queue"], | |
INTERNALERROR> 'sleep': ['gevent::sleep'], | |
INTERNALERROR> 'subprocess': ['gevent.subprocess'], | |
INTERNALERROR> 'socket': ['gevent.socket'], | |
INTERNALERROR> # XXX | |
INTERNALERROR> '_fdopen': ['gevent.fileobject::FileObjectThread'], | |
INTERNALERROR> '_lock': ['gevent.lock'], | |
INTERNALERROR> '_event': ['gevent.event'], | |
INTERNALERROR> } | |
INTERNALERROR> def exec_start(self, func, args=()): | |
INTERNALERROR> self._spawn_n(func, *args) | |
INTERNALERROR> else: | |
INTERNALERROR> raise ValueError("unknown execmodel %r" %(backend,)) | |
INTERNALERROR> | |
INTERNALERROR> class ExecModel: | |
INTERNALERROR> def __init__(self, name): | |
INTERNALERROR> self._importdef = importdef | |
INTERNALERROR> self.backend = name | |
INTERNALERROR> self._count = 0 | |
INTERNALERROR> | |
INTERNALERROR> def __repr__(self): | |
INTERNALERROR> return "<ExecModel %r>" % self.backend | |
INTERNALERROR> | |
INTERNALERROR> def __getattr__(self, name): | |
INTERNALERROR> locs = self._importdef.get(name) | |
INTERNALERROR> if locs is None: | |
INTERNALERROR> raise AttributeError(name) | |
INTERNALERROR> for loc in locs: | |
INTERNALERROR> parts = loc.split("::") | |
INTERNALERROR> loc = parts.pop(0) | |
INTERNALERROR> try: | |
INTERNALERROR> mod = __import__(loc, None, None, "__doc__") | |
INTERNALERROR> except ImportError: | |
INTERNALERROR> pass | |
INTERNALERROR> else: | |
INTERNALERROR> if parts: | |
INTERNALERROR> mod = getattr(mod, parts[0]) | |
INTERNALERROR> setattr(self, name, mod) | |
INTERNALERROR> return mod | |
INTERNALERROR> raise AttributeError(name) | |
INTERNALERROR> | |
INTERNALERROR> start = exec_start | |
INTERNALERROR> | |
INTERNALERROR> def fdopen(self, fd, mode, bufsize=1): | |
INTERNALERROR> return self._fdopen(fd, mode, bufsize) | |
INTERNALERROR> | |
INTERNALERROR> def WorkerPool(self, hasprimary=False): | |
INTERNALERROR> return WorkerPool(self, hasprimary=hasprimary) | |
INTERNALERROR> | |
INTERNALERROR> def Semaphore(self, size=None): | |
INTERNALERROR> if size is None: | |
INTERNALERROR> return EmptySemaphore() | |
INTERNALERROR> return self._lock.Semaphore(size) | |
INTERNALERROR> | |
INTERNALERROR> def Lock(self): | |
INTERNALERROR> return self._lock.RLock() | |
INTERNALERROR> | |
INTERNALERROR> def RLock(self): | |
INTERNALERROR> return self._lock.RLock() | |
INTERNALERROR> | |
INTERNALERROR> def Event(self): | |
INTERNALERROR> event = self._event.Event() | |
INTERNALERROR> if sys.version_info < (2,7): | |
INTERNALERROR> # patch wait function to return event state instead of None | |
INTERNALERROR> real_wait = event.wait | |
INTERNALERROR> def wait(timeout=None): | |
INTERNALERROR> real_wait(timeout=timeout) | |
INTERNALERROR> return event.isSet() | |
INTERNALERROR> event.wait = wait | |
INTERNALERROR> return event | |
INTERNALERROR> | |
INTERNALERROR> def PopenPiped(self, args): | |
INTERNALERROR> PIPE = self.subprocess.PIPE | |
INTERNALERROR> return self.subprocess.Popen(args, stdout=PIPE, stdin=PIPE) | |
INTERNALERROR> | |
INTERNALERROR> | |
INTERNALERROR> return ExecModel(backend) | |
INTERNALERROR> | |
INTERNALERROR> | |
INTERNALERROR> class Reply(object): | |
INTERNALERROR> """ reply instances provide access to the result | |
INTERNALERROR> of a function execution that got dispatched | |
INTERNALERROR> through WorkerPool.spawn() | |
INTERNALERROR> """ | |
INTERNALERROR> def __init__(self, task, threadmodel): | |
INTERNALERROR> self.task = task | |
INTERNALERROR> self._result_ready = threadmodel.Event() | |
INTERNALERROR> self.running = True | |
INTERNALERROR> | |
INTERNALERROR> def get(self, timeout=None): | |
INTERNALERROR> """ get the result object from an asynchronous function execution. | |
INTERNALERROR> if the function execution raised an exception, | |
INTERNALERROR> then calling get() will reraise that exception | |
INTERNALERROR> including its traceback. | |
INTERNALERROR> """ | |
INTERNALERROR> self.waitfinish(timeout) | |
INTERNALERROR> try: | |
INTERNALERROR> return self._result | |
INTERNALERROR> except AttributeError: | |
INTERNALERROR> reraise(*(self._excinfo[:3])) # noqa | |
INTERNALERROR> | |
INTERNALERROR> def waitfinish(self, timeout=None): | |
INTERNALERROR> if not self._result_ready.wait(timeout): | |
INTERNALERROR> raise IOError("timeout waiting for %r" %(self.task, )) | |
INTERNALERROR> | |
INTERNALERROR> def run(self): | |
INTERNALERROR> func, args, kwargs = self.task | |
INTERNALERROR> try: | |
INTERNALERROR> try: | |
INTERNALERROR> self._result = func(*args, **kwargs) | |
INTERNALERROR> except: | |
INTERNALERROR> self._excinfo = sys.exc_info() | |
INTERNALERROR> finally: | |
INTERNALERROR> self._result_ready.set() | |
INTERNALERROR> self.running = False | |
INTERNALERROR> | |
INTERNALERROR> | |
INTERNALERROR> class WorkerPool(object): | |
INTERNALERROR> """ A WorkerPool allows to spawn function executions | |
INTERNALERROR> to threads, returning a reply object on which you | |
INTERNALERROR> can ask for the result (and get exceptions reraised). | |
INTERNALERROR> | |
INTERNALERROR> This implementation allows the main thread to integrate | |
INTERNALERROR> itself into performing function execution through | |
INTERNALERROR> calling integrate_as_primary_thread() which will return | |
INTERNALERROR> when the pool received a trigger_shutdown(). | |
INTERNALERROR> """ | |
INTERNALERROR> def __init__(self, execmodel, hasprimary=False): | |
INTERNALERROR> """ by default allow unlimited number of spawns. """ | |
INTERNALERROR> self.execmodel = execmodel | |
INTERNALERROR> self._running_lock = self.execmodel.Lock() | |
INTERNALERROR> self._running = set() | |
INTERNALERROR> self._shuttingdown = False | |
INTERNALERROR> self._waitall_events = [] | |
INTERNALERROR> if hasprimary: | |
INTERNALERROR> if self.execmodel.backend != "thread": | |
INTERNALERROR> raise ValueError("hasprimary=True requires thread model") | |
INTERNALERROR> self._primary_thread_task_ready = self.execmodel.Event() | |
INTERNALERROR> else: | |
INTERNALERROR> self._primary_thread_task_ready = None | |
INTERNALERROR> | |
INTERNALERROR> def integrate_as_primary_thread(self): | |
INTERNALERROR> """ integrate the thread with which we are called as a primary | |
INTERNALERROR> thread for executing functions triggered with spawn(). | |
INTERNALERROR> """ | |
INTERNALERROR> assert self.execmodel.backend == "thread", self.execmodel | |
INTERNALERROR> primary_thread_task_ready = self._primary_thread_task_ready | |
INTERNALERROR> # interacts with code at REF1 | |
INTERNALERROR> while 1: | |
INTERNALERROR> primary_thread_task_ready.wait() | |
INTERNALERROR> reply = self._primary_thread_task | |
INTERNALERROR> if reply is None: # trigger_shutdown() woke us up | |
INTERNALERROR> break | |
INTERNALERROR> self._perform_spawn(reply) | |
INTERNALERROR> # we are concurrent with trigger_shutdown and spawn | |
INTERNALERROR> with self._running_lock: | |
INTERNALERROR> if self._shuttingdown: | |
INTERNALERROR> break | |
INTERNALERROR> primary_thread_task_ready.clear() | |
INTERNALERROR> | |
INTERNALERROR> def trigger_shutdown(self): | |
INTERNALERROR> with self._running_lock: | |
INTERNALERROR> self._shuttingdown = True | |
INTERNALERROR> if self._primary_thread_task_ready is not None: | |
INTERNALERROR> self._primary_thread_task = None | |
INTERNALERROR> self._primary_thread_task_ready.set() | |
INTERNALERROR> | |
INTERNALERROR> def active_count(self): | |
INTERNALERROR> return len(self._running) | |
INTERNALERROR> | |
INTERNALERROR> def _perform_spawn(self, reply): | |
INTERNALERROR> reply.run() | |
INTERNALERROR> with self._running_lock: | |
INTERNALERROR> self._running.remove(reply) | |
INTERNALERROR> if not self._running: | |
INTERNALERROR> while self._waitall_events: | |
INTERNALERROR> waitall_event = self._waitall_events.pop() | |
INTERNALERROR> waitall_event.set() | |
INTERNALERROR> | |
INTERNALERROR> def _try_send_to_primary_thread(self, reply): | |
INTERNALERROR> # REF1 in 'thread' model we give priority to running in main thread | |
INTERNALERROR> # note that we should be called with _running_lock hold | |
INTERNALERROR> primary_thread_task_ready = self._primary_thread_task_ready | |
INTERNALERROR> if primary_thread_task_ready is not None: | |
INTERNALERROR> if not primary_thread_task_ready.isSet(): | |
INTERNALERROR> self._primary_thread_task = reply | |
INTERNALERROR> # wake up primary thread | |
INTERNALERROR> primary_thread_task_ready.set() | |
INTERNALERROR> return True | |
INTERNALERROR> return False | |
INTERNALERROR> | |
INTERNALERROR> def spawn(self, func, *args, **kwargs): | |
INTERNALERROR> """ return Reply object for the asynchronous dispatch | |
INTERNALERROR> of the given func(*args, **kwargs). | |
INTERNALERROR> """ | |
INTERNALERROR> reply = Reply((func, args, kwargs), self.execmodel) | |
INTERNALERROR> with self._running_lock: | |
INTERNALERROR> if self._shuttingdown: | |
INTERNALERROR> raise ValueError("pool is shutting down") | |
INTERNALERROR> self._running.add(reply) | |
INTERNALERROR> if not self._try_send_to_primary_thread(reply): | |
INTERNALERROR> self.execmodel.start(self._perform_spawn, (reply,)) | |
INTERNALERROR> return reply | |
INTERNALERROR> | |
INTERNALERROR> def terminate(self, timeout=None): | |
INTERNALERROR> """ trigger shutdown and wait for completion of all executions. """ | |
INTERNALERROR> self.trigger_shutdown() | |
INTERNALERROR> return self.waitall(timeout=timeout) | |
INTERNALERROR> | |
INTERNALERROR> def waitall(self, timeout=None): | |
INTERNALERROR> """ wait until all active spawns have finished executing. """ | |
INTERNALERROR> with self._running_lock: | |
INTERNALERROR> if not self._running: | |
INTERNALERROR> return True | |
INTERNALERROR> # if a Reply still runs, we let run_and_release | |
INTERNALERROR> # signal us -- note that we are still holding the | |
INTERNALERROR> # _running_lock to avoid race conditions | |
INTERNALERROR> my_waitall_event = self.execmodel.Event() | |
INTERNALERROR> self._waitall_events.append(my_waitall_event) | |
INTERNALERROR> return my_waitall_event.wait(timeout=timeout) | |
INTERNALERROR> | |
INTERNALERROR> | |
INTERNALERROR> sysex = (KeyboardInterrupt, SystemExit) | |
INTERNALERROR> | |
INTERNALERROR> | |
INTERNALERROR> DEBUG = os.environ.get('EXECNET_DEBUG') | |
INTERNALERROR> pid = os.getpid() | |
INTERNALERROR> if DEBUG == '2': | |
INTERNALERROR> def trace(*msg): | |
INTERNALERROR> try: | |
INTERNALERROR> line = " ".join(map(str, msg)) | |
INTERNALERROR> sys.stderr.write("[%s] %s\n" % (pid, line)) | |
INTERNALERROR> sys.stderr.flush() | |
INTERNALERROR> except Exception: | |
INTERNALERROR> pass # nothing we can do, likely interpreter-shutdown | |
INTERNALERROR> elif DEBUG: | |
INTERNALERROR> import tempfile, os.path | |
INTERNALERROR> fn = os.path.join(tempfile.gettempdir(), 'execnet-debug-%d' % pid) | |
INTERNALERROR> #sys.stderr.write("execnet-debug at %r" %(fn,)) | |
INTERNALERROR> debugfile = open(fn, 'w') | |
INTERNALERROR> def trace(*msg): | |
INTERNALERROR> try: | |
INTERNALERROR> line = " ".join(map(str, msg)) | |
INTERNALERROR> debugfile.write(line + "\n") | |
INTERNALERROR> debugfile.flush() | |
INTERNALERROR> except Exception: | |
INTERNALERROR> try: | |
INTERNALERROR> v = sys.exc_info()[1] | |
INTERNALERROR> sys.stderr.write( | |
INTERNALERROR> "[%s] exception during tracing: %r\n" % (pid, v)) | |
INTERNALERROR> except Exception: | |
INTERNALERROR> pass # nothing we can do, likely interpreter-shutdown | |
INTERNALERROR> else: | |
INTERNALERROR> notrace = trace = lambda *msg: None | |
INTERNALERROR> | |
INTERNALERROR> class Popen2IO: | |
INTERNALERROR> error = (IOError, OSError, EOFError) | |
INTERNALERROR> | |
INTERNALERROR> def __init__(self, outfile, infile, execmodel): | |
INTERNALERROR> # we need raw byte streams | |
INTERNALERROR> self.outfile, self.infile = outfile, infile | |
INTERNALERROR> if sys.platform == "win32": | |
INTERNALERROR> import msvcrt | |
INTERNALERROR> try: | |
INTERNALERROR> msvcrt.setmode(infile.fileno(), os.O_BINARY) | |
INTERNALERROR> msvcrt.setmode(outfile.fileno(), os.O_BINARY) | |
INTERNALERROR> except (AttributeError, IOError): | |
INTERNALERROR> pass | |
INTERNALERROR> self._read = getattr(infile, "buffer", infile).read | |
INTERNALERROR> self._write = getattr(outfile, "buffer", outfile).write | |
INTERNALERROR> self.execmodel = execmodel | |
INTERNALERROR> | |
INTERNALERROR> def read(self, numbytes): | |
INTERNALERROR> """Read exactly 'numbytes' bytes from the pipe. """ | |
INTERNALERROR> # a file in non-blocking mode may return less bytes, so we loop | |
INTERNALERROR> buf = bytes() | |
INTERNALERROR> while numbytes > len(buf): | |
INTERNALERROR> data = self._read(numbytes-len(buf)) | |
INTERNALERROR> if not data: | |
INTERNALERROR> raise EOFError("expected %d bytes, got %d" %(numbytes, len(buf))) | |
INTERNALERROR> buf += data | |
INTERNALERROR> return buf | |
INTERNALERROR> | |
INTERNALERROR> def write(self, data): | |
INTERNALERROR> """write out all data bytes. """ | |
INTERNALERROR> assert isinstance(data, bytes) | |
INTERNALERROR> self._write(data) | |
INTERNALERROR> self.outfile.flush() | |
INTERNALERROR> | |
INTERNALERROR> def close_read(self): | |
INTERNALERROR> self.infile.close() | |
INTERNALERROR> | |
INTERNALERROR> def close_write(self): | |
INTERNALERROR> self.outfile.close() | |
INTERNALERROR> | |
INTERNALERROR> class Message: | |
INTERNALERROR> """ encapsulates Messages and their wire protocol. """ | |
INTERNALERROR> _types = [] | |
INTERNALERROR> | |
INTERNALERROR> def __init__(self, msgcode, channelid=0, data=''): | |
INTERNALERROR> self.msgcode = msgcode | |
INTERNALERROR> self.channelid = channelid | |
INTERNALERROR> self.data = data | |
INTERNALERROR> | |
INTERNALERROR> @staticmethod | |
INTERNALERROR> def from_io(io): | |
INTERNALERROR> try: | |
INTERNALERROR> header = io.read(9) # type 1, channel 4, payload 4 | |
INTERNALERROR> if not header: | |
INTERNALERROR> raise EOFError("empty read") | |
INTERNALERROR> except EOFError: | |
INTERNALERROR> e = sys.exc_info()[1] | |
INTERNALERROR> raise EOFError('couldnt load message header, ' + e.args[0]) | |
INTERNALERROR> msgtype, channel, payload = struct.unpack('!bii', header) | |
INTERNALERROR> return Message(msgtype, channel, io.read(payload)) | |
INTERNALERROR> | |
INTERNALERROR> def to_io(self, io): | |
INTERNALERROR> header = struct.pack('!bii', self.msgcode, self.channelid, | |
INTERNALERROR> len(self.data)) | |
INTERNALERROR> io.write(header+self.data) | |
INTERNALERROR> | |
INTERNALERROR> def received(self, gateway): | |
INTERNALERROR> self._types[self.msgcode](self, gateway) | |
INTERNALERROR> | |
INTERNALERROR> def __repr__(self): | |
INTERNALERROR> name = self._types[self.msgcode].__name__.upper() | |
INTERNALERROR> return "<Message %s channel=%s lendata=%s>" %( | |
INTERNALERROR> name, self.channelid, len(self.data)) | |
INTERNALERROR> | |
INTERNALERROR> class GatewayReceivedTerminate(Exception): | |
INTERNALERROR> """ Receiverthread got termination message. """ | |
INTERNALERROR> | |
INTERNALERROR> def _setupmessages(): | |
INTERNALERROR> def status(message, gateway): | |
INTERNALERROR> # we use the channelid to send back information | |
INTERNALERROR> # but don't instantiate a channel object | |
INTERNALERROR> d = {'numchannels': len(gateway._channelfactory._channels), | |
INTERNALERROR> 'numexecuting': gateway._execpool.active_count(), | |
INTERNALERROR> 'execmodel': gateway.execmodel.backend, | |
INTERNALERROR> } | |
INTERNALERROR> gateway._send(Message.CHANNEL_DATA, message.channelid, | |
INTERNALERROR> dumps_internal(d)) | |
INTERNALERROR> gateway._send(Message.CHANNEL_CLOSE, message.channelid) | |
INTERNALERROR> | |
INTERNALERROR> def channel_exec(message, gateway): | |
INTERNALERROR> channel = gateway._channelfactory.new(message.channelid) | |
INTERNALERROR> gateway._local_schedulexec(channel=channel,sourcetask=message.data) | |
INTERNALERROR> | |
INTERNALERROR> def channel_data(message, gateway): | |
INTERNALERROR> gateway._channelfactory._local_receive(message.channelid, message.data) | |
INTERNALERROR> | |
INTERNALERROR> def channel_close(message, gateway): | |
INTERNALERROR> gateway._channelfactory._local_close(message.channelid) | |
INTERNALERROR> | |
INTERNALERROR> def channel_close_error(message, gateway): | |
INTERNALERROR> remote_error = RemoteError(loads_internal(message.data)) | |
INTERNALERROR> gateway._channelfactory._local_close(message.channelid, remote_error) | |
INTERNALERROR> | |
INTERNALERROR> def channel_last_message(message, gateway): | |
INTERNALERROR> gateway._channelfactory._local_close(message.channelid, sendonly=True) | |
INTERNALERROR> | |
INTERNALERROR> def gateway_terminate(message, gateway): | |
INTERNALERROR> raise GatewayReceivedTerminate(gateway) | |
INTERNALERROR> | |
INTERNALERROR> def reconfigure(message, gateway): | |
INTERNALERROR> if message.channelid == 0: | |
INTERNALERROR> target = gateway | |
INTERNALERROR> else: | |
INTERNALERROR> target = gateway._channelfactory.new(message.channelid) | |
INTERNALERROR> target._strconfig = loads_internal(message.data, gateway) | |
INTERNALERROR> | |
INTERNALERROR> types = [ | |
INTERNALERROR> status, reconfigure, gateway_terminate, | |
INTERNALERROR> channel_exec, channel_data, channel_close, | |
INTERNALERROR> channel_close_error, channel_last_message, | |
INTERNALERROR> ] | |
INTERNALERROR> for i, handler in enumerate(types): | |
INTERNALERROR> Message._types.append(handler) | |
INTERNALERROR> setattr(Message, handler.__name__.upper(), i) | |
INTERNALERROR> | |
INTERNALERROR> _setupmessages() | |
INTERNALERROR> | |
INTERNALERROR> def geterrortext(excinfo, | |
INTERNALERROR> format_exception=traceback.format_exception, sysex=sysex): | |
INTERNALERROR> try: | |
INTERNALERROR> l = format_exception(*excinfo) | |
INTERNALERROR> errortext = "".join(l) | |
INTERNALERROR> except sysex: | |
INTERNALERROR> raise | |
INTERNALERROR> except: | |
INTERNALERROR> errortext = '%s: %s' % (excinfo[0].__name__, | |
INTERNALERROR> excinfo[1]) | |
INTERNALERROR> return errortext | |
INTERNALERROR> | |
INTERNALERROR> class RemoteError(Exception): | |
INTERNALERROR> """ Exception containing a stringified error from the other side. """ | |
INTERNALERROR> def __init__(self, formatted): | |
INTERNALERROR> self.formatted = formatted | |
INTERNALERROR> Exception.__init__(self) | |
INTERNALERROR> | |
INTERNALERROR> def __str__(self): | |
INTERNALERROR> return self.formatted | |
INTERNALERROR> | |
INTERNALERROR> def __repr__(self): | |
INTERNALERROR> return "%s: %s" %(self.__class__.__name__, self.formatted) | |
INTERNALERROR> | |
INTERNALERROR> def warn(self): | |
INTERNALERROR> if self.formatted != INTERRUPT_TEXT: | |
INTERNALERROR> # XXX do this better | |
INTERNALERROR> sys.stderr.write("[%s] Warning: unhandled %r\n" | |
INTERNALERROR> % (os.getpid(), self,)) | |
INTERNALERROR> | |
INTERNALERROR> class TimeoutError(IOError): | |
INTERNALERROR> """ Exception indicating that a timeout was reached. """ | |
INTERNALERROR> | |
INTERNALERROR> | |
INTERNALERROR> NO_ENDMARKER_WANTED = object() | |
INTERNALERROR> | |
INTERNALERROR> class Channel(object): | |
INTERNALERROR> """Communication channel between two Python Interpreter execution points.""" | |
INTERNALERROR> RemoteError = RemoteError | |
INTERNALERROR> TimeoutError = TimeoutError | |
INTERNALERROR> _INTERNALWAKEUP = 1000 | |
INTERNALERROR> _executing = False | |
INTERNALERROR> | |
INTERNALERROR> def __init__(self, gateway, id): | |
INTERNALERROR> assert isinstance(id, int) | |
INTERNALERROR> self.gateway = gateway | |
INTERNALERROR> #XXX: defaults copied from Unserializer | |
INTERNALERROR> self._strconfig = getattr(gateway, '_strconfig', (True, False)) | |
INTERNALERROR> self.id = id | |
INTERNALERROR> self._items = self.gateway.execmodel.queue.Queue() | |
INTERNALERROR> self._closed = False | |
INTERNALERROR> self._receiveclosed = self.gateway.execmodel.Event() | |
INTERNALERROR> self._remoteerrors = [] | |
INTERNALERROR> | |
INTERNALERROR> def _trace(self, *msg): | |
INTERNALERROR> self.gateway._trace(self.id, *msg) | |
INTERNALERROR> | |
INTERNALERROR> def setcallback(self, callback, endmarker=NO_ENDMARKER_WANTED): | |
INTERNALERROR> """ set a callback function for receiving items. | |
INTERNALERROR> | |
INTERNALERROR> All already queued items will immediately trigger the callback. | |
INTERNALERROR> Afterwards the callback will execute in the receiver thread | |
INTERNALERROR> for each received data item and calls to ``receive()`` will | |
INTERNALERROR> raise an error. | |
INTERNALERROR> If an endmarker is specified the callback will eventually | |
INTERNALERROR> be called with the endmarker when the channel closes. | |
INTERNALERROR> """ | |
INTERNALERROR> _callbacks = self.gateway._channelfactory._callbacks | |
INTERNALERROR> with self.gateway._receivelock: | |
INTERNALERROR> if self._items is None: | |
INTERNALERROR> raise IOError("%r has callback already registered" %(self,)) | |
INTERNALERROR> items = self._items | |
INTERNALERROR> self._items = None | |
INTERNALERROR> while 1: | |
INTERNALERROR> try: | |
INTERNALERROR> olditem = items.get(block=False) | |
INTERNALERROR> except self.gateway.execmodel.queue.Empty: | |
INTERNALERROR> if not (self._closed or self._receiveclosed.isSet()): | |
INTERNALERROR> _callbacks[self.id] = ( | |
INTERNALERROR> callback, | |
INTERNALERROR> endmarker, | |
INTERNALERROR> self._strconfig, | |
INTERNALERROR> ) | |
INTERNALERROR> break | |
INTERNALERROR> else: | |
INTERNALERROR> if olditem is ENDMARKER: | |
INTERNALERROR> items.put(olditem) # for other receivers | |
INTERNALERROR> if endmarker is not NO_ENDMARKER_WANTED: | |
INTERNALERROR> callback(endmarker) | |
INTERNALERROR> break | |
INTERNALERROR> else: | |
INTERNALERROR> callback(olditem) | |
INTERNALERROR> | |
INTERNALERROR> def __repr__(self): | |
INTERNALERROR> flag = self.isclosed() and "closed" or "open" | |
INTERNALERROR> return "<Channel id=%d %s>" % (self.id, flag) | |
INTERNALERROR> | |
INTERNALERROR> def __del__(self): | |
INTERNALERROR> if self.gateway is None: # can be None in tests | |
INTERNALERROR> return | |
INTERNALERROR> self._trace("channel.__del__") | |
INTERNALERROR> # no multithreading issues here, because we have the last ref to 'self' | |
INTERNALERROR> if self._closed: | |
INTERNALERROR> # state transition "closed" --> "deleted" | |
INTERNALERROR> for error in self._remoteerrors: | |
INTERNALERROR> error.warn() | |
INTERNALERROR> elif self._receiveclosed.isSet(): | |
INTERNALERROR> # state transition "sendonly" --> "deleted" | |
INTERNALERROR> # the remote channel is already in "deleted" state, nothing to do | |
INTERNALERROR> pass | |
INTERNALERROR> else: | |
INTERNALERROR> # state transition "opened" --> "deleted" | |
INTERNALERROR> # check if we are in the middle of interpreter shutdown | |
INTERNALERROR> # in which case the process will go away and we probably | |
INTERNALERROR> # don't need to try to send a closing or last message | |
INTERNALERROR> # (and often it won't work anymore to send things out) | |
INTERNALERROR> if Message is not None: | |
INTERNALERROR> if self._items is None: # has_callback | |
INTERNALERROR> msgcode = Message.CHANNEL_LAST_MESSAGE | |
INTERNALERROR> else: | |
INTERNALERROR> msgcode = Message.CHANNEL_CLOSE | |
INTERNALERROR> try: | |
INTERNALERROR> self.gateway._send(msgcode, self.id) | |
INTERNALERROR> except (IOError, ValueError): # ignore problems with sending | |
INTERNALERROR> pass | |
INTERNALERROR> | |
INTERNALERROR> def _getremoteerror(self): | |
INTERNALERROR> try: | |
INTERNALERROR> return self._remoteerrors.pop(0) | |
INTERNALERROR> except IndexError: | |
INTERNALERROR> try: | |
INTERNALERROR> return self.gateway._error | |
INTERNALERROR> except AttributeError: | |
INTERNALERROR> pass | |
INTERNALERROR> return None | |
INTERNALERROR> | |
INTERNALERROR> # | |
INTERNALERROR> # public API for channel objects | |
INTERNALERROR> # | |
INTERNALERROR> def isclosed(self): | |
INTERNALERROR> """ return True if the channel is closed. A closed | |
INTERNALERROR> channel may still hold items. | |
INTERNALERROR> """ | |
INTERNALERROR> return self._closed | |
INTERNALERROR> | |
INTERNALERROR> def makefile(self, mode='w', proxyclose=False): | |
INTERNALERROR> """ return a file-like object. | |
INTERNALERROR> mode can be 'w' or 'r' for writeable/readable files. | |
INTERNALERROR> if proxyclose is true file.close() will also close the channel. | |
INTERNALERROR> """ | |
INTERNALERROR> if mode == "w": | |
INTERNALERROR> return ChannelFileWrite(channel=self, proxyclose=proxyclose) | |
INTERNALERROR> elif mode == "r": | |
INTERNALERROR> return ChannelFileRead(channel=self, proxyclose=proxyclose) | |
INTERNALERROR> raise ValueError("mode %r not availabe" %(mode,)) | |
INTERNALERROR> | |
INTERNALERROR> def close(self, error=None): | |
INTERNALERROR> """ close down this channel with an optional error message. | |
INTERNALERROR> Note that closing of a channel tied to remote_exec happens | |
INTERNALERROR> automatically at the end of execution and cannot | |
INTERNALERROR> be done explicitely. | |
INTERNALERROR> """ | |
INTERNALERROR> if self._executing: | |
INTERNALERROR> raise IOError("cannot explicitly close channel within remote_exec") | |
INTERNALERROR> if self._closed: | |
INTERNALERROR> self.gateway._trace(self, "ignoring redundant call to close()") | |
INTERNALERROR> if not self._closed: | |
INTERNALERROR> # state transition "opened/sendonly" --> "closed" | |
INTERNALERROR> # threads warning: the channel might be closed under our feet, | |
INTERNALERROR> # but it's never damaging to send too many CHANNEL_CLOSE messages | |
INTERNALERROR> # however, if the other side triggered a close already, we | |
INTERNALERROR> # do not send back a closed message. | |
INTERNALERROR> if not self._receiveclosed.isSet(): | |
INTERNALERROR> put = self.gateway._send | |
INTERNALERROR> if error is not None: | |
INTERNALERROR> put(Message.CHANNEL_CLOSE_ERROR, self.id, | |
INTERNALERROR> dumps_internal(error)) | |
INTERNALERROR> else: | |
INTERNALERROR> put(Message.CHANNEL_CLOSE, self.id) | |
INTERNALERROR> self._trace("sent channel close message") | |
INTERNALERROR> if isinstance(error, RemoteError): | |
INTERNALERROR> self._remoteerrors.append(error) | |
INTERNALERROR> self._closed = True # --> "closed" | |
INTERNALERROR> self._receiveclosed.set() | |
INTERNALERROR> queue = self._items | |
INTERNALERROR> if queue is not None: | |
INTERNALERROR> queue.put(ENDMARKER) | |
INTERNALERROR> self.gateway._channelfactory._no_longer_opened(self.id) | |
INTERNALERROR> | |
INTERNALERROR> def waitclose(self, timeout=None): | |
INTERNALERROR> """ wait until this channel is closed (or the remote side | |
INTERNALERROR> otherwise signalled that no more data was being sent). | |
INTERNALERROR> The channel may still hold receiveable items, but not receive | |
INTERNALERROR> any more after waitclose() has returned. Exceptions from executing | |
INTERNALERROR> code on the other side are reraised as local channel.RemoteErrors. | |
INTERNALERROR> EOFError is raised if the reading-connection was prematurely closed, | |
INTERNALERROR> which often indicates a dying process. | |
INTERNALERROR> self.TimeoutError is raised after the specified number of seconds | |
INTERNALERROR> (default is None, i.e. wait indefinitely). | |
INTERNALERROR> """ | |
INTERNALERROR> self._receiveclosed.wait(timeout=timeout) # wait for non-"opened" state | |
INTERNALERROR> if not self._receiveclosed.isSet(): | |
INTERNALERROR> raise self.TimeoutError("Timeout after %r seconds" % timeout) | |
INTERNALERROR> error = self._getremoteerror() | |
INTERNALERROR> if error: | |
INTERNALERROR> raise error | |
INTERNALERROR> | |
INTERNALERROR> def send(self, item): | |
INTERNALERROR> """sends the given item to the other side of the channel, | |
INTERNALERROR> possibly blocking if the sender queue is full. | |
INTERNALERROR> The item must be a simple python type and will be | |
INTERNALERROR> copied to the other side by value. IOError is | |
INTERNALERROR> raised if the write pipe was prematurely closed. | |
INTERNALERROR> """ | |
INTERNALERROR> if self.isclosed(): | |
INTERNALERROR> raise IOError("cannot send to %r" %(self,)) | |
INTERNALERROR> self.gateway._send(Message.CHANNEL_DATA, self.id, dumps_internal(item)) | |
INTERNALERROR> | |
INTERNALERROR> def receive(self, timeout=None): | |
INTERNALERROR> """receive a data item that was sent from the other side. | |
INTERNALERROR> timeout: None [default] blocked waiting. A positive number | |
INTERNALERROR> indicates the number of seconds after which a channel.TimeoutError | |
INTERNALERROR> exception will be raised if no item was received. | |
INTERNALERROR> Note that exceptions from the remotely executing code will be | |
INTERNALERROR> reraised as channel.RemoteError exceptions containing | |
INTERNALERROR> a textual representation of the remote traceback. | |
INTERNALERROR> """ | |
INTERNALERROR> itemqueue = self._items | |
INTERNALERROR> if itemqueue is None: | |
INTERNALERROR> raise IOError("cannot receive(), channel has receiver callback") | |
INTERNALERROR> try: | |
INTERNALERROR> x = itemqueue.get(timeout=timeout) | |
INTERNALERROR> except self.gateway.execmodel.queue.Empty: | |
INTERNALERROR> raise self.TimeoutError("no item after %r seconds" %(timeout)) | |
INTERNALERROR> if x is ENDMARKER: | |
INTERNALERROR> itemqueue.put(x) # for other receivers | |
INTERNALERROR> raise self._getremoteerror() or EOFError() | |
INTERNALERROR> else: | |
INTERNALERROR> return x | |
INTERNALERROR> | |
INTERNALERROR> def __iter__(self): | |
INTERNALERROR> return self | |
INTERNALERROR> | |
INTERNALERROR> def next(self): | |
INTERNALERROR> try: | |
INTERNALERROR> return self.receive() | |
INTERNALERROR> except EOFError: | |
INTERNALERROR> raise StopIteration | |
INTERNALERROR> __next__ = next | |
INTERNALERROR> | |
INTERNALERROR> | |
INTERNALERROR> def reconfigure(self, py2str_as_py3str=True, py3str_as_py2str=False): | |
INTERNALERROR> """ | |
INTERNALERROR> set the string coercion for this channel | |
INTERNALERROR> the default is to try to convert py2 str as py3 str, | |
INTERNALERROR> but not to try and convert py3 str to py2 str | |
INTERNALERROR> """ | |
INTERNALERROR> self._strconfig = (py2str_as_py3str, py3str_as_py2str) | |
INTERNALERROR> data = dumps_internal(self._strconfig) | |
INTERNALERROR> self.gateway._send(Message.RECONFIGURE, self.id, data=data) | |
INTERNALERROR> | |
INTERNALERROR> ENDMARKER = object() | |
INTERNALERROR> INTERRUPT_TEXT = "keyboard-interrupted" | |
INTERNALERROR> | |
INTERNALERROR> class ChannelFactory(object): | |
INTERNALERROR> def __init__(self, gateway, startcount=1): | |
INTERNALERROR> self._channels = weakref.WeakValueDictionary() | |
INTERNALERROR> self._callbacks = {} | |
INTERNALERROR> self._writelock = gateway.execmodel.Lock() | |
INTERNALERROR> self.gateway = gateway | |
INTERNALERROR> self.count = startcount | |
INTERNALERROR> self.finished = False | |
INTERNALERROR> self._list = list # needed during interp-shutdown | |
INTERNALERROR> | |
INTERNALERROR> def new(self, id=None): | |
INTERNALERROR> """ create a new Channel with 'id' (or create new id if None). """ | |
INTERNALERROR> with self._writelock: | |
INTERNALERROR> if self.finished: | |
INTERNALERROR> raise IOError("connexion already closed: %s" % (self.gateway,)) | |
INTERNALERROR> if id is None: | |
INTERNALERROR> id = self.count | |
INTERNALERROR> self.count += 2 | |
INTERNALERROR> try: | |
INTERNALERROR> channel = self._channels[id] | |
INTERNALERROR> except KeyError: | |
INTERNALERROR> channel = self._channels[id] = Channel(self.gateway, id) | |
INTERNALERROR> return channel | |
INTERNALERROR> | |
INTERNALERROR> def channels(self): | |
INTERNALERROR> return self._list(self._channels.values()) | |
INTERNALERROR> | |
INTERNALERROR> # | |
INTERNALERROR> # internal methods, called from the receiver thread | |
INTERNALERROR> # | |
INTERNALERROR> def _no_longer_opened(self, id): | |
INTERNALERROR> try: | |
INTERNALERROR> del self._channels[id] | |
INTERNALERROR> except KeyError: | |
INTERNALERROR> pass | |
INTERNALERROR> try: | |
INTERNALERROR> callback, endmarker, strconfig = self._callbacks.pop(id) | |
INTERNALERROR> except KeyError: | |
INTERNALERROR> pass | |
INTERNALERROR> else: | |
INTERNALERROR> if endmarker is not NO_ENDMARKER_WANTED: | |
INTERNALERROR> callback(endmarker) | |
INTERNALERROR> | |
INTERNALERROR> def _local_close(self, id, remoteerror=None, sendonly=False): | |
INTERNALERROR> channel = self._channels.get(id) | |
INTERNALERROR> if channel is None: | |
INTERNALERROR> # channel already in "deleted" state | |
INTERNALERROR> if remoteerror: | |
INTERNALERROR> remoteerror.warn() | |
INTERNALERROR> self._no_longer_opened(id) | |
INTERNALERROR> else: | |
INTERNALERROR> # state transition to "closed" state | |
INTERNALERROR> if remoteerror: | |
INTERNALERROR> channel._remoteerrors.append(remoteerror) | |
INTERNALERROR> queue = channel._items | |
INTERNALERROR> if queue is not None: | |
INTERNALERROR> queue.put(ENDMARKER) | |
INTERNALERROR> self._no_longer_opened(id) | |
INTERNALERROR> if not sendonly: # otherwise #--> "sendonly" | |
INTERNALERROR> channel._closed = True # --> "closed" | |
INTERNALERROR> channel._receiveclosed.set() | |
INTERNALERROR> | |
INTERNALERROR> def _local_receive(self, id, data): | |
INTERNALERROR> # executes in receiver thread | |
INTERNALERROR> channel = self._channels.get(id) | |
INTERNALERROR> try: | |
INTERNALERROR> callback, endmarker, strconfig = self._callbacks[id] | |
INTERNALERROR> except KeyError: | |
INTERNALERROR> queue = channel and channel._items | |
INTERNALERROR> if queue is None: | |
INTERNALERROR> pass # drop data | |
INTERNALERROR> else: | |
INTERNALERROR> item = loads_internal(data, channel) | |
INTERNALERROR> queue.put(item) | |
INTERNALERROR> else: | |
INTERNALERROR> try: | |
INTERNALERROR> data = loads_internal(data, channel, strconfig) | |
INTERNALERROR> callback(data) # even if channel may be already closed | |
INTERNALERROR> except Exception: | |
INTERNALERROR> excinfo = sys.exc_info() | |
INTERNALERROR> self.gateway._trace("exception during callback: %s" % | |
INTERNALERROR> excinfo[1]) | |
INTERNALERROR> errortext = self.gateway._geterrortext(excinfo) | |
INTERNALERROR> self.gateway._send(Message.CHANNEL_CLOSE_ERROR, | |
INTERNALERROR> id, dumps_internal(errortext)) | |
INTERNALERROR> self._local_close(id, errortext) | |
INTERNALERROR> | |
INTERNALERROR> def _finished_receiving(self): | |
INTERNALERROR> with self._writelock: | |
INTERNALERROR> self.finished = True | |
INTERNALERROR> for id in self._list(self._channels): | |
INTERNALERROR> self._local_close(id, sendonly=True) | |
INTERNALERROR> for id in self._list(self._callbacks): | |
INTERNALERROR> self._no_longer_opened(id) | |
INTERNALERROR> | |
INTERNALERROR> class ChannelFile(object): | |
INTERNALERROR> def __init__(self, channel, proxyclose=True): | |
INTERNALERROR> self.channel = channel | |
INTERNALERROR> self._proxyclose = proxyclose | |
INTERNALERROR> | |
INTERNALERROR> def isatty(self): | |
INTERNALERROR> return False | |
INTERNALERROR> | |
INTERNALERROR> def close(self): | |
INTERNALERROR> if self._proxyclose: | |
INTERNALERROR> self.channel.close() | |
INTERNALERROR> | |
INTERNALERROR> def __repr__(self): | |
INTERNALERROR> state = self.channel.isclosed() and 'closed' or 'open' | |
INTERNALERROR> return '<ChannelFile %d %s>' %(self.channel.id, state) | |
INTERNALERROR> | |
INTERNALERROR> class ChannelFileWrite(ChannelFile): | |
INTERNALERROR> def write(self, out): | |
INTERNALERROR> self.channel.send(out) | |
INTERNALERROR> | |
INTERNALERROR> def flush(self): | |
INTERNALERROR> pass | |
INTERNALERROR> | |
INTERNALERROR> class ChannelFileRead(ChannelFile): | |
INTERNALERROR> def __init__(self, channel, proxyclose=True): | |
INTERNALERROR> super(ChannelFileRead, self).__init__(channel, proxyclose) | |
INTERNALERROR> self._buffer = None | |
INTERNALERROR> | |
INTERNALERROR> def read(self, n): | |
INTERNALERROR> try: | |
INTERNALERROR> if self._buffer is None: | |
INTERNALERROR> self._buffer = self.channel.receive() | |
INTERNALERROR> while len(self._buffer) < n: | |
INTERNALERROR> self._buffer += self.channel.receive() | |
INTERNALERROR> except EOFError: | |
INTERNALERROR> self.close() | |
INTERNALERROR> if self._buffer is None: | |
INTERNALERROR> ret = "" | |
INTERNALERROR> else: | |
INTERNALERROR> ret = self._buffer[:n] | |
INTERNALERROR> self._buffer = self._buffer[n:] | |
INTERNALERROR> return ret | |
INTERNALERROR> | |
INTERNALERROR> def readline(self): | |
INTERNALERROR> if self._buffer is not None: | |
INTERNALERROR> i = self._buffer.find("\n") | |
INTERNALERROR> if i != -1: | |
INTERNALERROR> return self.read(i+1) | |
INTERNALERROR> line = self.read(len(self._buffer)+1) | |
INTERNALERROR> else: | |
INTERNALERROR> line = self.read(1) | |
INTERNALERROR> while line and line[-1] != "\n": | |
INTERNALERROR> c = self.read(1) | |
INTERNALERROR> if not c: | |
INTERNALERROR> break | |
INTERNALERROR> line += c | |
INTERNALERROR> return line | |
INTERNALERROR> | |
INTERNALERROR> class BaseGateway(object): | |
INTERNALERROR> exc_info = sys.exc_info | |
INTERNALERROR> _sysex = sysex | |
INTERNALERROR> id = "<slave>" | |
INTERNALERROR> | |
INTERNALERROR> def __init__(self, io, id, _startcount=2): | |
INTERNALERROR> self.execmodel = io.execmodel | |
INTERNALERROR> self._io = io | |
INTERNALERROR> self.id = id | |
INTERNALERROR> self._strconfig = (Unserializer.py2str_as_py3str, | |
INTERNALERROR> Unserializer.py3str_as_py2str) | |
INTERNALERROR> self._channelfactory = ChannelFactory(self, _startcount) | |
INTERNALERROR> self._receivelock = self.execmodel.RLock() | |
INTERNALERROR> # globals may be NONE at process-termination | |
INTERNALERROR> self.__trace = trace | |
INTERNALERROR> self._geterrortext = geterrortext | |
INTERNALERROR> self._receivepool = self.execmodel.WorkerPool() | |
INTERNALERROR> | |
INTERNALERROR> def _trace(self, *msg): | |
INTERNALERROR> self.__trace(self.id, *msg) | |
INTERNALERROR> | |
INTERNALERROR> def _initreceive(self): | |
INTERNALERROR> self._receivepool.spawn(self._thread_receiver) | |
INTERNALERROR> | |
INTERNALERROR> def _thread_receiver(self): | |
INTERNALERROR> def log(*msg): | |
INTERNALERROR> self._trace("[receiver-thread]", *msg) | |
INTERNALERROR> | |
INTERNALERROR> log("RECEIVERTHREAD: starting to run") | |
INTERNALERROR> io = self._io | |
INTERNALERROR> try: | |
INTERNALERROR> while 1: | |
INTERNALERROR> msg = Message.from_io(io) | |
INTERNALERROR> log("received", msg) | |
INTERNALERROR> with self._receivelock: | |
INTERNALERROR> msg.received(self) | |
INTERNALERROR> del msg | |
INTERNALERROR> except (KeyboardInterrupt, GatewayReceivedTerminate): | |
INTERNALERROR> pass | |
INTERNALERROR> except EOFError: | |
INTERNALERROR> log("EOF without prior gateway termination message") | |
INTERNALERROR> self._error = self.exc_info()[1] | |
INTERNALERROR> except Exception: | |
INTERNALERROR> log(self._geterrortext(self.exc_info())) | |
INTERNALERROR> log('finishing receiving thread') | |
INTERNALERROR> # wake up and terminate any execution waiting to receive | |
INTERNALERROR> self._channelfactory._finished_receiving() | |
INTERNALERROR> log('terminating execution') | |
INTERNALERROR> self._terminate_execution() | |
INTERNALERROR> log('closing read') | |
INTERNALERROR> self._io.close_read() | |
INTERNALERROR> log('closing write') | |
INTERNALERROR> self._io.close_write() | |
INTERNALERROR> log('terminating our receive pseudo pool') | |
INTERNALERROR> self._receivepool.trigger_shutdown() | |
INTERNALERROR> | |
INTERNALERROR> def _terminate_execution(self): | |
INTERNALERROR> pass | |
INTERNALERROR> | |
INTERNALERROR> def _send(self, msgcode, channelid=0, data=bytes()): | |
INTERNALERROR> message = Message(msgcode, channelid, data) | |
INTERNALERROR> try: | |
INTERNALERROR> message.to_io(self._io) | |
INTERNALERROR> self._trace('sent', message) | |
INTERNALERROR> except (IOError, ValueError): | |
INTERNALERROR> e = sys.exc_info()[1] | |
INTERNALERROR> self._trace('failed to send', message, e) | |
INTERNALERROR> # ValueError might be because the IO is already closed | |
INTERNALERROR> raise IOError("cannot send (already closed?)") | |
INTERNALERROR> | |
INTERNALERROR> def _local_schedulexec(self, channel, sourcetask): | |
INTERNALERROR> channel.close("execution disallowed") | |
INTERNALERROR> | |
INTERNALERROR> # _____________________________________________________________________ | |
INTERNALERROR> # | |
INTERNALERROR> # High Level Interface | |
INTERNALERROR> # _____________________________________________________________________ | |
INTERNALERROR> # | |
INTERNALERROR> def newchannel(self): | |
INTERNALERROR> """ return a new independent channel. """ | |
INTERNALERROR> return self._channelfactory.new() | |
INTERNALERROR> | |
INTERNALERROR> def join(self, timeout=None): | |
INTERNALERROR> """ Wait for receiverthread to terminate. """ | |
INTERNALERROR> self._trace("waiting for receiver thread to finish") | |
INTERNALERROR> self._receivepool.waitall() | |
INTERNALERROR> | |
INTERNALERROR> class SlaveGateway(BaseGateway): | |
INTERNALERROR> | |
INTERNALERROR> def _local_schedulexec(self, channel, sourcetask): | |
INTERNALERROR> sourcetask = loads_internal(sourcetask) | |
INTERNALERROR> self._execpool.spawn(self.executetask, ((channel, sourcetask))) | |
INTERNALERROR> | |
INTERNALERROR> def _terminate_execution(self): | |
INTERNALERROR> # called from receiverthread | |
INTERNALERROR> self._trace("shutting down execution pool") | |
INTERNALERROR> self._execpool.trigger_shutdown() | |
INTERNALERROR> if not self._execpool.waitall(5.0): | |
INTERNALERROR> self._trace("execution ongoing after 5 secs, trying interrupt_main") | |
INTERNALERROR> # We try hard to terminate execution based on the assumption | |
INTERNALERROR> # that there is only one gateway object running per-process. | |
INTERNALERROR> if sys.platform != "win32": | |
INTERNALERROR> self._trace("sending ourselves a SIGINT") | |
INTERNALERROR> os.kill(os.getpid(), 2) # send ourselves a SIGINT | |
INTERNALERROR> elif interrupt_main is not None: | |
INTERNALERROR> self._trace("calling interrupt_main()") | |
INTERNALERROR> interrupt_main() | |
INTERNALERROR> if not self._execpool.waitall(10.0): | |
INTERNALERROR> self._trace("execution did not finish in another 10 secs, " | |
INTERNALERROR> "calling os._exit()") | |
INTERNALERROR> os._exit(1) | |
INTERNALERROR> | |
INTERNALERROR> def serve(self): | |
INTERNALERROR> trace = lambda msg: self._trace("[serve] " + msg) | |
INTERNALERROR> hasprimary = self.execmodel.backend == "thread" | |
INTERNALERROR> self._execpool = self.execmodel.WorkerPool(hasprimary=hasprimary) | |
INTERNALERROR> trace("spawning receiver thread") | |
INTERNALERROR> self._initreceive() | |
INTERNALERROR> try: | |
INTERNALERROR> if hasprimary: | |
INTERNALERROR> # this will return when we are in shutdown | |
INTERNALERROR> trace("integrating as primary thread") | |
INTERNALERROR> self._execpool.integrate_as_primary_thread() | |
INTERNALERROR> trace("joining receiver thread") | |
INTERNALERROR> self.join() | |
INTERNALERROR> except KeyboardInterrupt: | |
INTERNALERROR> # in the slave we can't really do anything sensible | |
INTERNALERROR> trace("swallowing keyboardinterrupt, serve finished") | |
INTERNALERROR> | |
INTERNALERROR> def executetask(self, item): | |
INTERNALERROR> try: | |
INTERNALERROR> channel, (source, call_name, kwargs) = item | |
INTERNALERROR> if not ISPY3 and kwargs: | |
INTERNALERROR> # some python2 versions do not accept unicode keyword params | |
INTERNALERROR> # note: Unserializer generally turns py2-str to py3-str objects | |
INTERNALERROR> newkwargs = {} | |
INTERNALERROR> for name, value in kwargs.items(): | |
INTERNALERROR> if isinstance(name, unicode): | |
INTERNALERROR> name = name.encode('ascii') | |
INTERNALERROR> newkwargs[name] = value | |
INTERNALERROR> kwargs = newkwargs | |
INTERNALERROR> loc = {'channel' : channel, '__name__': '__channelexec__'} | |
INTERNALERROR> self._trace("execution starts[%s]: %s" % | |
INTERNALERROR> (channel.id, repr(source)[:50])) | |
INTERNALERROR> channel._executing = True | |
INTERNALERROR> try: | |
INTERNALERROR> co = compile(source+'\n', '<remote exec>', 'exec') | |
INTERNALERROR> do_exec(co, loc) # noqa | |
INTERNALERROR> if call_name: | |
INTERNALERROR> self._trace('calling %s(**%60r)' % (call_name, kwargs)) | |
INTERNALERROR> function = loc[call_name] | |
INTERNALERROR> function(channel, **kwargs) | |
INTERNALERROR> finally: | |
INTERNALERROR> channel._executing = False | |
INTERNALERROR> self._trace("execution finished") | |
INTERNALERROR> except KeyboardInterrupt: | |
INTERNALERROR> channel.close(INTERRUPT_TEXT) | |
INTERNALERROR> raise | |
INTERNALERROR> except: | |
INTERNALERROR> excinfo = self.exc_info() | |
INTERNALERROR> if not isinstance(excinfo[1], EOFError): | |
INTERNALERROR> if not channel.gateway._channelfactory.finished: | |
INTERNALERROR> self._trace("got exception: %r" % (excinfo[1],)) | |
INTERNALERROR> errortext = self._geterrortext(excinfo) | |
INTERNALERROR> channel.close(errortext) | |
INTERNALERROR> return | |
INTERNALERROR> self._trace("ignoring EOFError because receiving finished") | |
INTERNALERROR> channel.close() | |
INTERNALERROR> | |
INTERNALERROR> # | |
INTERNALERROR> # Cross-Python pickling code, tested from test_serializer.py | |
INTERNALERROR> # | |
INTERNALERROR> | |
INTERNALERROR> class DataFormatError(Exception): | |
INTERNALERROR> pass | |
INTERNALERROR> | |
INTERNALERROR> class DumpError(DataFormatError): | |
INTERNALERROR> """Error while serializing an object.""" | |
INTERNALERROR> | |
INTERNALERROR> class LoadError(DataFormatError): | |
INTERNALERROR> """Error while unserializing an object.""" | |
INTERNALERROR> | |
INTERNALERROR> if ISPY3: | |
INTERNALERROR> def bchr(n): | |
INTERNALERROR> return bytes([n]) | |
INTERNALERROR> else: | |
INTERNALERROR> bchr = chr | |
INTERNALERROR> | |
INTERNALERROR> DUMPFORMAT_VERSION = bchr(1) | |
INTERNALERROR> | |
INTERNALERROR> FOUR_BYTE_INT_MAX = 2147483647 | |
INTERNALERROR> | |
INTERNALERROR> FLOAT_FORMAT = "!d" | |
INTERNALERROR> FLOAT_FORMAT_SIZE = struct.calcsize(FLOAT_FORMAT) | |
INTERNALERROR> | |
INTERNALERROR> class _Stop(Exception): | |
INTERNALERROR> pass | |
INTERNALERROR> | |
INTERNALERROR> class Unserializer(object): | |
INTERNALERROR> num2func = {} # is filled after this class definition | |
INTERNALERROR> py2str_as_py3str = True # True | |
INTERNALERROR> py3str_as_py2str = False # false means py2 will get unicode | |
INTERNALERROR> | |
INTERNALERROR> def __init__(self, stream, channel_or_gateway=None, strconfig=None): | |
INTERNALERROR> gateway = getattr(channel_or_gateway, 'gateway', channel_or_gateway) | |
INTERNALERROR> strconfig = getattr(channel_or_gateway, '_strconfig', strconfig) | |
INTERNALERROR> if strconfig: | |
INTERNALERROR> self.py2str_as_py3str, self.py3str_as_py2str = strconfig | |
INTERNALERROR> self.stream = stream | |
INTERNALERROR> self.channelfactory = getattr(gateway, '_channelfactory', gateway) | |
INTERNALERROR> | |
INTERNALERROR> def load(self, versioned=False): | |
INTERNALERROR> if versioned: | |
INTERNALERROR> ver = self.stream.read(1) | |
INTERNALERROR> if ver != DUMPFORMAT_VERSION: | |
INTERNALERROR> raise LoadError("wrong dumpformat version") | |
INTERNALERROR> self.stack = [] | |
INTERNALERROR> try: | |
INTERNALERROR> while True: | |
INTERNALERROR> opcode = self.stream.read(1) | |
INTERNALERROR> if not opcode: | |
INTERNALERROR> raise EOFError | |
INTERNALERROR> try: | |
INTERNALERROR> loader = self.num2func[opcode] | |
INTERNALERROR> except KeyError: | |
INTERNALERROR> raise LoadError("unkown opcode %r - " | |
INTERNALERROR> "wire protocol corruption?" % (opcode,)) | |
INTERNALERROR> loader(self) | |
INTERNALERROR> except _Stop: | |
INTERNALERROR> if len(self.stack) != 1: | |
INTERNALERROR> raise LoadError("internal unserialization error") | |
INTERNALERROR> return self.stack.pop(0) | |
INTERNALERROR> else: | |
INTERNALERROR> raise LoadError("didn't get STOP") | |
INTERNALERROR> | |
INTERNALERROR> def load_none(self): | |
INTERNALERROR> self.stack.append(None) | |
INTERNALERROR> | |
INTERNALERROR> def load_true(self): | |
INTERNALERROR> self.stack.append(True) | |
INTERNALERROR> | |
INTERNALERROR> def load_false(self): | |
INTERNALERROR> self.stack.append(False) | |
INTERNALERROR> | |
INTERNALERROR> def load_int(self): | |
INTERNALERROR> i = self._read_int4() | |
INTERNALERROR> self.stack.append(i) | |
INTERNALERROR> | |
INTERNALERROR> def load_longint(self): | |
INTERNALERROR> s = self._read_byte_string() | |
INTERNALERROR> self.stack.append(int(s)) | |
INTERNALERROR> | |
INTERNALERROR> if ISPY3: | |
INTERNALERROR> load_long = load_int | |
INTERNALERROR> load_longlong = load_longint | |
INTERNALERROR> else: | |
INTERNALERROR> def load_long(self): | |
INTERNALERROR> i = self._read_int4() | |
INTERNALERROR> self.stack.append(long(i)) | |
INTERNALERROR> | |
INTERNALERROR> def load_longlong(self): | |
INTERNALERROR> l = self._read_byte_string() | |
INTERNALERROR> self.stack.append(long(l)) | |
INTERNALERROR> | |
INTERNALERROR> def load_float(self): | |
INTERNALERROR> binary = self.stream.read(FLOAT_FORMAT_SIZE) | |
INTERNALERROR> self.stack.append(struct.unpack(FLOAT_FORMAT, binary)[0]) | |
INTERNALERROR> | |
INTERNALERROR> def _read_int4(self): | |
INTERNALERROR> return struct.unpack("!i", self.stream.read(4))[0] | |
INTERNALERROR> | |
INTERNALERROR> def _read_byte_string(self): | |
INTERNALERROR> length = self._read_int4() | |
INTERNALERROR> as_bytes = self.stream.read(length) | |
INTERNALERROR> return as_bytes | |
INTERNALERROR> | |
INTERNALERROR> def load_py3string(self): | |
INTERNALERROR> as_bytes = self._read_byte_string() | |
INTERNALERROR> if not ISPY3 and self.py3str_as_py2str: | |
INTERNALERROR> # XXX Should we try to decode into latin-1? | |
INTERNALERROR> self.stack.append(as_bytes) | |
INTERNALERROR> else: | |
INTERNALERROR> self.stack.append(as_bytes.decode("utf-8")) | |
INTERNALERROR> | |
INTERNALERROR> def load_py2string(self): | |
INTERNALERROR> as_bytes = self._read_byte_string() | |
INTERNALERROR> if ISPY3 and self.py2str_as_py3str: | |
INTERNALERROR> s = as_bytes.decode("latin-1") | |
INTERNALERROR> else: | |
INTERNALERROR> s = as_bytes | |
INTERNALERROR> self.stack.append(s) | |
INTERNALERROR> | |
INTERNALERROR> def load_bytes(self): | |
INTERNALERROR> s = self._read_byte_string() | |
INTERNALERROR> self.stack.append(s) | |
INTERNALERROR> | |
INTERNALERROR> def load_unicode(self): | |
INTERNALERROR> self.stack.append(self._read_byte_string().decode("utf-8")) | |
INTERNALERROR> | |
INTERNALERROR> def load_newlist(self): | |
INTERNALERROR> length = self._read_int4() | |
INTERNALERROR> self.stack.append([None] * length) | |
INTERNALERROR> | |
INTERNALERROR> def load_setitem(self): | |
INTERNALERROR> if len(self.stack) < 3: | |
INTERNALERROR> raise LoadError("not enough items for setitem") | |
INTERNALERROR> value = self.stack.pop() | |
INTERNALERROR> key = self.stack.pop() | |
INTERNALERROR> self.stack[-1][key] = value | |
INTERNALERROR> | |
INTERNALERROR> def load_newdict(self): | |
INTERNALERROR> self.stack.append({}) | |
INTERNALERROR> | |
INTERNALERROR> def _load_collection(self, type_): | |
INTERNALERROR> length = self._read_int4() | |
INTERNALERROR> if length: | |
INTERNALERROR> res = type_(self.stack[-length:]) | |
INTERNALERROR> del self.stack[-length:] | |
INTERNALERROR> self.stack.append(res) | |
INTERNALERROR> else: | |
INTERNALERROR> self.stack.append(type_()) | |
INTERNALERROR> | |
INTERNALERROR> def load_buildtuple(self): | |
INTERNALERROR> self._load_collection(tuple) | |
INTERNALERROR> | |
INTERNALERROR> def load_set(self): | |
INTERNALERROR> self._load_collection(set) | |
INTERNALERROR> | |
INTERNALERROR> def load_frozenset(self): | |
INTERNALERROR> self._load_collection(frozenset) | |
INTERNALERROR> | |
INTERNALERROR> def load_stop(self): | |
INTERNALERROR> raise _Stop | |
INTERNALERROR> | |
INTERNALERROR> def load_channel(self): | |
INTERNALERROR> id = self._read_int4() | |
INTERNALERROR> newchannel = self.channelfactory.new(id) | |
INTERNALERROR> self.stack.append(newchannel) | |
INTERNALERROR> | |
INTERNALERROR> # automatically build opcodes and byte-encoding | |
INTERNALERROR> | |
INTERNALERROR> class opcode: | |
INTERNALERROR> """ container for name -> num mappings. """ | |
INTERNALERROR> | |
INTERNALERROR> def _buildopcodes(): | |
INTERNALERROR> l = [] | |
INTERNALERROR> for name, func in Unserializer.__dict__.items(): | |
INTERNALERROR> if name.startswith("load_"): | |
INTERNALERROR> opname = name[5:].upper() | |
INTERNALERROR> l.append((opname, func)) | |
INTERNALERROR> l.sort() | |
INTERNALERROR> for i,(opname, func) in enumerate(l): | |
INTERNALERROR> assert i < 26, "xxx" | |
INTERNALERROR> i = bchr(64+i) | |
INTERNALERROR> Unserializer.num2func[i] = func | |
INTERNALERROR> setattr(opcode, opname, i) | |
INTERNALERROR> | |
INTERNALERROR> _buildopcodes() | |
INTERNALERROR> | |
INTERNALERROR> def dumps(obj): | |
INTERNALERROR> """ return a serialized bytestring of the given obj. | |
INTERNALERROR> | |
INTERNALERROR> The obj and all contained objects must be of a builtin | |
INTERNALERROR> python type (so nested dicts, sets, etc. are all ok but | |
INTERNALERROR> not user-level instances). | |
INTERNALERROR> """ | |
INTERNALERROR> return _Serializer().save(obj, versioned=True) | |
INTERNALERROR> | |
INTERNALERROR> def dump(byteio, obj): | |
INTERNALERROR> """ write a serialized bytestring of the given obj to the given stream. """ | |
INTERNALERROR> _Serializer(write=byteio.write).save(obj, versioned=True) | |
INTERNALERROR> | |
INTERNALERROR> def loads(bytestring, py2str_as_py3str=False, py3str_as_py2str=False): | |
INTERNALERROR> """ return the object as deserialized from the given bytestring. | |
INTERNALERROR> | |
INTERNALERROR> py2str_as_py3str: if true then string (str) objects previously | |
INTERNALERROR> dumped on Python2 will be loaded as Python3 | |
INTERNALERROR> strings which really are text objects. | |
INTERNALERROR> py3str_as_py2str: if true then string (str) objects previously | |
INTERNALERROR> dumped on Python3 will be loaded as Python2 | |
INTERNALERROR> strings instead of unicode objects. | |
INTERNALERROR> | |
INTERNALERROR> if the bytestring was dumped with an incompatible protocol | |
INTERNALERROR> version or if the bytestring is corrupted, the | |
INTERNALERROR> ``execnet.DataFormatError`` will be raised. | |
INTERNALERROR> """ | |
INTERNALERROR> io = BytesIO(bytestring) | |
INTERNALERROR> return load(io, py2str_as_py3str=py2str_as_py3str, | |
INTERNALERROR> py3str_as_py2str=py3str_as_py2str) | |
INTERNALERROR> | |
INTERNALERROR> def load(io, py2str_as_py3str=False, py3str_as_py2str=False): | |
INTERNALERROR> """ derserialize an object form the specified stream. | |
INTERNALERROR> | |
INTERNALERROR> Behaviour and parameters are otherwise the same as with ``loads`` | |
INTERNALERROR> """ | |
INTERNALERROR> strconfig=(py2str_as_py3str, py3str_as_py2str) | |
INTERNALERROR> return Unserializer(io, strconfig=strconfig).load(versioned=True) | |
INTERNALERROR> | |
INTERNALERROR> def loads_internal(bytestring, channelfactory=None, strconfig=None): | |
INTERNALERROR> io = BytesIO(bytestring) | |
INTERNALERROR> return Unserializer(io, channelfactory, strconfig).load() | |
INTERNALERROR> | |
INTERNALERROR> def dumps_internal(obj): | |
INTERNALERROR> return _Serializer().save(obj) | |
INTERNALERROR> | |
INTERNALERROR> | |
INTERNALERROR> class _Serializer(object): | |
INTERNALERROR> _dispatch = {} | |
INTERNALERROR> | |
INTERNALERROR> def __init__(self, write=None): | |
INTERNALERROR> if write is None: | |
INTERNALERROR> self._streamlist = [] | |
INTERNALERROR> write = self._streamlist.append | |
INTERNALERROR> self._write = write | |
INTERNALERROR> | |
INTERNALERROR> def save(self, obj, versioned=False): | |
INTERNALERROR> # calling here is not re-entrant but multiple instances | |
INTERNALERROR> # may write to the same stream because of the common platform | |
INTERNALERROR> # atomic-write guaruantee (concurrent writes each happen atomicly) | |
INTERNALERROR> if versioned: | |
INTERNALERROR> self._write(DUMPFORMAT_VERSION) | |
INTERNALERROR> self._save(obj) | |
INTERNALERROR> self._write(opcode.STOP) | |
INTERNALERROR> try: | |
INTERNALERROR> streamlist = self._streamlist | |
INTERNALERROR> except AttributeError: | |
INTERNALERROR> return None | |
INTERNALERROR> return type(streamlist[0])().join(streamlist) | |
INTERNALERROR> | |
INTERNALERROR> def _save(self, obj): | |
INTERNALERROR> tp = type(obj) | |
INTERNALERROR> try: | |
INTERNALERROR> dispatch = self._dispatch[tp] | |
INTERNALERROR> except KeyError: | |
INTERNALERROR> methodname = 'save_' + tp.__name__ | |
INTERNALERROR> meth = getattr(self.__class__, methodname, None) | |
INTERNALERROR> if meth is None: | |
INTERNALERROR> raise DumpError("can't serialize %s" % (tp,)) | |
INTERNALERROR> dispatch = self._dispatch[tp] = meth | |
INTERNALERROR> dispatch(self, obj) | |
INTERNALERROR> | |
INTERNALERROR> def save_NoneType(self, non): | |
INTERNALERROR> self._write(opcode.NONE) | |
INTERNALERROR> | |
INTERNALERROR> def save_bool(self, boolean): | |
INTERNALERROR> if boolean: | |
INTERNALERROR> self._write(opcode.TRUE) | |
INTERNALERROR> else: | |
INTERNALERROR> self._write(opcode.FALSE) | |
INTERNALERROR> | |
INTERNALERROR> def save_bytes(self, bytes_): | |
INTERNALERROR> self._write(opcode.BYTES) | |
INTERNALERROR> self._write_byte_sequence(bytes_) | |
INTERNALERROR> | |
INTERNALERROR> if ISPY3: | |
INTERNALERROR> def save_str(self, s): | |
INTERNALERROR> self._write(opcode.PY3STRING) | |
INTERNALERROR> self._write_unicode_string(s) | |
INTERNALERROR> else: | |
INTERNALERROR> def save_str(self, s): | |
INTERNALERROR> self._write(opcode.PY2STRING) | |
INTERNALERROR> self._write_byte_sequence(s) | |
INTERNALERROR> | |
INTERNALERROR> def save_unicode(self, s): | |
INTERNALERROR> self._write(opcode.UNICODE) | |
INTERNALERROR> self._write_unicode_string(s) | |
INTERNALERROR> | |
INTERNALERROR> def _write_unicode_string(self, s): | |
INTERNALERROR> try: | |
INTERNALERROR> as_bytes = s.encode("utf-8") | |
INTERNALERROR> except UnicodeEncodeError: | |
INTERNALERROR> raise DumpError("strings must be utf-8 encodable") | |
INTERNALERROR> self._write_byte_sequence(as_bytes) | |
INTERNALERROR> | |
INTERNALERROR> def _write_byte_sequence(self, bytes_): | |
INTERNALERROR> self._write_int4(len(bytes_), "string is too long") | |
INTERNALERROR> self._write(bytes_) | |
INTERNALERROR> | |
INTERNALERROR> def _save_integral(self, i, short_op, long_op): | |
INTERNALERROR> if i <= FOUR_BYTE_INT_MAX: | |
INTERNALERROR> self._write(short_op) | |
INTERNALERROR> self._write_int4(i) | |
INTERNALERROR> else: | |
INTERNALERROR> self._write(long_op) | |
INTERNALERROR> self._write_byte_sequence(str(i).rstrip("L").encode("ascii")) | |
INTERNALERROR> | |
INTERNALERROR> def save_int(self, i): | |
INTERNALERROR> self._save_integral(i, opcode.INT, opcode.LONGINT) | |
INTERNALERROR> | |
INTERNALERROR> def save_long(self, l): | |
INTERNALERROR> self._save_integral(l, opcode.LONG, opcode.LONGLONG) | |
INTERNALERROR> | |
INTERNALERROR> def save_float(self, flt): | |
INTERNALERROR> self._write(opcode.FLOAT) | |
INTERNALERROR> self._write(struct.pack(FLOAT_FORMAT, flt)) | |
INTERNALERROR> | |
INTERNALERROR> def _write_int4(self, i, error="int must be less than %i" % | |
INTERNALERROR> (FOUR_BYTE_INT_MAX,)): | |
INTERNALERROR> if i > FOUR_BYTE_INT_MAX: | |
INTERNALERROR> raise DumpError(error) | |
INTERNALERROR> self._write(struct.pack("!i", i)) | |
INTERNALERROR> | |
INTERNALERROR> def save_list(self, L): | |
INTERNALERROR> self._write(opcode.NEWLIST) | |
INTERNALERROR> self._write_int4(len(L), "list is too long") | |
INTERNALERROR> for i, item in enumerate(L): | |
INTERNALERROR> self._write_setitem(i, item) | |
INTERNALERROR> | |
INTERNALERROR> def _write_setitem(self, key, value): | |
INTERNALERROR> self._save(key) | |
INTERNALERROR> self._save(value) | |
INTERNALERROR> self._write(opcode.SETITEM) | |
INTERNALERROR> | |
INTERNALERROR> def save_dict(self, d): | |
INTERNALERROR> self._write(opcode.NEWDICT) | |
INTERNALERROR> for key, value in d.items(): | |
INTERNALERROR> self._write_setitem(key, value) | |
INTERNALERROR> | |
INTERNALERROR> def save_tuple(self, tup): | |
INTERNALERROR> for item in tup: | |
INTERNALERROR> self._save(item) | |
INTERNALERROR> self._write(opcode.BUILDTUPLE) | |
INTERNALERROR> self._write_int4(len(tup), "tuple is too long") | |
INTERNALERROR> | |
INTERNALERROR> def _write_set(self, s, op): | |
INTERNALERROR> for item in s: | |
INTERNALERROR> self._save(item) | |
INTERNALERROR> self._write(op) | |
INTERNALERROR> self._write_int4(len(s), "set is too long") | |
INTERNALERROR> | |
INTERNALERROR> def save_set(self, s): | |
INTERNALERROR> self._write_set(s, opcode.SET) | |
INTERNALERROR> | |
INTERNALERROR> def save_frozenset(self, s): | |
INTERNALERROR> self._write_set(s, opcode.FROZENSET) | |
INTERNALERROR> | |
INTERNALERROR> def save_Channel(self, channel): | |
INTERNALERROR> self._write(opcode.CHANNEL) | |
INTERNALERROR> self._write_int4(channel.id) | |
INTERNALERROR> | |
INTERNALERROR> def init_popen_io(execmodel): | |
INTERNALERROR> if not hasattr(os, 'dup'): # jython | |
INTERNALERROR> io = Popen2IO(sys.stdout, sys.stdin, execmodel) | |
INTERNALERROR> import tempfile | |
INTERNALERROR> sys.stdin = tempfile.TemporaryFile('r') | |
INTERNALERROR> sys.stdout = tempfile.TemporaryFile('w') | |
INTERNALERROR> else: | |
INTERNALERROR> try: | |
INTERNALERROR> devnull = os.devnull | |
INTERNALERROR> except AttributeError: | |
INTERNALERROR> if os.name == 'nt': | |
INTERNALERROR> devnull = 'NUL' | |
INTERNALERROR> else: | |
INTERNALERROR> devnull = '/dev/null' | |
INTERNALERROR> # stdin | |
INTERNALERROR> stdin = execmodel.fdopen(os.dup(0), 'r', 1) | |
INTERNALERROR> fd = os.open(devnull, os.O_RDONLY) | |
INTERNALERROR> os.dup2(fd, 0) | |
INTERNALERROR> os.close(fd) | |
INTERNALERROR> | |
INTERNALERROR> # stdout | |
INTERNALERROR> stdout = execmodel.fdopen(os.dup(1), 'w', 1) | |
INTERNALERROR> fd = os.open(devnull, os.O_WRONLY) | |
INTERNALERROR> os.dup2(fd, 1) | |
INTERNALERROR> | |
INTERNALERROR> # stderr for win32 | |
INTERNALERROR> if os.name == 'nt': | |
INTERNALERROR> sys.stderr = execmodel.fdopen(os.dup(2), 'w', 1) | |
INTERNALERROR> os.dup2(fd, 2) | |
INTERNALERROR> os.close(fd) | |
INTERNALERROR> io = Popen2IO(stdout, stdin, execmodel) | |
INTERNALERROR> sys.stdin = execmodel.fdopen(0, 'r', 1) | |
INTERNALERROR> sys.stdout = execmodel.fdopen(1, 'w', 1) | |
INTERNALERROR> return io | |
INTERNALERROR> | |
INTERNALERROR> def serve(io, id): | |
INTERNALERROR> trace("creating slavegateway on %r" %(io,)) | |
INTERNALERROR> SlaveGateway(io=io, id=id, _startcount=2).serve() | |
INTERNALERROR> | |
INTERNALERROR> import socket | |
INTERNALERROR> class SocketIO: | |
INTERNALERROR> def __init__(self, sock, execmodel): | |
INTERNALERROR> self.sock = sock | |
INTERNALERROR> self.execmodel = execmodel | |
INTERNALERROR> socket = execmodel.socket | |
INTERNALERROR> try: | |
INTERNALERROR> sock.setsockopt(socket.SOL_IP, socket.IP_TOS, 0x10)# IPTOS_LOWDELAY | |
INTERNALERROR> sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) | |
INTERNALERROR> except (AttributeError, socket.error): | |
INTERNALERROR> sys.stderr.write("WARNING: cannot set socketoption") | |
INTERNALERROR> | |
INTERNALERROR> def read(self, numbytes): | |
INTERNALERROR> "Read exactly 'bytes' bytes from the socket." | |
INTERNALERROR> buf = bytes() | |
INTERNALERROR> while len(buf) < numbytes: | |
INTERNALERROR> t = self.sock.recv(numbytes - len(buf)) | |
INTERNALERROR> if not t: | |
INTERNALERROR> raise EOFError | |
INTERNALERROR> buf += t | |
INTERNALERROR> return buf | |
INTERNALERROR> | |
INTERNALERROR> def write(self, data): | |
INTERNALERROR> self.sock.sendall(data) | |
INTERNALERROR> | |
INTERNALERROR> def close_read(self): | |
INTERNALERROR> try: | |
INTERNALERROR> self.sock.shutdown(0) | |
INTERNALERROR> except self.execmodel.socket.error: | |
INTERNALERROR> pass | |
INTERNALERROR> def close_write(self): | |
INTERNALERROR> try: | |
INTERNALERROR> self.sock.shutdown(1) | |
INTERNALERROR> except self.execmodel.socket.error: | |
INTERNALERROR> pass | |
INTERNALERROR> | |
INTERNALERROR> def wait(self): | |
INTERNALERROR> pass | |
INTERNALERROR> | |
INTERNALERROR> def kill(self): | |
INTERNALERROR> pass | |
INTERNALERROR> | |
INTERNALERROR> try: execmodel | |
INTERNALERROR> except NameError: | |
INTERNALERROR> execmodel = get_execmodel('thread') | |
INTERNALERROR> io = SocketIO(clientsock, execmodel) | |
INTERNALERROR> io.write('1'.encode('ascii')) | |
INTERNALERROR> serve(io, id='socket=192.168.1.108:8888-slave')", line 1029, in executetask | |
INTERNALERROR> File "<string>", line 1, in do_exec | |
INTERNALERROR> File "<remote exec>", line 109, in <module> | |
INTERNALERROR> File "<remote exec>", line 104, in serve_rsync | |
INTERNALERROR> AttributeError: 'module' object has no attribute 'symlink' |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment