Created
September 1, 2015 20:12
-
-
Save badocelot/0071a966425b7fca6e23 to your computer and use it in GitHub Desktop.
Toward an rdbms in Python...
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Heading (set): | |
def __init__(self, *attributes): | |
if len(attributes) == 1 \ | |
and isinstance(attributes[0], (list, tuple, set)): | |
attributes = attributes[0] | |
for att in attributes: | |
if not isinstance(att, str): | |
print(att) | |
raise TypeError("Attribute name must be string.") | |
if att == "": | |
raise ValueError("Attribute name must not be empty.") | |
super().__init__(attributes) | |
def __repr__(self): | |
if len(self) == 1: | |
return "Heading(%s)" % repr(tuple(x for x in self)[0]) | |
return "Heading%s" % repr(tuple(self)) | |
def __str__(self): | |
return "%s" % str(tuple(self)) | |
class Tuple (dict): | |
def __init__(self, values): | |
try: | |
self.__heading = Heading(*values.keys()) | |
except: | |
raise ValueError("invalid tuple heading") | |
for a in values: | |
self[a] = values[a] | |
@property | |
def heading(self): | |
return self.__heading | |
def __getattr__(self, attr): | |
return self[attr] | |
def __getstate__(self): | |
return dict(self) | |
def __setstate__(self, state): | |
for x in state: | |
self[x] = state[x] | |
self.__heading = Heading(*self.keys()) | |
def __repr__(self): | |
return "Tuple(%s)" % repr({x:self[x] for x in self}) | |
def __str__(self): | |
return str({x:self[x] for x in self}) | |
def Tuples(*tuples): | |
return tuple(Tuple(t) for t in tuples) | |
class Relation: | |
def __init__(self, heading, tuples): | |
for t in tuples: | |
if t.heading != heading: | |
raise ValueError("invalid tuple: %s" % repr(t)) | |
self.__heading = Heading(heading) | |
self.__tuples = tuple(t for t in tuples) | |
@property | |
def heading(self): | |
return self.__heading | |
@property | |
def tuples(self): | |
return self.__tuples | |
def union(self, other): | |
if other.heading != self.heading: | |
raise TypeError("incompatible headings") | |
new_tuples = list(self.tuples) | |
for t in other.tuples: | |
if t not in new_tuples: | |
new_tuples.append(t) | |
return Relation(self.heading, tuple(new_tuples)) | |
def intersection(self, other): | |
if other.heading != self.heading: | |
raise TypeError("incompatible headings") | |
new_tuples = [] | |
for t in self.tuples: | |
if t in other.tuples: | |
new_tuples.append(t) | |
return Relation(self.heading, tuple(new_tuples)) | |
def difference(self, other): | |
if other.heading != self.heading: | |
raise TypeError("incompatible headings") | |
new_tuples = [] | |
for t in self.tuples: | |
if t not in other.tuples: | |
new_tuples.append(t) | |
return Relation(self.heading, tuple(new_tuples)) | |
def projection(self, new_heading): | |
for att in new_heading: | |
if att not in self.heading: | |
raise TypeError("invalid heading") | |
new_tuples = [] | |
for t in self.tuples: | |
new_t = {} | |
for att in new_heading: | |
new_t[att] = t[att] | |
new_t = Tuple(new_t) | |
if new_t not in new_tuples: | |
new_tuples.append(new_t) | |
return Relation(new_heading, tuple(new_tuples)) | |
def restriction(self, condfn): | |
new_tuples = [] | |
for t in self.tuples: | |
if condfn(t): | |
new_tuples.append(t) | |
return Relation(self.heading, tuple(new_tuples)) | |
def rename(self, old_attr, new_attr): | |
if new_attr in self.heading: | |
raise ValueError("attribute name exists") | |
new_heading = [] | |
for a in self.heading: | |
if a == old_attr: | |
new_heading.append(new_attr) | |
else: | |
new_heading.append(a) | |
new_heading = Heading(*new_heading) | |
new_tuples = [] | |
for t in self.tuples: | |
new_t = {} | |
for a in t: | |
if a == old_attr: | |
new_t[new_attr] = t[a] | |
else: | |
new_t[a] = t[a] | |
new_t = Tuple(new_t) | |
new_tuples.append(new_t) | |
return Relation(new_heading, tuple(new_tuples)) | |
def cross_join(self, other): | |
x = self | |
y = other | |
while len(x.heading.intersection(y.heading)) > 0: | |
for a in x.heading: | |
if a in other.heading: | |
x = x.rename(a, a + " 1") | |
y = y.rename(a, a + " 2") | |
new_heading = Heading(x.heading.union(y.heading)) | |
new_tuples = [] | |
for t in x.tuples: | |
for u in y.tuples: | |
new_t = {} | |
for a in t.heading: | |
new_t[a] = t[a] | |
for a in u.heading: | |
new_t[a] = u[a] | |
new_t = Tuple(new_t) | |
new_tuples.append(new_t) | |
return Relation(new_heading, tuple(new_tuples)) | |
def natural_join(self, other): | |
new_heading = Heading(self.heading.union(other.heading)) | |
overlap = self.heading.intersection(other.heading) | |
new_tuples = [] | |
for t in self.tuples: | |
for u in other.tuples: | |
match = True | |
for a in overlap: | |
if t[a] != u[a]: | |
match = False | |
break | |
if match: | |
new_t = {} | |
for a in self.heading: | |
new_t[a] = t[a] | |
for a in other.heading: | |
new_t[a] = u[a] | |
new_t = Tuple(new_t) | |
if new_t not in new_tuples: | |
new_tuples.append(new_t) | |
return Relation(new_heading, tuple(new_tuples)) | |
def join (self, other, wherefn): | |
x = self | |
y = other | |
while len(x.heading.intersection(y.heading)) > 0: | |
for a in x.heading: | |
if a in other.heading: | |
x = x.rename(a, a + " 1") | |
y = y.rename(a, a + " 2") | |
new_heading = Heading(x.heading.union(y.heading)) | |
new_tuples = [] | |
for i in range(len(self.tuples)): | |
t1 = self.tuples[i] | |
t2 = x.tuples[i] | |
for j in range(len(other.tuples)): | |
u1 = other.tuples[j] | |
u2 = y.tuples[j] | |
if wherefn(t1, u1): | |
new_t = {} | |
for a in x.heading: | |
new_t[a] = t2[a] | |
for a in y.heading: | |
new_t[a] = u2[a] | |
new_t = Tuple(new_t) | |
new_tuples.append(new_t) | |
return Relation(new_heading, tuple(new_tuples)) | |
def __getstate__(self): | |
return {"heading": self.heading, "tuples": self.tuples} | |
def __setstate__(self, state): | |
self.__heading = state["heading"] | |
self.__tuples = state["tuples"] | |
def __getattr__(self, attr): | |
return self.__getattribute__(attr) | |
def __len__(self): | |
return len(self.tuples) | |
def __repr__(self): | |
return "Relation(%s, %s)" % (repr(self.heading), repr(self.tuples)) | |
def __str__(self): | |
return "\n".join(tuple(str(t) for t in self.tuples)) | |
class Relvar: | |
def __init__(self, name, heading, tuples): | |
self.__relation = Relation(heading, tuples) | |
self.__name = name | |
@property | |
def heading(self): | |
return self.__relation.heading | |
@property | |
def name(self): | |
return self.__name | |
@property | |
def relation(self): | |
return self.__relation | |
@property | |
def tuples(self): | |
return self.__relation.tuples | |
def insert(self, t): | |
if t.heading != self.heading: | |
raise TypeError("incompatible headings") | |
self.__relation = self.relation.union(Relation(t.heading, (t,))) | |
def insert_many(self, tuples): | |
for t in tuples: | |
if t.heading != self.heading: | |
raise TypeError("incompatible headings") | |
self.__relation = self.relation.union(Relation(t.heading, tuples)) | |
def remove(self, t): | |
if t.heading != self.heading: | |
raise TypeError("incompatible headings") | |
self.__relation = self.relation.difference(Relation(t.heading, (t,))) | |
def remove_where(self, wherefn): | |
self.__relation = self.relation.difference( | |
self.relation.restriction(wherefn)) | |
def update(self, old_tuple, new_tuple): | |
if old_tuple not in self.tuples: | |
raise ValueError("tuple does not exist in relation") | |
if new_tuple.heading != self.heading: | |
raise TypeError("incompatible headings") | |
self.__relation = self.relation.difference( | |
Relation(self.heading, (old_tuple,))).union( | |
Relation(self.heading, (new_tuple,))) | |
def update_many(self, old_tuples, new_tuples): | |
for old in old_tuples: | |
if old not in self.tuples: | |
raise ValueError("tuple does not exist in relation") | |
for new in new_tuples: | |
if new.heading != self.heading: | |
raise TypeError("incompatible headings") | |
self.__relation = self.relation.difference( | |
Relation(self.heading, old_tuples)).union( | |
Relation(self.heading, new_tuples)) | |
def update_where(self, wherefn): | |
class Setter: | |
def set(self2, attr, value): | |
if attr not in self.heading: | |
raise ValueError("invalid attribute") | |
old_tuples = self.relation.restriction(wherefn).tuples | |
new_tuples = [] | |
for t in old_tuples: | |
new_t = {} | |
for a in t.heading: | |
if a == attr: | |
new_t[a] = value | |
else: | |
new_t[a] = t[a] | |
new_t = Tuple(new_t) | |
new_tuples.append(new_t) | |
self.update_many(old_tuples, new_tuples) | |
return self2 | |
return Setter() | |
@staticmethod | |
def from_relation(name, rel): | |
return Relvar(name, rel.heading, rel.tuples) | |
@staticmethod | |
def load(filename): | |
import pickle | |
f = open(filename, "rb") | |
r = pickle.load(f) | |
f.close() | |
return r | |
def save(self, filename): | |
import pickle | |
f = open(filename, "wb") | |
pickle.dump(self, f) | |
f.close() | |
def __getattr__(self, attr): | |
return self.__relation.__getattr__(attr) | |
def __getstate__(self): | |
return {"name": self.name, "relation": self.relation} | |
def __setstate__(self, state): | |
self.__name = state["name"] | |
self.__relation = state["relation"] | |
def __len__(self): | |
return len(self.tuples) | |
def __repr__(self): | |
return "Relvar(%s, %s, %s)" % (repr(self.name), repr(self.heading), | |
repr(self.tuples)) | |
def __str__(self): | |
return self.name + ":\n" + "\n".join(tuple(str(t) for t in self.tuples)) | |
# Test data | |
A = Relvar("A", Heading("ID"), Tuples({"ID": 1}, {"ID": 2})) | |
S = Relvar("S", Heading("SNO", "SNAME", "STATUS", "CITY"), ( | |
Tuple({"SNO": "S1", "SNAME": "Clark", "STATUS": 10, "CITY": "London"}), | |
Tuple({"SNO": "S2", "SNAME": "Duncan", "STATUS": 20, "CITY": "Athens"}), | |
Tuple({"SNO": "S3", "SNAME": "Michael", "STATUS": 30, "CITY": "London"}))) | |
SP = Relvar("SP", Heading("SNO", "PNO", "QTY"), ( | |
Tuple({"SNO": "S1", "PNO": "P1", "QTY": 20}), | |
Tuple({"SNO": "S1", "PNO": "P2", "QTY": 10}), | |
Tuple({"SNO": "S2", "PNO": "P1", "QTY": 5}), | |
Tuple({"SNO": "S3", "PNO": "P2", "QTY": 10}))) | |
TABLE_DUM = Relation(Heading(), ()) | |
TABLE_DEE = Relation(Heading(), (Tuple({}),)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
If the TABLE_DUM and TABLE_DEE didn't give it away, this is heavy inspired by C.J. Date's work.