Skip to content

Instantly share code, notes, and snippets.

@amcgregor
Created January 4, 2012 22:48
Show Gist options
  • Save amcgregor/1562635 to your computer and use it in GitHub Desktop.
Save amcgregor/1562635 to your computer and use it in GitHub Desktop.
An example combining nested sets with adjacency lists using SQLAlchemy+Elixir.
# encoding: utf-8
from uuid import uuid4
from elixir import *
from elixir.events import *
from sqlalchemy import UniqueConstraint
from sqlalchemy.sql import and_
class Asset(Entity):
    """A tree node stored as both a nested set and an adjacency list.

    Every asset carries nested-set bounds (``l``/``r``) *and* a direct
    ``parent`` reference.  The bounds answer ancestor/descendant questions
    with one range query; the parent reference answers child/sibling
    questions without any range arithmetic.
    """

    using_options(tablename='assets', inheritance='multi', polymorphic='kind', order_by='l')
    using_table_options(UniqueConstraint('parent_guid', 'name'))

    guid = Field(String(36), default=lambda: str(uuid4()), primary_key=True)
    # Nested-set bounds.  Both default to 0; attach() treats a falsy bound as
    # "not yet placed in the tree".
    l, r = Field(Integer, default=0), Field(Integer, default=0)
    parent = ManyToOne('Asset')
    name = Field(String(250), required=True)

    # ...

    # Queries
    # This shows how adjacency lists and nested sets can work together.

    @property
    def ancestors(self):
        """Return all ancestors of this asset."""
        return Asset.query.filter(Asset.r > self.r).filter(Asset.l < self.l)

    @property
    def children(self):
        """Return a query for the assets whose parent is this asset."""
        return Asset.query.filter_by(parent=self)

    @property
    def descendants(self):
        """Return all descendants of this asset."""
        return Asset.query.filter(Asset.r < self.r).filter(Asset.l > self.l)

    @property
    def depth(self):
        """Return the current asset's depth in the tree.

        Computed as the number of ancestors plus one; it is read-only.
        """
        return self.ancestors.count() + 1

    @property
    def siblings(self):
        """Return two queries: siblings to the left, then siblings to the right.

        Raises AttributeError when this asset has no parent, because the
        lookup goes through ``self.parent.children``.
        """
        return (
            self.parent.children.filter(Asset.r < self.r),
            self.parent.children.filter(Asset.l > self.l),
        )

    @property
    def path(self):
        """Return the full path to this asset.

        An asset without a parent yields the empty string.  Otherwise the
        path is "/"-joined from the names of every ancestor except the first
        one in ``l`` order, followed by this asset's own name.
        """
        ancestors = self.ancestors.order_by(Asset.l)
        value = "" if not self.parent else "/" + "/".join([i.name for i in ancestors[1:]] + [self.name])
        # NOTE(review): `_paths` and `self.modified` are not defined in the
        # visible part of this file; presumably they live in the elided
        # section above -- confirm before relying on this cache write.
        _paths[self.guid] = value, self.modified
        return value

    @classmethod
    def stargate(cls, left=None, right=None, value=2, both=None):
        """Open a hole in the left/right structure. Alternatively, with a negative value, close a hole.

        left  -- SQL criterion selecting rows whose ``l`` is shifted by value.
        right -- SQL criterion selecting rows whose ``r`` is shifted by value.
        value -- amount added to the selected bounds (negative closes a hole).
        both  -- SQL criterion selecting rows whose ``l`` and ``r`` both shift.

        Commits the session, then expires the cached ``l``/``r`` of every
        Asset in the session so the next access reloads them.
        """
        # Compare against None explicitly: the truth value of a SQLAlchemy
        # clause is not a meaningful "was it supplied?" test and raises
        # TypeError for most comparison expressions.
        # The updates target Asset.table explicitly (not cls.table) exactly
        # as the original did, since l and r are declared on Asset.
        if both is not None:
            Asset.table.update(both, values=dict(l=Asset.l + value, r=Asset.r + value)).execute()

        if left is not None:
            Asset.table.update(left, values=dict(l=Asset.l + value)).execute()

        if right is not None:
            Asset.table.update(right, values=dict(r=Asset.r + value)).execute()

        session.commit()

        # Expire the cache of the l and r columns for every Asset.
        for obj in session:
            if isinstance(obj, Asset):
                session.expire(obj, ['l', 'r'])

    def attach(self, node, after=True, below=True):
        """Attach an object as a child or sibling of the current object.

        node  -- the asset to place; either brand new (falsy ``l``/``r``) or
                 already in the tree, in which case its whole subtree moves.
        after -- True places it last (as a child) or just after self (as a
                 sibling); False places it first / just before self.
        below -- True makes node a child of self; False makes it a sibling.

        Raises Exception when node is self or one of self's ancestors, and
        AssertionError when node's existing bounds look corrupt.

        NOTE: stargate() commits internally, so the rollback in the error
        path cannot undo shifts that have already been applied.
        """
        session.commit()

        if self is node:
            raise Exception("You can not attach a node to itself.")

        if node in self.ancestors.all():
            raise Exception("Infinite loops give coders headaches. Putting %r inside %r is a bad idea." % (node, self))

        if node.l and node.r:
            # Run some additional integrity checks before modifying the database.
            assert node.l < node.r, "This object can not be moved as its positional relationship information is corrupt."
            assert node.descendants.count() == (node.r - node.l - 1) // 2, "This node is missing descendants and can not be moved."

        # Width of the hole needed: two bounds for node and each descendant.
        count = (1 + node.descendants.count()) * 2

        try:
            # Open a hole of `count` positions at the target location.
            if below:
                if after:
                    self.stargate(Asset.l >= self.r, Asset.r >= self.r, count)
                else:
                    self.stargate(Asset.l > self.l, Asset.r > self.l, count)
            else:
                if after:
                    self.stargate(Asset.l > self.r, Asset.r > self.r, count)
                else:
                    self.stargate(Asset.l >= self.l, Asset.r >= self.l, count)

            if not node.l or not node.r:
                # This node is currently unassigned and/or corrupt.
                # Depth is a read-only property derived from l/r, so it needs
                # no assignment here (assigning it would raise AttributeError).
                if below:
                    if after:
                        node.l, node.r = self.r - 2, self.r - 1
                    else:
                        node.l, node.r = self.l + 1, self.l + 2
                    node.parent = self
                else:
                    if after:
                        node.l, node.r = self.r + 1, self.r + 2
                    else:
                        node.l, node.r = self.l - 2, self.l - 1
                    node.parent = self.parent

                session.commit()
                return

            # This node was already placed in the tree and needs to be moved. How far?
            if below:
                if after:
                    delta = self.r - node.r - 1
                else:
                    delta = self.l - node.l + 1
            else:
                # Align the subtree's edge with the hole's edge.  For a leaf
                # (r == l + 1) these equal the former `self.r - node.r + 2`
                # and `self.l - node.l - 2`; for a node with descendants the
                # former values landed the subtree on top of self.
                if after:
                    delta = self.r - node.l + 1
                else:
                    delta = self.l - node.r - 1

            # Migrate the node and its descendants to its new location.
            hole = node.l
            self.stargate(value=delta, both=and_(Asset.l >= node.l, Asset.r <= node.r))

            # Close the resulting hole.
            self.stargate(Asset.l >= hole, Asset.r >= hole, -count)

            # Keep the adjacency list in step with the nested set: a sibling
            # move adopts self's parent, matching the new-node branch above.
            node.parent = self if below else self.parent

        except:
            # Deliberately broad: roll back, then re-raise whatever happened.
            session.rollback()
            raise

        else:
            session.commit()
import model
class Foo(object):
    """Helper that rebuilds nested-set bounds from parent/child links."""

    def nest(self, parent, n, l=0):
        """Renumber ``parent`` and everything below it, depth first.

        parent -- node to renumber; must expose ``children`` plus writable
                  ``l`` and ``r`` attributes.
        n      -- value to assign to ``parent.l``.
        l      -- nesting level, used only to indent the progress output.

        Returns the value assigned to ``parent.r``.
        """
        o = n
        parent.l = o

        # Each child starts one past the last bound handed out so far.
        for child in parent.children:
            o = self.nest(child, o + 1, l + 1)

        parent.r = o + 1

        # Parenthesized so the whole message is a single print argument on
        # both Python 2 and Python 3.
        print((" " * l) + "Parent l=%d, r=%d, return=%d\t%r" % (parent.l, parent.r, o + 1, parent))

        return o + 1
# Rebuild the nested-set bounds for the whole tree, starting from the asset
# whose left bound is 1, and commit only if every node was renumbered.
try:
    a = Foo()
    root = model.Asset.query.filter_by(l=1).one()
    a.nest(root, 1)
    model.DBSession.commit()

except:
    # Deliberately broad: report, roll back, then re-raise whatever happened.
    # The print calls are parenthesized so they work on Python 2 and 3 alike.
    print("!!! There was an error recreating the nested set structure.")
    print("I hope you have backups.")
    model.DBSession.rollback()
    raise

else:
    print("Nested set structure recreated successfully.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment