Last active
August 3, 2016 17:46
-
-
Save isaacl/9226812 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Fill a local SQLite-backed datastore stub with 100 compressed-text entities.

The App Engine SDK and the NDB checkout are located through the
GOOGLE_APPENGINE_SDK_PATH and GOOGLE_APPENGINE_NDB_PATH environment
variables.  The data is written to '100GzipText.sqlite'.
"""
import os
import sys

# Put the SDK and the NDB checkout at the front of sys.path.  The NDB path is
# inserted last, so it ends up ahead of the SDK path.  Unset variables are
# skipped because only strings belong on sys.path.
for _env_var in ('GOOGLE_APPENGINE_SDK_PATH', 'GOOGLE_APPENGINE_NDB_PATH'):
    _location = os.environ.get(_env_var)
    if _location is not None:
        sys.path.insert(0, _location)

import dev_appserver
dev_appserver.fix_sys_path()

from google.appengine.ext import testbed

t = testbed.Testbed()
os.environ['APPLICATION_ID'] = 'foo'
t.activate()
try:
    t.init_datastore_v3_stub(use_sqlite=True,
                             datastore_file='100GzipText.sqlite')
    t.init_memcache_stub()

    # Imported only after the stubs are initialised, as in the original
    # statement order.
    import ndb

    class Entity(ndb.Model):
        # Single text property, declared with compressed=True.
        val = ndb.TextProperty(compressed=True)

    # 100 entities, each holding a 10000-character string.
    for i in range(100):
        e = Entity()
        e.val = 'a' * 10000
        e.put()
finally:
    # Always deactivate the testbed, even when a stub or a put() fails.
    t.deactivate()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gc | |
import inspect | |
import logging | |
import os | |
import pprint | |
import sys | |
import time | |
import weakref | |
from pprint import pprint as pp | |
# Send all log records to a fresh, timestamped file under /tmp.  Level 5 is
# below logging.DEBUG (10), so nothing at DEBUG or above is filtered out.
logging.basicConfig(level=5,
                    format='%(asctime)s %(levelname)-8s %(message)s',
                    datefmt='%H:%M:%S',
                    filename='/tmp/ndb.%d.log' % int(time.time()),
                    filemode='w')

# Put the App Engine SDK and the NDB checkout at the front of sys.path.  The
# NDB path is inserted last, so it ends up ahead of the SDK path.  Unset
# variables are skipped because only strings belong on sys.path.
for _env_var in ('GOOGLE_APPENGINE_SDK_PATH', 'GOOGLE_APPENGINE_NDB_PATH'):
    _location = os.environ.get(_env_var)
    if _location is not None:
        sys.path.insert(0, _location)

import dev_appserver
dev_appserver.fix_sys_path()

from google.appengine.ext import testbed

# The testbed is activated and left active for the rest of the script; this
# section never deactivates it.
t = testbed.Testbed()
os.environ['APPLICATION_ID'] = 'foo'
t.activate()
# Reads the datastore file named '100GzipText.sqlite'.
t.init_datastore_v3_stub(use_sqlite=True, datastore_file='100GzipText.sqlite')
t.init_memcache_stub()

# Imported only after the stubs are initialised, as in the original order.
import ndb
class Entity(ndb.Model):
    """Datastore model with a single text field declared ``compressed=True``."""

    # The only property on the model; fetch_entities() sums len() of it.
    val = ndb.TextProperty(compressed=True)
# Weak index -> entity map filled by fetch_entities().  An entry disappears
# once nothing else references its entity.
wd = weakref.WeakValueDictionary()

# Turn off both cache policies on the current NDB context.  The temporary
# name is deleted so the module keeps no extra reference to the context.
_context = ndb.get_context()
_context.set_cache_policy(False)
_context.set_memcache_policy(False)
del _context

# Optional heap inspection with Heapy -- uncomment the three lines below.
# Background reading: http://smira.ru/wp-content/uploads/2011/08/heapy.html
# from guppy import hpy
# hp = hpy()
# hp.setref()
def fetch_entities(paged=True, batch_size=10):
    """Walk every ``Entity`` in the datastore and total the text that was read.

    Each entity is also stored in the module-level weak dictionary ``wd``
    under its zero-based position, so callers can inspect afterwards which
    entities are still alive.

    Args:
        paged: When true, read the query page by page with ``fetch_page`` and
            a cursor.  Otherwise iterate it with ``Query.iter``.
        batch_size: Page size passed to ``fetch_page``, or the ``batch_size``
            passed to ``iter``.

    Returns:
        A ``(last_index, total_txt)`` tuple.  ``last_index`` is the
        zero-based index of the last entity seen, or -1 when the query
        yielded nothing.  ``total_txt`` is the sum of ``len(entity.val)``.
    """
    if paged:
        def fetcher():
            # Follow the cursor until fetch_page reports no more results.
            cursor = None
            while True:
                entities, cursor, more = Entity.query().fetch_page(
                    batch_size, start_cursor=cursor)
                for e in entities:
                    yield e
                if not more:
                    return
        fetch_it = fetcher()
    else:
        fetch_it = Entity.query().iter(batch_size=batch_size)

    # Start at -1 so an empty result set returns (-1, 0) instead of raising
    # UnboundLocalError on the final return.
    i = -1
    total_txt = 0
    for i, entity in enumerate(fetch_it):
        wd[i] = entity
        total_txt += len(entity.val)
        # Experiment toggles, left disabled:
        # if i % (batch_size // 2) == batch_size//4:
        #     gc.collect()
        # if len(wd) > batch_size * 2:
        #     break

    # Run a full collection before returning -- presumably so that ``wd``
    # afterwards holds only entities that are still strongly referenced.
    gc.collect()
    return i, total_txt
# HEAPY HELPERS | |
def ppn(es):
    """Print a short preview of the nodes held by ``es``.

    At most the first ten nodes are shown, each as its ``pprint.pformat``
    text cut to 150 characters.  When there are more than ten, a final line
    ``... N`` gives the number of nodes left out.

    Args:
        es: Object with an iterable ``nodes`` attribute (presumably a Heapy
            set, given the surrounding helpers -- confirm against callers).
    """
    nodes = list(es.nodes)
    for node in nodes[:10]:
        # A single parenthesised argument prints the same on Python 2 and 3.
        print(pprint.pformat(node)[:150])
    hidden = len(nodes) - 10
    if hidden > 0:
        # Same text the old ``print '...', n`` statement produced.
        print('... %d' % hidden)
def g1(es):
    """Return the first node of ``es``.

    Args:
        es: Object with an iterable ``nodes`` attribute.

    Returns:
        The first item produced by iterating ``es.nodes``.

    Raises:
        StopIteration: If ``es.nodes`` is empty.
    """
    # The builtin next() works on Python 2.6+ and 3, unlike the ``.next()``
    # method, which exists only on Python 2 iterators.
    return next(iter(es.nodes))
def print_gens(gens):
    """Pretty-print the code object and frame state of each generator.

    For every generator in ``gens.nodes`` the output pairs its ``gi_code``
    with the ``f_lineno`` and ``f_locals`` members of its ``gi_frame``.

    Args:
        gens: Object with an iterable ``nodes`` attribute of generators.

    Returns:
        Whatever ``pp`` returns.
    """
    wanted = ('f_locals', 'f_lineno')
    report = []
    for gen in gens.nodes:
        code = gen.gi_code
        frame_state = []
        for member in inspect.getmembers(gen.gi_frame):
            if member[0] in wanted:
                frame_state.append(member)
        report.append((code, frame_state))
    return pp(report)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/ndb/context.py b/ndb/context.py | |
index 00fdd36..b041157 100644 | |
--- a/ndb/context.py | |
+++ b/ndb/context.py | |
@@ -826,6 +826,7 @@ class Context(object): | |
else: | |
val = callback(ent) | |
mfut.putq(val) | |
+ batch = None | |
except GeneratorExit: | |
raise | |
except Exception, err: | |
diff --git a/ndb/eventloop.py b/ndb/eventloop.py | |
index 468d37f..2571097 100644 | |
--- a/ndb/eventloop.py | |
+++ b/ndb/eventloop.py | |
@@ -42,7 +42,7 @@ class EventLoop(object): | |
self.idlers = collections.deque() # Cyclic list of (callback, args, kwds) | |
self.inactive = 0 # How many idlers in a row were no-ops | |
self.queue = [] # Sorted list of (time, callback, args, kwds) | |
- self.rpcs = {} # Map of rpc -> (callback, args, kwds) | |
+ self.rpcs = collections.OrderedDict() # Map of rpc -> (callback, args, kwds) | |
def clear(self): | |
"""Remove all pending events without running any.""" | |
@@ -108,6 +108,7 @@ class EventLoop(object): | |
NOTE: If the rpc is a MultiRpc, the callback will be called once | |
for each sub-RPC. TODO: Is this a good idea? | |
""" | |
+ logging.warning('QUEUE_RPC %r', rpc) | |
if rpc is None: | |
return | |
if rpc.state not in (_RUNNING, _FINISHING): | |
@@ -129,6 +130,7 @@ class EventLoop(object): | |
rpcs = [rpc] | |
for rpc in rpcs: | |
self.rpcs[rpc] = (callback, args, kwds) | |
+ import pprint; logging.warning('RPCS now: %s', pprint.pformat(self.rpcs.values())) | |
def add_idle(self, callback, *args, **kwds): | |
"""Add an idle callback. | |
@@ -194,7 +196,7 @@ class EventLoop(object): | |
return 0 | |
if self.rpcs: | |
self.inactive = 0 | |
- rpc = datastore_rpc.MultiRpc.wait_any(self.rpcs) | |
+ rpc = datastore_rpc.MultiRpc.wait_any(reversed(self.rpcs)) | |
if rpc is not None: | |
_logging_debug('rpc: %s.%s', rpc.service, rpc.method) | |
# Yes, wait_any() may return None even for a non-empty argument. | |
@@ -203,6 +205,7 @@ class EventLoop(object): | |
raise RuntimeError('rpc %r was not given to wait_any as a choice %r' % | |
(rpc, self.rpcs)) | |
callback, args, kwds = self.rpcs[rpc] | |
+ logging.warning('RPC FETCHED: %s.%s, %r', rpc.service, rpc.method, callback) | |
del self.rpcs[rpc] | |
if callback is not None: | |
callback(*args, **kwds) | |
diff --git a/ndb/query.py b/ndb/query.py | |
index 11e29c1..ed7a425 100644 | |
--- a/ndb/query.py | |
+++ b/ndb/query.py | |
@@ -939,6 +939,7 @@ class Query(object): | |
rpc = batch.next_batch_async(options) | |
for i, result in enumerate(batch.results): | |
queue.putq((batch, i, result)) | |
+ batch = None | |
queue.complete() | |
except GeneratorExit: | |
diff --git a/ndb/tasklets.py b/ndb/tasklets.py | |
index 3214def..724cec9 100644 | |
--- a/ndb/tasklets.py | |
+++ b/ndb/tasklets.py | |
@@ -59,12 +59,13 @@ implementing an interface that expects tasklets but you have no need to | |
suspend -- there's no need to insert a dummy yield in order to make | |
the tasklet into a generator. | |
""" | |
- | |
+import inspect | |
import collections | |
import logging | |
import os | |
import sys | |
import types | |
+import weakref | |
from .google_imports import apiproxy_stub_map | |
from .google_imports import apiproxy_rpc | |
@@ -103,7 +104,7 @@ class _State(utils.threading_local): | |
def __init__(self): | |
super(_State, self).__init__() | |
- self.all_pending = set() | |
+ self.all_pending = weakref.WeakSet() | |
def add_pending(self, fut): | |
_logging_debug('all_pending: add %s', fut) | |
@@ -768,14 +769,29 @@ class SerialQueueFuture(Future): | |
If, instead of complete(), set_exception() is called, the exception | |
and traceback set there will be used instead of EOFError. | |
""" | |
+ i = 0 | |
def __init__(self, info=None): | |
+ self.id = SerialQueueFuture.i | |
+ SerialQueueFuture.i+=1 | |
+ | |
self._full = False | |
self._queue = collections.deque() | |
self._waiting = collections.deque() | |
super(SerialQueueFuture, self).__init__(info=info) | |
# TODO: __repr__ | |
+ def __repr__(self): | |
+ line = '?' | |
+ for line in self._where: | |
+ if 'tasklets.py' not in line: | |
+ break | |
+ if self._info: | |
+ line += ' for %s' % self._info | |
+ if self._geninfo: | |
+ line += ' %s' % self._geninfo | |
+ return '<%d %s %x created by %s>' % ( | |
+ self.id, self.__class__.__name__, id(self), line) | |
def complete(self): | |
if self._full: | |
@@ -795,6 +811,7 @@ class SerialQueueFuture(Future): | |
waiter.set_exception(exc, tb) | |
def putq(self, value): | |
+ logging.warning('PUTQ %r, val %r, caller: %s', self, value, inspect.stack()[1][3]) | |
if isinstance(value, Future): | |
fut = value | |
else: | |
@@ -819,6 +836,7 @@ class SerialQueueFuture(Future): | |
self._queue.append(fut) | |
def getq(self): | |
+ logging.warning('GETQ %r queue %d wait %d, caller %s', self, len(self._queue), len(self._waiting), inspect.stack()[1][3]) | |
if self._queue: | |
fut = self._queue.popleft() | |
# TODO: Isn't it better to call self.set_result(None) in complete()? |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment