Last active
February 26, 2018 04:44
-
-
Save tbnorth/e00e3e5d7864824059ea992637a936c5 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ast
from collections import namedtuple
# `Field` is a lightweight record type with exactly two fixed attributes,
# `name` and `type`; instances are built from the field lists of @tbl nodes.
Field = namedtuple(typename="Field", field_names="name type")
class TablesIndex:
    """TablesIndex - index of DB's and tables in an outline.

    The index is built once, at construction time, by scanning the outline
    for ``@db3 <path>`` nodes and the ``@tbl <name>`` nodes below them.
    All outline access goes through the commander passed to the
    constructor (``self.c``), never through an ambient global ``c``.
    Messages are reported with ``g.es``; ``g`` and ``Field`` are expected
    to be available at module level.
    """

    def __init__(self, c):
        """initialize for c

        :param outline c: Leo outline to scan
        """
        self.c = c
        # A dict: keys are paths to DBs taken from @db3 nodes, values are
        # dicts describing DBs, filled in by scan().  The dicts describing
        # DBs have a `vnode` entry and a `tables` entry, the latter being
        # the dict of table defs returned by read_table_defs().
        self.db = {}
        self.scan()

    @staticmethod
    def _headline_arg(p):
        """Return the text following the first word of p's headline.

        :param position p: node whose headline looks like ``@xxx <arg>``
        :return: the argument text, or None when the headline has no
            argument (e.g. ``"@tbl "``)
        """
        parts = p.h.split(None, 1)
        return parts[1] if len(parts) > 1 else None

    def nodes(self, prefix, root=None):
        """get descendants of root starting with prefix

        The search does not descend into a matching node, so nested
        matches below a match are not reported.

        :param str prefix: headline prefix to match, e.g. ``'@tbl '``
        :param position root: node whose descendants are searched; when
            omitted (or falsy) the whole outline is searched
        :return: generator of matching positions
        """
        # Leo lacks a true root node: c.rootPosition() is actually the
        # first top level node, hence rootPosition().self_and_siblings_iter()
        # to cover every top level node.
        if root:
            candidates = root.children()
        else:
            candidates = self.c.rootPosition().self_and_siblings_iter()
        for child in candidates:
            if child.h.startswith(prefix):
                yield child
            else:
                # search recursively
                for i in self.nodes(prefix, root=child):
                    yield i

    def read_fields(self, node):
        """read_fields - read fields from node, i.e. the

            ['one', 'two', ..., 'Extension']
            ['TEXT', 'REAL', ..., 'TEXT']

        lines, which must be the first two lines of the node's body.

        :param position node: node to scan
        :return: list of Fields; empty (after a `g.es` message) when the
            body has fewer than two lines or the lines are not literals
        """
        lines = node.b.split('\n')
        if len(lines) < 2:
            g.es("Bad @tbl node %s" % node.h)
            return []
        try:
            # The body text is outline content, so parse it as a Python
            # literal rather than eval()-ing it: this accepts the list
            # literals shown above but cannot execute arbitrary code.
            names = ast.literal_eval(lines[0].strip())
            types = ast.literal_eval(lines[1].strip())
        except (ValueError, TypeError, SyntaxError):
            g.es("Bad @tbl node %s" % node.h)
            return []
        return [
            Field(name=name, type=type_)
            for name, type_ in zip(names, types)
        ]

    def read_table_defs(self, node):
        """read_table_defs - read @tbl nodes, return a dict with table
        names for keys, and for values a dict containing `vnode`, the
        @tbl node, and `fields`, a list of fields in the table.

        Use vnodes instead of positions because vnodes are more durable,
        i.e. they don't break when the outline changes, and can be
        converted to positions just before use with c.vnode2position(v)

        A duplicated table name keeps its first definition and a warning
        is reported; a @tbl node without a name is reported and skipped.

        :param position node: parent of @tbl nodes
        :return: dict of tables
        """
        tbl = {}
        for p in self.nodes('@tbl ', root=node):
            tbl_name = self._headline_arg(p)
            if tbl_name is None:
                g.es("Bad @tbl node %s" % p.h)
            elif tbl_name in tbl:
                g.es("WARNING: duplicate definition of %s" % tbl_name)
            else:
                tbl[tbl_name] = {'vnode': p.v, 'fields': self.read_fields(p)}
        return tbl

    def scan(self):
        """scan - scan outline for DB / tables

        Fills `self.db`.  A duplicated DB path keeps its first definition
        and a warning is reported; a @db3 node without a path is reported
        and skipped.
        """
        for p in self.nodes('@db3 '):
            db_name = self._headline_arg(p)
            if db_name is None:
                g.es("Bad @db3 node %s" % p.h)
            elif db_name in self.db:
                g.es("WARNING: duplicate definition of %s" % db_name)
            else:
                self.db[db_name] = {
                    'vnode': p.v,
                    'tables': self.read_table_defs(p),
                }

    def selected(self):
        """Return the selected db / table, or None, None

        A table counts as selected when the outline's current position
        (`c.p`) is its @tbl node or a descendant of it.

        :return: tuple (db path, table name), or (None, None)
        """
        c = self.c
        for db_path, db_def in self.db.items():
            for table_name, table_def in db_def['tables'].items():
                table_pos = c.vnode2position(table_def['vnode'])
                if table_pos is None:
                    # NOTE(review): vnode2position is understood to return
                    # None for a vnode no longer in the outline - confirm
                    # against the Leo version in use.
                    continue
                if table_pos == c.p or table_pos.isAncestorOf(c.p):
                    return db_path, table_name
        return None, None
# Demo: build the index for the current outline (the constructor scans it
# immediately) and report what was found to Leo's log.
index = TablesIndex(c)

g.es("the index, a dict of dicts")
g.es(index.db)

# "readable" view of index.db follows; this example has a single db with a
# single table, but any number of either is supported
"""
{'/home/tsc/Desktop/blob_test.db3': {'tables': {'testorama': {'fields': [Field(name='one', type='TEXT'),
Field(name='two', type='REAL'),
Field(name='three', type='INTEGER'),
Field(name='PrimaryKey', type='INTEGER'),
Field(name='Blobs', type='BLOB'),
Field(name='Filename', type='TEXT'),
Field(name='Extension', type='TEXT')],
'vnode': <VNode 7f0969d2cb70 @tbl testorama>}},
'vnode': <VNode 7f0969d2cac8 @db3 /home/tsc/Desktop/blob_test.db3>}}
"""

g.es("\nthe DBs in the outline")
g.es(list(index.db))

g.es("\nthe selected table, if any")
db_path, table_name = index.selected()
g.es(db_path, table_name)

# Details are only available when the current node is inside a table.
if db_path is not None:
    table = index.db[db_path]["tables"][table_name]

    g.es("\nposition for selected table")
    # vnodes are stored in the index; convert to a position just before use
    pos = c.vnode2position(table["vnode"])
    g.es(pos)

    g.es("\nfield names in selected table")
    g.es([field.name for field in table["fields"]])

    g.es("\nfield types in selected table")
    g.es([field.type for field in table["fields"]])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment