Last active
December 7, 2018 19:41
-
-
Save SebDeclercq/3311172bca45bfcf06674a67f799192b to your computer and use it in GitHub Desktop.
Script populating an OCP6 Database with fake data
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| -- ENTITIES | |
| CREATE TABLE address ( | |
| id SERIAL PRIMARY KEY, | |
| street_name TEXT NOT NULL, | |
| home_number TEXT NOT NULL, | |
| zip_code TEXT NOT NULL, | |
| country TEXT | |
| ); | |
| CREATE TABLE role ( | |
| id SERIAL PRIMARY KEY, | |
| name TEXT NOT NULL | |
| ); | |
| CREATE TABLE permission ( | |
| id SERIAL PRIMARY KEY, | |
| label TEXT NOT NULL | |
| ); | |
| CREATE TABLE user_account ( | |
| id SERIAL PRIMARY KEY, | |
| member_id INTEGER NOT NULL, | |
| email TEXT NOT NULL, | |
| phone_nb TEXT, | |
| hashed_pwd TEXT NOT NULL | |
| ); | |
| CREATE TABLE member ( | |
| id SERIAL PRIMARY KEY, | |
| name TEXT NOT NULL, | |
| firstname TEXT NOT NULL, | |
| works_at_pizzeria_id INTEGER DEFAULT NULL, | |
| user_account_id INTEGER, | |
| address_id INTEGER NOT NULL, | |
| role_id INTEGER | |
| ); | |
| CREATE TABLE bill ( | |
| id SERIAL PRIMARY KEY, | |
| emission_date DATE NOT NULL, | |
| total_amout_ati NUMERIC(4, 2) NOT NULL, | |
| order_id INTEGER NOT NULL | |
| ); | |
| CREATE TABLE order_status ( | |
| id SERIAL PRIMARY KEY, | |
| label TEXT NOT NULL | |
| ); | |
| CREATE TABLE taken_order ( | |
| id SERIAL PRIMARY KEY, | |
| member_id INTEGER NOT NULL, | |
| address_id INTEGER NOT NULL, | |
| bill_id INTEGER, | |
| status_id INTEGER NOT NULL, | |
| is_paid BOOLEAN DEFAULT FALSE | |
| ); | |
| CREATE TABLE product ( | |
| id SERIAL PRIMARY KEY, | |
| name TEXT NOT NULL, | |
| barcode TEXT NOT NULL, | |
| gram_weight INTEGER NOT NULL, | |
| unit_price_ati NUMERIC(4, 2) NOT NULL | |
| ); | |
| CREATE TABLE pizzeria ( | |
| id SERIAL PRIMARY KEY, | |
| name TEXT, | |
| phone_nb TEXT NOT NULL, | |
| address_id INTEGER | |
| ); | |
| CREATE TABLE recipe ( | |
| id SERIAL PRIMARY KEY, | |
| name TEXT NOT NULL, | |
| is_public BOOLEAN DEFAULT FALSE, | |
| description TEXT NOT NULL | |
| ); | |
| CREATE TABLE catalog_item ( | |
| id SERIAL PRIMARY KEY, | |
| name TEXT NOT NULL, | |
| description TEXT NOT NULL, | |
| picture_file TEXT, | |
| unit_price_ati NUMERIC(4, 2) NOT NULL, | |
| is_available BOOLEAN, | |
| is_displayed BOOLEAN, | |
| recipe_id INTEGER, | |
| product_id INTEGER -- a catalog_item may be a product with no recipe (sodas, etc.) | |
| ); | |
| CREATE TABLE keyword ( | |
| id SERIAL PRIMARY KEY, | |
| name TEXT NOT NULL | |
| ); | |
| -- ASSOCIATIVE ENTITIES | |
| CREATE TABLE has_permission_to ( | |
| role_id INTEGER REFERENCES role NOT NULL, | |
| permission_id INTEGER REFERENCES permission NOT NULL, | |
| PRIMARY KEY (role_id, permission_id) | |
| ); | |
| CREATE TABLE contains_item ( | |
| order_id INTEGER REFERENCES taken_order NOT NULL, | |
| item_id INTEGER REFERENCES catalog_item NOT NULL, | |
| quantity INTEGER NOT NULL DEFAULT 1, | |
| unit_price_ati NUMERIC(4, 2) NOT NULL, | |
| PRIMARY KEY (order_id, item_id) | |
| ); | |
| CREATE TABLE has_product_in_stock ( | |
| pizzeria_id INTEGER REFERENCES pizzeria NOT NULL, | |
| product_id INTEGER REFERENCES product NOT NULL, | |
| quantity INTEGER NOT NULL DEFAULT 0, | |
| PRIMARY KEY (pizzeria_id, product_id) | |
| ); | |
| CREATE TABLE requires_product ( | |
| recipe_id INTEGER REFERENCES recipe NOT NULL, | |
| product_id INTEGER REFERENCES product NOT NULL, | |
| gram_amount INTEGER NOT NULL, | |
| PRIMARY KEY (recipe_id, product_id) | |
| ); | |
| CREATE TABLE has_keyword ( | |
| item_id INTEGER REFERENCES catalog_item NOT NULL, | |
| keyword_id INTEGER REFERENCES keyword NOT NULL, | |
| PRIMARY KEY (item_id, keyword_id) | |
| ); | |
| -- FOREIGN CONSTRAINTS | |
| ALTER TABLE member | |
| ADD CONSTRAINT fk_member_user_account | |
| FOREIGN KEY (user_account_id) | |
| REFERENCES user_account (id); | |
| ALTER TABLE member | |
| ADD CONSTRAINT fk_member_pizzeria | |
| FOREIGN KEY (works_at_pizzeria_id) | |
| REFERENCES pizzeria (id); | |
| ALTER TABLE member | |
| ADD CONSTRAINT fk_member_address | |
| FOREIGN KEY (address_id) | |
| REFERENCES address (id); | |
| ALTER TABLE user_account | |
| ADD CONSTRAINT fk_user_account_member | |
| FOREIGN KEY (member_id) | |
| REFERENCES member (id); | |
| ALTER TABLE taken_order | |
| ADD CONSTRAINT fk_order_member | |
| FOREIGN KEY (member_id) | |
| REFERENCES member (id); | |
| ALTER TABLE taken_order | |
| ADD CONSTRAINT fk_order_bill | |
| FOREIGN KEY (bill_id) | |
| REFERENCES bill (id); | |
| ALTER TABLE taken_order | |
| ADD CONSTRAINT fk_order_status | |
| FOREIGN KEY (status_id) | |
| REFERENCES order_status (id); | |
| ALTER TABLE taken_order | |
| ADD CONSTRAINT fk_order_address | |
| FOREIGN KEY (address_id) | |
| REFERENCES address (id); | |
| ALTER TABLE bill | |
| ADD CONSTRAINT fk_bill_order | |
| FOREIGN KEY (order_id) | |
| REFERENCES taken_order (id); | |
| ALTER TABLE pizzeria | |
| ADD CONSTRAINT fk_pizzeria_address | |
| FOREIGN KEY (address_id) | |
| REFERENCES address (id); | |
| ALTER TABLE catalog_item | |
| ADD CONSTRAINT fk_catalog_item_recipe | |
| FOREIGN KEY (recipe_id) | |
| REFERENCES recipe (id); | |
| ALTER TABLE catalog_item | |
| ADD CONSTRAINT fk_catalog_item_product | |
| FOREIGN KEY (product_id) | |
| REFERENCES product (id); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| from dataclasses import dataclass | |
| import os | |
| import random | |
| import re | |
| import time | |
| import string | |
| from typing import Callable, List, Dict, Optional, Any, Generator | |
| from argparse import ArgumentParser, Namespace | |
| from passlib.hash import pbkdf2_sha256 | |
| from faker import Faker | |
| import records | |
| import sqlalchemy | |
| @dataclass | |
| class Member: | |
| name: str | |
| firstname: str | |
| works_at_pizzeria_id: Optional[int] # REFERENCES Pizzeria | |
| user_account_id: Optional[int] # REFERENCES UserAccount | |
| role_id: Optional[int] # REFERENCES Role | |
| address_id: int # REFERENCES Address | |
| class FakeMember(Member): | |
| def __init__(self, pizzeria_id: int, address_id: int) -> None: | |
| fake: Faker = Faker('fr_FR') | |
| self.name = fake.last_name() | |
| self.firstname = fake.first_name() | |
| self.works_at_pizzeria_id = pizzeria_id | |
| self.address_id = address_id | |
| self.user_account_id = None | |
| self.role = None | |
| @dataclass | |
| class Address: | |
| street_name: str | |
| home_number: str | |
| zip_code: str | |
| country: Optional[str] = 'France' | |
| class FakeAddress(Address): | |
| def __init__(self) -> None: | |
| fake: Faker = Faker('fr_FR') | |
| street: str = fake.address().split('\n')[0] | |
| m: Optional[Any] = re.match(r'^(\d+),?(.*)$', street) | |
| if m and len(m.groups()) == 2: | |
| home_number: str = m[1] | |
| street_name: str = m[2].lstrip().title() | |
| else: | |
| home_number = fake.building_number() | |
| street_name = street.rstrip() | |
| self.street_name = street_name | |
| self.home_number = home_number | |
| self.zip_code = fake.postcode().replace(' ', '') | |
| self.country = 'France' | |
| @dataclass | |
| class UserAccount: | |
| email: str | |
| phone_nb: str | |
| member_id: int # REFERENCES Member | |
| hashed_pwd: str | |
| class FakeUserAccount(UserAccount): | |
| def __init__(self, member_id: int) -> None: | |
| fake: Faker = Faker('fr_FR') | |
| self.email = fake.email() | |
| self.phone_nb = re.sub(r'^\+33|\D', '', fake.phone_number()) | |
| if len(self.phone_nb) < 10: | |
| self.phone_nb = '0' + self.phone_nb | |
| self.member_id = member_id | |
| self.hashed_pwd = pbkdf2_sha256.hash( | |
| ''.join(random.sample(string.printable, 15)) | |
| ) | |
| @dataclass | |
| class TakenOrder: | |
| member_id: int # REFERENCES Member | |
| address_id: int # REFERENCES Address | |
| status_id: int # REFERENCES OrderStatus | |
| is_paid: bool | |
| bill_id: Optional[int] = None # REFERENCES Bill | |
| class FakeTakenOrder(TakenOrder): | |
| def __init__(self, member_id: int, address_id: int, | |
| order_status_ids: List[int]) -> None: | |
| self.member_id = member_id | |
| self.address_id = address_id | |
| self.status_id = random.choice(order_status_ids) | |
| self.is_paid = random.choice((True, False)) | |
| self.bill_id = None | |
| @dataclass | |
| class Bill: | |
| emission_date: str | |
| total_amout_ati: float | |
| order_id: int # REFERENCES TakenOrder | |
| class FakeBill(Bill): | |
| def __init__(self, order_id: int) -> None: | |
| fake: Faker = Faker('fr_FR') | |
| self.emission_date = fake.date_time_this_decade() | |
| self.total_amout_ati = fake.pyfloat( | |
| positive=True, left_digits=2, right_digits=2 | |
| ) | |
| self.order_id = order_id | |
| @dataclass | |
| class Product: | |
| name: str | |
| barcode: str | |
| gram_weight: int | |
| unit_price_ati: int | |
| class FakeProduct(Product): | |
| def __init__(self, name: str) -> None: | |
| fake: Faker = Faker('fr_FR') | |
| self.name = name | |
| self.barcode = fake.ean13() | |
| self.gram_weight = random.randint(20, 2000) * 5 | |
| self.unit_price_ati = fake.pyfloat( | |
| positive=True, left_digits=2, right_digits=2 | |
| ) | |
| @dataclass | |
| class Pizzeria: | |
| name: str | |
| phone_nb: str | |
| address_id: Optional[int] | |
| class FakePizzeria(Pizzeria): | |
| def __init__(self, name: str, address_id: int) -> None: | |
| fake: Faker = Faker('fr_FR') | |
| self.name = name | |
| self.phone_nb = re.sub(r'\+33|\D', '', fake.phone_number()) | |
| if len(self.phone_nb) < 10: | |
| self.phone_nb = '0' + self.phone_nb | |
| self.address_id = address_id | |
| @dataclass | |
| class Recipe: | |
| name: str | |
| description: str | |
| is_public: bool = False | |
| class FakeRecipe(Recipe): | |
| def __init__(self, name: str) -> None: | |
| fake: Faker = Faker('fr_FR') | |
| self.name = name | |
| self.description = fake.paragraphs() | |
| self.is_public = random.choice((True, False)) | |
| @dataclass | |
| class CatalogItem: | |
| name: str | |
| description: str | |
| picture_file: str | |
| unit_price_ati: float | |
| is_available: bool | |
| is_displayed: bool | |
| recipe_id: Optional[int] = None | |
| product_id: Optional[int] = None | |
| class FakeCatalogItem(CatalogItem): | |
| def __init__(self, name: str, parent: str, parent_id: int) -> None: | |
| fake: Faker = Faker('fr_FR') | |
| self.name = name | |
| self.description = fake.sentences() | |
| self.picture_file = fake.file_name(extension='jpg') | |
| self.unit_price_ati = fake.pyfloat( | |
| positive=True, left_digits=2, right_digits=2 | |
| ) | |
| self.is_available = random.choice((True, False)) | |
| self.is_displayed = random.choice((True, False)) | |
| if parent == 'recipe': | |
| self.recipe_id = parent_id | |
| self.product_id = None | |
| elif parent == 'product': | |
| self.product_id = parent_id | |
| self.recipe_id = None | |
| else: | |
| raise AttributeError(f'Unknown parent "{parent}"') | |
| @dataclass | |
| class Role: | |
| name: str | |
| @dataclass | |
| class Permission: | |
| label: str | |
| @dataclass | |
| class OrderStatus: | |
| label: str | |
| @dataclass | |
| class Keyword: | |
| name: str | |
| class RandomDataGenerator: | |
| @staticmethod | |
| def addresses(size: int = 10) -> Generator[Address, None, None]: | |
| for i in range(size): | |
| yield FakeAddress() | |
| @staticmethod | |
| def members(address_ids: List[int], pizzeria_ids: List[int], | |
| size: int = 10) -> Generator[Member, None, None]: | |
| if len(address_ids) < size: | |
| raise ValueError('Not enough address_ids') | |
| pizzeria_ids = [random.choice((None, i)) for i in pizzeria_ids] | |
| random.shuffle(address_ids) | |
| for i in range(size): | |
| yield FakeMember( | |
| random.choice(pizzeria_ids), | |
| address_ids[i] | |
| ) | |
| @staticmethod | |
| def user_accounts(member_ids: List[int], size: int = 10)\ | |
| -> Generator[UserAccount, None, None]: | |
| if len(member_ids) < size: | |
| raise ValueError('Not enough member_ids') | |
| random.shuffle(member_ids) | |
| for i in range(size): | |
| yield FakeUserAccount(member_ids[i]) | |
| @staticmethod | |
| def taken_orders(member_ids: List[int], address_ids: List[int], | |
| order_status_ids: List[int], | |
| size: int = 10) -> Generator[TakenOrder, None, None]: | |
| if len(member_ids) < size: | |
| raise ValueError('Not enough member_ids') | |
| elif len(address_ids) < size: | |
| raise ValueError('Not enough address_ids') | |
| random.shuffle(member_ids) | |
| random.shuffle(address_ids) | |
| for i in range(size): | |
| yield FakeTakenOrder( | |
| member_ids[i], address_ids[i], order_status_ids | |
| ) | |
| @staticmethod | |
| def bills(taken_order_ids: List[int], size: int = 10)\ | |
| -> Generator[Bill, None, None]: | |
| if len(taken_order_ids) < size: | |
| print(taken_order_ids) | |
| raise ValueError('Not enough taken_order_ids') | |
| for i in range(size): | |
| yield FakeBill(taken_order_ids[i]) | |
| @staticmethod | |
| def products() -> Generator[Product, None, None]: | |
| product_names: List[str] = [ | |
| 'farine de blé', 'tomate pelée', 'pulpe de tomate', | |
| 'mozzarella', 'ananas', 'parmesan', 'viande hachée de boeuf', | |
| 'saumon', 'fromage de chèvre', 'artichaut', 'poivron', | |
| 'thon', 'oignon', 'ail', 'pâte', 'oeuf', 'mélange fruits de mer', | |
| 'jambon', 'jambon sec', 'chorizo', 'truffe', 'petit pois', | |
| 'viande de boeuf', 'moule', 'palourde', 'coque', | |
| "farine d'épeautre", 'langoustine', 'écrevisse', 'crevette rose', | |
| 'crevette grise', 'olive', 'roquette', 'basilic', 'champignon', | |
| ] | |
| for product_name in product_names: | |
| yield FakeProduct(product_name) | |
| @staticmethod | |
| def pizzerias(address_ids: List[Optional[int]])\ | |
| -> Generator[Pizzeria, None, None]: | |
| # OC Pizza has currently 5 stores | |
| pizzeria_names: List[str] = [ | |
| 'OC Pizza #1', 'OC Pizza bis', 'The Best of OC Pizza', | |
| 'OC Pizza Original', "OC Pizza's" | |
| ] | |
| if len(address_ids) < 5: | |
| address_ids += [None] * 5 | |
| random.shuffle(address_ids) | |
| for i, name in enumerate(pizzeria_names): | |
| yield FakePizzeria(name, address_ids[i]) | |
| @staticmethod | |
| def recipes() -> Generator[Recipe, None, None]: | |
| recipe_names: List[str] = [ | |
| 'Spaghetti bolognaise', 'Pizza regina', 'Pizza calzone', | |
| 'Pizza quatre saisons', 'Pizza de la mer', 'Ravioles au crabe', | |
| 'Tagliatelle au saumon', 'Pizza Margarita', 'Pizza hawaïenne', | |
| 'Pâtes à la carbonara', 'Risotto à la truffe', 'Minestrone', | |
| 'Linguine aux fruits de mer', 'Pâtes au pesto', 'Lasagne', | |
| 'Lasagne végétarienne', 'Raviolis quatre fromages', | |
| 'Escalope à la milanaise', 'Carpaccio de boeuf', | |
| 'Scalopine', 'Pizza chèvre miel', 'Pizza végétarienne', | |
| 'Pizza napolitaine', | |
| ] | |
| for recipe_name in recipe_names: | |
| yield FakeRecipe(recipe_name) | |
| @staticmethod | |
| def catalog_items(recipes: Dict[str, int])\ | |
| -> Generator[CatalogItem, None, None]: | |
| for recipe_name, recipe_id in recipes.items(): | |
| yield FakeCatalogItem( | |
| recipe_name, 'recipe', int(recipe_id) | |
| ) | |
| @staticmethod | |
| def order_status() -> Generator[OrderStatus, None, None]: | |
| labels: List[str] = [ | |
| 'Panier', 'En cours', 'En attente', 'Terminée' | |
| ] | |
| for label in labels: | |
| yield OrderStatus(label) | |
| @staticmethod | |
| def keywords() -> Generator[Keyword, None, None]: | |
| labels: List[str] = [ | |
| 'pizza', 'pâtes', 'végétarien', 'poisson', 'viande', | |
| 'champignon', 'spaghetti', 'macaroni', 'tagliatelle', | |
| 'fromage', 'chèvre', 'artichaut', 'parmesan', 'gruyère', | |
| 'tomate', 'poivron', 'thon', 'saumon', 'carbonara', | |
| 'moule', 'palourde', 'crevette', 'langoustine', 'écrevisse', | |
| 'porc', 'boeuf', 'poulet', 'volaille', 'oignon', 'ail', | |
| 'napolitain', 'truffe', 'veau', 'pesto', 'riz', | |
| ] | |
| for label in labels: | |
| yield Keyword(label) | |
| @staticmethod | |
| def permissions() -> Generator[Permission, None, None]: | |
| labels: List[str] = [ | |
| 'Modifier compte', 'Créer compte', 'Créer commande', | |
| 'Consulter commande tierse', 'Modifier compte tiers', | |
| ] | |
| for label in labels: | |
| yield Permission(label) | |
| @staticmethod | |
| def roles() -> Generator[Role, None, None]: | |
| names: List[str] = [ | |
| 'Client', 'Gestionnaire', 'Pizzaïolo', 'Livreur', | |
| 'Opérateur de commande', | |
| ] | |
| for name in names: | |
| yield Role(name) | |
| @staticmethod | |
| def has_permission_to(role_ids: List[int], | |
| permission_ids: List[int]) -> Dict[str, int]: | |
| return {'role_id': random.choice(role_ids), | |
| 'permission_id': random.choice(permission_ids)} | |
| @staticmethod | |
| def contains_item(taken_order_ids: List[int], | |
| catalog_item_ids: List[int]) -> Dict[str, Any]: | |
| return {'order_id': random.choice(taken_order_ids), | |
| 'item_id': random.choice(catalog_item_ids), | |
| 'quantity': random.randint(1, 10), | |
| 'unit_price_ati': random.uniform(5.00, 80.00)} | |
| @staticmethod | |
| def has_product_in_stock(pizzeria_ids: List[int], | |
| product_ids: List[int]) -> Dict[str, int]: | |
| return {'pizzeria_id': random.choice(pizzeria_ids), | |
| 'product_id': random.choice(product_ids), | |
| 'quantity': random.randint(1, 100)} | |
| @staticmethod | |
| def requires_product(recipe_ids: List[int], | |
| product_ids: List[int]) -> Dict[str, int]: | |
| return {'recipe_id': random.choice(recipe_ids), | |
| 'product_id': random.choice(product_ids), | |
| 'gram_amount': random.randint(1, 1000)} | |
| @staticmethod | |
| def has_keyword(catalog_item_ids: List[int], | |
| keyword_ids: List[int]) -> Dict[str, int]: | |
| return {'item_id': random.choice(catalog_item_ids), | |
| 'keyword_id': random.choice(keyword_ids)} | |
| class DatabaseFeeder: | |
| address_ids: List[int] | |
| member_ids: List[int] | |
| pizzeria_ids: List[int] | |
| user_accounts: Dict[int, int] | |
| recipes: Dict[str, int] | |
| product_ids: List[int] | |
| catalog_item_ids: List[int] | |
| order_status_ids: List[int] | |
| keyword_ids: List[int] | |
| permission_ids: List[int] | |
| role_ids: List[int] | |
| def __init__(self, user: str, password: str, | |
| host: str, dbname: str, size: int = 10) -> None: | |
| self.db = records.Database( | |
| f'postgresql://{user}:{password}@{host}/{dbname}' | |
| ) | |
| self.size = size | |
| def populate(self) -> Any: | |
| s = time.time() | |
| print('START') | |
| print('INSERTING ADDRESS') | |
| self._insert_addresses() | |
| print('INSERTING PIZZERIAS') | |
| self._insert_pizzerias() | |
| print('INSERTING MEMBERS') | |
| self._insert_members() | |
| print('INSERTING USER ACCOUNTS') | |
| self._insert_user_accounts() | |
| print('UPDATING MEMBERS USER ACCOUNTS') | |
| self._update_members_user_account() | |
| print('INSERTING RECIPES') | |
| self._insert_recipes() | |
| print('INSERTING PRODUCTS') | |
| self._insert_products() | |
| print('INSERTING CATALOG ITEMS') | |
| self._insert_catalog_items() | |
| print('INSERTING ORDER STATUS') | |
| self._insert_order_status() | |
| print('INSERTING TAKEN ORDERS') | |
| self._insert_taken_orders() | |
| print('INSERTING BILLS') | |
| self._insert_bills() | |
| print('INSERTING KEYWORDS') | |
| self._insert_keywords() | |
| print('INSERTING PERMISSIONS') | |
| self._insert_permissions() | |
| print('INSERTING ROLES') | |
| self._insert_roles() | |
| print('UPDATING MEMBERS ROLES') | |
| self._update_members_role() | |
| print('POPULATING ASSOCIATIVE ENTITIES') | |
| self._insert_relations_many_to_many() | |
| e = time.time() | |
| print(f'END ({e - s:.2f} sec.)') | |
| def _insert_addresses(self) -> List[int]: | |
| for address in RandomDataGenerator.addresses(size=self.size): | |
| self.db.query( | |
| '''INSERT INTO address | |
| (street_name, home_number, zip_code, country) | |
| VALUES (:street_name, :home_number, | |
| :zip_code, :country);''', **address.__dict__ | |
| ) | |
| rows: records.RecordCollection = self.db.query( | |
| '''SELECT id FROM address;''' | |
| ) | |
| self.address_ids = [r.id for r in rows] | |
| return self.address_ids | |
| def _insert_pizzerias(self) -> List[int]: | |
| for pizzeria in RandomDataGenerator.pizzerias(self.address_ids): | |
| self.db.query( | |
| '''INSERT INTO pizzeria (name, phone_nb, address_id) | |
| VALUES (:name, :phone_nb, :address_id);''', | |
| **pizzeria.__dict__ | |
| ) | |
| rows: records.RecordCollection = self.db.query( | |
| '''SELECT id FROM pizzeria;''' | |
| ) | |
| self.pizzeria_ids = [r.id for r in rows] | |
| return self.pizzeria_ids | |
| def _insert_members(self) -> List[int]: | |
| gen_members: Generator[Member, None, None] = RandomDataGenerator\ | |
| .members(self.address_ids, self.pizzeria_ids, size=self.size) | |
| for member in gen_members: | |
| self.db.query( | |
| '''INSERT INTO member | |
| (name, firstname, works_at_pizzeria_id, | |
| user_account_id, address_id) | |
| VALUES | |
| (:name, :firstname, :works_at_pizzeria_id, | |
| :user_account_id, :address_id);''', | |
| **member.__dict__ | |
| ) | |
| rows: records.RecordCollection = self.db.query( | |
| '''SELECT id FROM member;''' | |
| ) | |
| self.member_ids = [r.id for r in rows] | |
| return self.member_ids | |
| def _insert_user_accounts(self) -> Dict[int, int]: | |
| gen_user_accounts: Generator[UserAccount, None, None] = \ | |
| RandomDataGenerator.user_accounts(self.member_ids, size=self.size) | |
| for user_account in gen_user_accounts: | |
| self.db.query( | |
| '''INSERT INTO user_account (member_id, email, | |
| phone_nb, hashed_pwd) VALUES (:member_id, :email, | |
| :phone_nb, :hashed_pwd);''', | |
| **user_account.__dict__ | |
| ) | |
| rows: records.RecordCollection = self.db.query( | |
| '''SELECT id, member_id FROM user_account;''' | |
| ) | |
| self.user_accounts = {r.member_id: r.id for r in rows} | |
| return self.user_accounts | |
| def _insert_taken_orders(self) -> List[int]: | |
| taken_orders: Generator[TakenOrder, None, None] = RandomDataGenerator\ | |
| .taken_orders(self.member_ids, self.address_ids, | |
| self.order_status_ids, size=self.size) | |
| for taken_order in taken_orders: | |
| self.db.query( | |
| '''INSERT INTO taken_order (member_id, address_id, | |
| status_id, is_paid, bill_id) VALUES (:member_id, :address_id, | |
| :status_id, :is_paid, :bill_id)''', | |
| **taken_order.__dict__ | |
| ) | |
| rows: records.RecordCollection = self.db.query( | |
| '''SELECT id FROM taken_order;''' | |
| ) | |
| self.taken_order_ids = [r.id for r in rows] | |
| return self.taken_order_ids | |
| def _insert_bills(self) -> None: | |
| bills: Generator[Bill, None, None] = RandomDataGenerator\ | |
| .bills(self.taken_order_ids, size=self.size) | |
| for bill in bills: | |
| self.db.query( | |
| '''INSERT INTO bill (emission_date, total_amout_ati, | |
| order_id) VALUES (:emission_date, :total_amout_ati, | |
| :order_id);''', **bill.__dict__ | |
| ) | |
| def _insert_recipes(self) -> Dict[str, int]: | |
| for recipe in RandomDataGenerator.recipes(): | |
| self.db.query( | |
| '''INSERT INTO recipe (name, description, is_public) | |
| VALUES (:name, :description, :is_public);''', | |
| **recipe.__dict__ | |
| ) | |
| rows: records.RecordCollection = self.db.query( | |
| '''SELECT name, id FROM recipe;''' | |
| ) | |
| self.recipes = {r.name: r.id for r in rows} | |
| return self.recipes | |
| def _insert_products(self) -> Any: | |
| for product in RandomDataGenerator.products(): | |
| self.db.query( | |
| '''INSERT INTO product | |
| (name, barcode, gram_weight, unit_price_ati) | |
| VALUES (:name, :barcode, :gram_weight, :unit_price_ati);''', | |
| **product.__dict__ | |
| ) | |
| rows: records.RecordCollection = self.db.query( | |
| '''SELECT id FROM product;''' | |
| ) | |
| self.product_ids = [r.id for r in rows] | |
| return self.product_ids | |
| def _insert_catalog_items(self) -> List[int]: | |
| for item in RandomDataGenerator.catalog_items(self.recipes): | |
| self.db.query( | |
| '''INSERT INTO catalog_item | |
| (name, description, picture_file, | |
| unit_price_ati, is_available, is_displayed, | |
| recipe_id, product_id) VALUES | |
| (:name, :description, :picture_file, | |
| :unit_price_ati, :is_available, :is_displayed, | |
| :recipe_id, :product_id);''', | |
| **item.__dict__ | |
| ) | |
| rows: records.RecordCollection = self.db.query( | |
| '''SELECT id FROM catalog_item;''' | |
| ) | |
| self.catalog_item_ids = [r.id for r in rows] | |
| return self.catalog_item_ids | |
| def _update_members_user_account(self) -> None: | |
| for member_id, ua_id in self.user_accounts.items(): | |
| self.db.query( | |
| '''UPDATE member SET user_account_id = :ua_id | |
| WHERE id = :member_id;''', | |
| **{'member_id': member_id, 'ua_id': ua_id} | |
| ) | |
| def _update_members_role(self) -> None: | |
| rows: records.RecordCollection = self.db.query( | |
| '''SELECT id FROM member WHERE | |
| works_at_pizzeria_id IS NOT NULL;''' | |
| ) | |
| for r in rows: | |
| self.db.query( | |
| '''UPDATE member set role_id = :role_id | |
| WHERE id = :member_id''', | |
| **{'member_id': r.id, | |
| 'role_id': random.choice(self.role_ids)} | |
| ) | |
| def _insert_order_status(self) -> List[int]: | |
| for status in RandomDataGenerator.order_status(): | |
| self.db.query( | |
| '''INSERT INTO order_status (label) VALUES (:label);''', | |
| **status.__dict__ | |
| ) | |
| rows: records.RecordCollection = self.db.query( | |
| '''SELECT id FROM order_status;''' | |
| ) | |
| self.order_status_ids = [r.id for r in rows] | |
| return self.order_status_ids | |
| def _insert_keywords(self) -> Any: | |
| for keyword in RandomDataGenerator.keywords(): | |
| self.db.query( | |
| '''INSERT INTO keyword (name) VALUES (:name);''', | |
| **keyword.__dict__ | |
| ) | |
| rows: records.RecordCollection = self.db.query( | |
| '''SELECT id FROM keyword;''' | |
| ) | |
| self.keyword_ids = [r.id for r in rows] | |
| return self.keyword_ids | |
| def _insert_permissions(self) -> List[int]: | |
| for permission in RandomDataGenerator.permissions(): | |
| self.db.query( | |
| '''INSERT INTO permission (label) VALUES (:label);''', | |
| **permission.__dict__ | |
| ) | |
| rows: records.RecordCollection = self.db.query( | |
| '''SELECT id FROM permission;''' | |
| ) | |
| self.permission_ids = [r.id for r in rows] | |
| return self.permission_ids | |
| def _insert_roles(self) -> List[int]: | |
| for role in RandomDataGenerator.roles(): | |
| self.db.query( | |
| '''INSERT INTO role (name) VALUES (:name);''', | |
| **role.__dict__ | |
| ) | |
| rows: records.RecordCollection = self.db.query( | |
| '''SELECT id FROM role;''' | |
| ) | |
| self.role_ids = [r.id for r in rows] | |
| return self.role_ids | |
| def _insert_relations_many_to_many(self) -> None: | |
| callbacks: List[Callable] = [ | |
| self._insert_has_permission_to, | |
| self._insert_contains_item, | |
| self._insert_requires_product, | |
| self._insert_has_product_in_stock, | |
| self._insert_has_keyword, | |
| ] | |
| for callback in callbacks: | |
| for i in range(self.size): | |
| try: | |
| callback() | |
| except sqlalchemy.exc.IntegrityError: | |
| pass | |
| def _insert_has_permission_to(self) -> None: | |
| self.db.query( | |
| '''INSERT INTO has_permission_to VALUES | |
| (:role_id, :permission_id);''', | |
| **RandomDataGenerator.has_permission_to( | |
| self.role_ids, self.permission_ids | |
| ) | |
| ) | |
| def _insert_contains_item(self) -> None: | |
| self.db.query( | |
| '''INSERT INTO contains_item VALUES | |
| (:order_id, :item_id, :quantity, :unit_price_ati);''', | |
| **RandomDataGenerator.contains_item( | |
| self.taken_order_ids, self.catalog_item_ids | |
| ) | |
| ) | |
| def _insert_has_product_in_stock(self) -> None: | |
| self.db.query( | |
| '''INSERT INTO has_product_in_stock VALUES | |
| (:pizzeria_id, :product_id, :quantity);''', | |
| **RandomDataGenerator.has_product_in_stock( | |
| self.pizzeria_ids, self.product_ids | |
| ) | |
| ) | |
| def _insert_requires_product(self) -> None: | |
| self.db.query( | |
| '''INSERT INTO requires_product VALUES | |
| (:recipe_id, :product_id, :gram_amount);''', | |
| **RandomDataGenerator.requires_product( | |
| list(self.recipes.values()), self.product_ids | |
| ) | |
| ) | |
| def _insert_has_keyword(self) -> None: | |
| self.db.query( | |
| '''INSERT INTO has_keyword VALUES (:item_id, :keyword_id)''', | |
| **RandomDataGenerator.has_keyword( | |
| self.catalog_item_ids, self.keyword_ids | |
| ) | |
| ) | |
| def main() -> None: | |
| arg_parser: ArgumentParser = ArgumentParser( | |
| description='Script populating an OCP6 Database with fake data' | |
| ) | |
| arg_parser.add_argument('-s', '--size', type=int, default=10, | |
| help='Size of batch of inserted data') | |
| args: Namespace = arg_parser.parse_args() | |
| dbfeeder: DatabaseFeeder = DatabaseFeeder( | |
| os.environ['user'], os.environ['password'], | |
| os.environ['host'], os.environ['dbname'], | |
| size=args.size | |
| ) | |
| dbfeeder.populate() | |
| if __name__ == '__main__': | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment