Last active
December 27, 2015 06:29
-
-
Save MMcM/7281756 to your computer and use it in GitHub Desktop.
Using FDB SQL STORAGE_FORMAT to allow read access from Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from google.protobuf import descriptor | |
from google.protobuf import descriptor_pb2 | |
from google.protobuf import descriptor_pool | |
from google.protobuf import message | |
from google.protobuf import reflection | |
import fdb | |
from directory import directory | |
# https://github.com/FoundationDB/sql-layer/blob/master/src/main/protobuf/akiban_information_schema.proto | |
from akiban_information_schema_pb2 import AkibanInformationSchema | |
# https://github.com/FoundationDB/sql-layer/blob/master/src/main/protobuf/common_storage_formats.proto | |
from common_storage_formats_pb2 import protobuf_row | |
# https://github.com/FoundationDB/sql-layer/blob/master/src/main/protobuf/sql_custom_options.proto | |
import sql_custom_options_pb2 | |
import struct | |
class RowParserFactory(object):
    """Build parsers for rows stored with the protobuf STORAGE_FORMAT.

    When protobuf is used as the row format, the FDB SQL Layer saves
    the generated file descriptors inside the serialized
    AkibanInformationSchema, which is stored under the
    ('sql', 'schemaManager', 'protobuf') directory keyed by schema name.

    This loads those descriptors, rendezvouses them with their
    dependencies in a private DescriptorPool, and returns functions
    that can be used to parse the value byte string into the row
    message.

    Unless no_group was chosen as a storage option, the row message
    will be the group message, which has one optional field for each
    table type in the group. Only one is populated at any time.
    """

    def __init__(self):
        self._pool = descriptor_pool.DescriptorPool()
        # Seed the pool with descriptor.proto and sql_custom_options.proto.
        # NOTE(review): presumably the stored row descriptors depend on
        # these (the custom table options are read in class_for_row).
        for dependency in (descriptor_pb2.DESCRIPTOR,
                           sql_custom_options_pb2.DESCRIPTOR):
            file_proto = descriptor_pb2.FileDescriptorProto()
            dependency.CopyToProto(file_proto)
            self._pool.Add(file_proto)
        # Directory holding the serialized AIS; opened lazily on first use.
        self._pbdir = None
        # Parsed AkibanInformationSchema messages, keyed by schema name.
        # NOTE(review): never invalidated, so DDL made after the first
        # load of a schema is not seen by this factory.
        self._aises = {}

    def class_for_message_type(self, descriptor):
        """Return a generated message class for ``descriptor``.

        reflection.ParseMessage makes a new class every time and does
        not arrange for field message types to get associated with
        their classes, so we duplicate the part of it we need here.
        """
        class _MessageClass(message.Message):
            __metaclass__ = reflection.GeneratedProtocolMessageType
            DESCRIPTOR = descriptor
        return _MessageClass

    def class_for_row(self, file_desc):
        """Build classes for all top-level message types in ``file_desc``.

        Returns the class of the message marked as the group message
        (``is_group`` in the fdbsql table options), or else the last
        class built, which is the only one in the single-table case.

        Raises:
            Exception: if the file declares no message types.
        """
        group_class = None
        row_class = None
        # Named message_desc so the google.protobuf ``descriptor`` module
        # is not shadowed.
        for message_desc in file_desc.message_types_by_name.values():
            row_class = self.class_for_message_type(message_desc)
            table_options = message_desc.GetOptions().Extensions[
                sql_custom_options_pb2.TableOptions.fdbsql]
            if table_options is not None and table_options.is_group:
                group_class = row_class
        if row_class is None:
            # Fail here rather than handing back a parser that would
            # later crash calling None.
            raise Exception("No message types in row descriptor")
        return group_class or row_class

    @fdb.transactional
    def file_desc_from_ais(self, tr, schema_name, table_name):
        """Return the FileDescriptorProto stored for a table's group.

        Args:
            tr: transaction (or database, via fdb.transactional).
            schema_name: SQL schema name; key of the stored AIS.
            table_name: root table name of the group to look up.

        Raises:
            Exception: if the schema is missing or its stored AIS is
                malformed, if no group is rooted at ``table_name``, or
                if that group does not use protobuf storage.
        """
        ais = self._aises.get(schema_name)
        if ais is None:
            if self._pbdir is None:
                self._pbdir = directory.open(
                    tr, (u'sql', u'schemaManager', u'protobuf'))
            pbais = tr[self._pbdir.pack((schema_name,))]
            # Depending on the binding, an absent key reads back either as
            # None or as a Value future whose present() is False.
            if pbais is None or (hasattr(pbais, 'present')
                                 and not pbais.present()):
                raise Exception("Schema not found")
            # Stored form: 4-byte big-endian signed length, then the
            # serialized message of exactly that length.
            if len(pbais) < 4:
                raise Exception("Malformed schema metadata")
            pblen = struct.unpack('>i', pbais[0:4])[0]
            if len(pbais) != 4 + pblen:
                raise Exception("Malformed schema metadata")
            ais = AkibanInformationSchema.FromString(pbais[4:])
            self._aises[schema_name] = ais
        if not ais.schemas:
            raise Exception("Schema not found")
        for group in ais.schemas[0].groups:
            if group.rootTableName == table_name:
                if not group.storage.HasExtension(protobuf_row):
                    raise Exception("Group is not using Protocol Buffers")
                return group.storage.Extensions[protobuf_row].file_descriptor
        raise Exception("Table not found")

    @fdb.transactional
    def file_desc_for_group(self, tr, schema_name, table_name):
        """Register the group's stored descriptor in the pool and return it
        as a FileDescriptor resolved against its dependencies.
        """
        file_proto = self.file_desc_from_ais(tr, schema_name, table_name)
        self._pool.Add(file_proto)
        return self._pool.FindFileByName(file_proto.name)

    @fdb.transactional
    def parser_for_group(self, tr, schema_name, table_name):
        """Return a function that takes the byte string for a protobuf
        row and returns the row instance.
        """
        file_desc = self.file_desc_for_group(tr, schema_name, table_name)
        row_class = self.class_for_row(file_desc)

        def _parser(data):
            row = row_class()
            row.ParseFromString(data)
            return row
        return _parser
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import fdb | |
from directory import directory | |
db = fdb.open() | |
tdir = directory.open(db, (u'sql',u'data',u'table',u'test',u't1')) | |
idir = tdir.open(db, u't1_s') | |
# Key is tuple of hkey (1, id). Value is tuple of row (id, s). | |
for kv in db[tdir.range()]: | |
print tdir.unpack(kv.key), fdb.tuple.unpack(kv.value) | |
for kv in db[slice(idir.pack((u'A',)),idir.pack((u'E',)))]: | |
# Key is (s, id). Turn into hkey. | |
print fdb.tuple.unpack(db[tdir.pack((1, idir.unpack(kv.key)[1]))]) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Table t1 with tuple-encoded rows, readable from Python via fdb.tuple.
DROP TABLE IF EXISTS t1;

CREATE TABLE t1 (
    id INT PRIMARY KEY NOT NULL,
    s  VARCHAR(16)
) STORAGE_FORMAT tuple;

INSERT INTO t1 (id, s) VALUES
    (1, 'Fred'),
    (2, 'Wilma'),
    (3, 'Barney'),
    (4, 'Betty');

-- Tuple-encoded secondary index on s.
CREATE INDEX t1_s ON t1 (s) STORAGE_FORMAT tuple;

-- Range query over the indexed column.
SELECT * FROM t1 WHERE s BETWEEN 'A' AND 'E';
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import fdb | |
from directory import directory | |
from fdb_sql_protobuf import RowParserFactory | |
db = fdb.open() | |
parser = RowParserFactory().parser_for_group(db, u'test', u't2') | |
tdir = directory.open(db, (u'sql',u'data',u'table',u'test',u't2')) | |
idir = tdir.open(db, u't2_s') | |
@fdb.transactional | |
def index_lookup(tr,lo,hi): | |
# Initiate futures for main table from index all at once. | |
vals = [tr[tdir.pack((1, idir.unpack(kv.key)[1]))] | |
for kv in tr[idir.pack((lo,)):idir.pack((hi,))]] | |
return [parser(val).t2.s for val in vals] | |
print index_lookup(db, u'M',u'Z') |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Table t2 with protobuf-encoded rows, readable from Python via
-- RowParserFactory; its index stays tuple-encoded.
DROP TABLE IF EXISTS t2;

CREATE TABLE t2 (
    id INT PRIMARY KEY NOT NULL,
    s  VARCHAR(16)
) STORAGE_FORMAT protobuf;

INSERT INTO t2 (id, s) VALUES
    (1, 'Fred'),
    (2, 'Wilma'),
    (3, 'Barney'),
    (4, 'Betty');

-- Tuple-encoded secondary index on s.
CREATE INDEX t2_s ON t2 (s) STORAGE_FORMAT tuple;

-- Range query over the indexed column.
SELECT * FROM t2 WHERE s BETWEEN 'A' AND 'E';
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment