Skip to content

Instantly share code, notes, and snippets.

@gregorynicholas
Forked from someone1/delete_tests.py
Created March 15, 2013 21:15
Show Gist options
  • Save gregorynicholas/5173149 to your computer and use it in GitHub Desktop.
Save gregorynicholas/5173149 to your computer and use it in GitHub Desktop.
appengine NDB delete tests
from google.appengine.ext.ndb import model, context, tasklets
from random import choice
import string
from werkzeug import Response
class Items(model.Model):
    """NDB entity with a single ``user`` string property."""

    # Value the delete query below filters on.
    user = model.StringProperty()

    @classmethod
    @tasklets.tasklet
    def delete_all_for_user(cls, user):
        """Delete every ``Items`` entity whose ``user`` equals *user*.

        Args:
            user: Value compared against the ``user`` property.

        Returns:
            Because this is a tasklet, the call returns a future; the
            deletes have been issued and awaited once it completes.
        """
        # Use fetch_async() rather than the blocking fetch(): a synchronous
        # call inside a tasklet holds up the event loop, so callers that
        # yield several tasklets together would not actually overlap them.
        matching_keys = yield cls.query(cls.user == user).fetch_async(
            keys_only=True, batch_size=100)
        # delete_multi_async() returns a list of futures; yielding the list
        # waits for all of them.
        yield model.delete_multi_async(matching_keys)
class Config(model.Model):
    """NDB entity holding a ``user`` and a ``stat_name`` string property."""

    # Value the delete query below filters on.
    user = model.StringProperty()
    # Read by Stats.delete_all_for_config_callback to find matching Stats.
    stat_name = model.StringProperty()

    @classmethod
    @tasklets.tasklet
    def delete_all_for_user(cls, user):
        """Run the Stats cleanup callback over each ``Config`` of *user*.

        Args:
            user: Value compared against the ``user`` property.

        The query results are mapped through
        ``Stats.delete_all_for_config_callback``, which receives each
        matching ``Config`` entity.
        """
        per_config_cleanup = Stats.delete_all_for_config_callback
        yield cls.query(cls.user == user).map_async(
            per_config_cleanup, batch_size=100)
class Stats(model.Model):
    """NDB entity with a single ``name`` string property."""

    # Matched against an entity's ``stat_name`` in the callback below.
    name = model.StringProperty()

    @classmethod
    @tasklets.tasklet
    def delete_all_for_config_callback(cls, ent):
        """Delete the ``Stats`` named by *ent* and then *ent* itself.

        Args:
            ent: Entity exposing ``stat_name`` and ``key``; every ``Stats``
                whose ``name`` equals ``ent.stat_name`` is deleted, along
                with ``ent.key``.

        Returns:
            Because this is a tasklet, the call returns a future that
            completes once all deletes have finished.
        """
        qry = cls.query(cls.name == ent.stat_name)
        # NOTE: an alternative formulation is
        #   qry.map_async(lambda k: k.delete_async(), keys_only=True,
        #                 batch_size=100)
        # Use fetch_async() rather than the blocking fetch(): a synchronous
        # call here would stall the event loop each time this tasklet runs.
        stat_keys = yield qry.fetch_async(keys_only=True, batch_size=100)
        # Issue the Stats deletes and the delete of ``ent`` together and
        # wait for both.
        yield model.delete_multi_async(stat_keys), ent.key.delete_async()
@context.toplevel
def test_delete_view(request):
    """Seed test entities, or delete them when ``test`` is in the query args.

    Args:
        request: Request object; only ``request.args`` is consulted.

    Without ``test`` in ``request.args`` this stores 200 ``Items`` plus 50
    ``Config`` entities, each accompanied by 10 ``Stats`` sharing a random
    10-letter name. With ``test`` present it runs both per-user delete
    tasklets together. Either way the result is ``Response('Done')``.
    """
    user = 'Test_User'
    if 'test' not in request.args:
        # Setup
        seed_items = [Items(user=user) for _ in xrange(200)]
        model.put_multi(seed_items)
        for _ in xrange(50):
            conf = Config(user=user)
            stat_name = "".join(
                choice(string.ascii_letters) for _ in xrange(10))
            conf.stat_name = stat_name
            # One put per config: the Config first, then its ten Stats.
            batch = [conf]
            batch.extend(Stats(name=stat_name) for _ in xrange(10))
            model.put_multi(batch)
    else:
        # Yielding a tuple waits on both tasklets at once.
        yield Items.delete_all_for_user(user), Config.delete_all_for_user(user)
    raise tasklets.Return(Response('Done'))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment