Last active
August 28, 2019 15:06
-
-
Save say4n/4554c9f370b1cf545f1b34c57b49575a to your computer and use it in GitHub Desktop.
a simple relational algebra emulator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #! /usr/bin/env python3 | |
class Relation:
    """A named, in-memory table: an ordered list of attributes plus rows.

    Rows are kept in insertion order and are never deduplicated or
    validated against the attribute list by this class.
    """

    def __init__(self, *, attributes=None, name=None):
        """Create an empty relation.

        Args:
            attributes: Iterable of column names (strings), in display
                order.  ``None`` (the default) means "no columns".
            name: Label printed on the first line of ``repr``.
        """
        # Always store a fresh list: the ``None`` default would otherwise
        # crash ``__repr__``, and sharing the caller's list would let two
        # relations mutate each other's header.
        self.attributes = [] if attributes is None else list(attributes)
        self.name = name
        self.rows = []

    def insert(self, row):
        """Append ``row`` (a sequence with one value per attribute)."""
        self.rows.append(row)

    def __repr__(self):
        """Render the relation as a tab-separated table.

        Layout: ``"<name> :="``, a rule, the header, a rule, one line per
        row, and a closing rule; every line ends with a newline.
        """
        # Presumably sized for 8-column tab stops; a non-positive width
        # (no attributes) simply yields empty rule lines.
        width = 8 * len(self.attributes) - 4
        lines = [
            f"{self.name} :=",
            "⎽" * width,
            "\t".join(self.attributes),
            "⎺" * width,
        ]
        lines.extend("\t".join(map(str, row)) for row in self.rows)
        lines.append("⎼" * width)
        return "\n".join(lines) + "\n"
class Query:
    """Relational-algebra operators over ``Relation`` objects.

    Each operator builds and returns a new ``Relation``; the input
    relations are only read.
    """

    # Comparison symbols accepted by ``select`` and the test each performs.
    _COMPARATORS = {
        ">": lambda left, right: left > right,
        ">=": lambda left, right: left >= right,
        "<": lambda left, right: left < right,
        "<=": lambda left, right: left <= right,
        "=": lambda left, right: left == right,
        "<>": lambda left, right: left != right,
    }

    def __init__(self, name):
        """Store ``name``; no operator in this class reads it."""
        self.name = name

    def product(self, Rx, Ry):
        """Return the Cartesian product of ``Rx`` and ``Ry``.

        The header is ``Rx.attributes + Ry.attributes`` and every row of
        ``Rx`` is concatenated with every row of ``Ry``.
        """
        attributes = Rx.attributes + Ry.attributes
        result = Relation(attributes=attributes, name=f"{Rx.name} ⨯ {Ry.name}")
        for row_x in Rx.rows:
            for row_y in Ry.rows:
                result.insert(row_x + row_y)
        return result

    def join(self, Rx, Ry):
        """Return the natural join of ``Rx`` and ``Ry``.

        Rows are paired when they agree on *every* attribute name the two
        relations share.  The result keeps all of ``Rx``'s columns followed
        by the columns of ``Ry`` that are not shared.  If the relations
        share no attribute, the result has the combined header and no rows.
        """
        # Take shared names in Rx order so the join key is deterministic
        # (a set intersection has arbitrary order).
        shared = [attr for attr in Rx.attributes if attr in Ry.attributes]
        attributes = list(Rx.attributes) + [
            attr for attr in Ry.attributes if attr not in shared
        ]
        result = Relation(attributes=attributes, name=f"{Rx.name} ⋈ {Ry.name}")
        if not shared:
            return result

        keys_x = [Rx.attributes.index(attr) for attr in shared]
        keys_y = [Ry.attributes.index(attr) for attr in shared]
        # Drop *all* shared columns from Ry rows so each row lines up with
        # the header built above.
        kept_y = [
            index for index, attr in enumerate(Ry.attributes)
            if attr not in shared
        ]
        for row_x in Rx.rows:
            key = [row_x[index] for index in keys_x]
            for row_y in Ry.rows:
                if key == [row_y[index] for index in keys_y]:
                    result.insert(
                        list(row_x) + [row_y[index] for index in kept_y]
                    )
        return result

    def project(self, R, attribs):
        """Return ``R`` restricted to the columns named in ``attribs``.

        Columns appear in the order given by ``attribs``.  Duplicate rows
        are kept.  An empty ``attribs`` yields a relation with no rows.

        Raises:
            ValueError: if a name in ``attribs`` is not in ``R.attributes``.
        """
        a_str = ", ".join(attribs)
        result = Relation(attributes=attribs, name=f"π({R.name}, ({a_str}))")
        selected_indices = [R.attributes.index(attrib) for attrib in attribs]
        for row in R.rows:
            projected_row = [row[index] for index in selected_indices]
            if projected_row:
                result.insert(projected_row)
        return result

    def select(self, R, conditions):
        """Return the rows of ``R`` that satisfy every condition.

        Args:
            R: The relation to filter.
            conditions: Sequence of ``[left, op, right]`` string triples.
                ``left`` and ``right`` are both attribute names of ``R``
                (comparison against a literal is not supported) and ``op``
                is one of ``>``, ``>=``, ``<``, ``<=``, ``=``, ``<>``.

        Raises:
            ValueError: if an operand is not in ``R.attributes``.
            SyntaxError: if ``op`` is not a supported comparator.
        """
        c_str = " ∧ ".join("".join(c) for c in conditions)
        result = Relation(attributes=R.attributes, name=f"σ({R.name}, {c_str})")

        # Resolve every condition once, before scanning rows, so that a
        # malformed query fails regardless of the data in R.
        checks = []
        for condition in conditions:
            left = R.attributes.index(condition[0])
            right = R.attributes.index(condition[2])
            op = condition[1]
            if op not in Query._COMPARATORS:
                raise SyntaxError(f"Invalid comparator '{op}'")
            checks.append((left, Query._COMPARATORS[op], right))

        for row in R.rows:
            if all(compare(row[left], row[right])
                   for left, compare, right in checks):
                result.insert(row)
        return result
if __name__ == "__main__":
    # Demo: build two small relations that share the "ID" column, then run
    # each operator on them and print the resulting tables.
    r1 = Relation(name="R1", attributes=["ID", "SS", "DC", "EMT"])
    for record in (
        ["AA", 67, 72, 96],
        ["BB", 84, 96, 48],
        ["CC", 56, 72, 67],
    ):
        r1.insert(record)
    print("Relation 1:")
    print(r1)

    r2 = Relation(name="R2", attributes=["ID", "MS", "VLSI"])
    for record in (
        ["AA", 76, 82],
        ["BB", 56, 55],
        ["CC", 99, 100],
    ):
        r2.insert(record)
    print("Relation 2:")
    print(r2)

    d = Query("database")

    print("Product:")
    print(d.product(r1, r2))

    print("Join:")
    print(d.join(r1, r2))

    print("Projection:")
    joined = d.join(r1, r2)
    print(d.project(joined, ["ID", "SS", "VLSI"]))

    print("Selection:")
    conditions = [["SS", ">", "MS"], ["VLSI", "<", "MS"]]
    # Filter the joined table first, then keep only the ID column.
    matching = d.select(d.join(r1, r2), conditions)
    print(d.project(matching, ["ID"]))
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Running ./databse.py should produce the following output: