Created
September 3, 2021 15:10
-
-
Save Bouni/616504817fc984d5d1ca85f63529b494 to your computer and use it in GitHub Desktop.
Read JLCPCB library CSV and write it to a Postgres DB
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from typing import Optional | |
| from slugify import slugify | |
| import time | |
| import csv | |
| from sqlmodel import Field, SQLModel, create_engine, Session, select | |
class Part(SQLModel, table=True):
    """One part row of the imported library.

    Category, package, manufacturer and library type are stored as optional
    foreign keys into their own tables; the remaining columns hold the CSV
    values directly.
    """

    # Surrogate key; left as None so the database assigns it on insert.
    id: Optional[int] = Field(primary_key=True, default=None)
    # Part identifier; DBSync.create_or_update looks existing rows up by it.
    lcsc_part: str

    # References into the two category tables.
    first_cat_id: Optional[int] = Field(foreign_key="firstcat.id", default=None)
    second_cat_id: Optional[int] = Field(foreign_key="secondcat.id", default=None)

    mfr_part: str
    package_id: Optional[int] = Field(foreign_key="package.id", default=None)
    solder_joints: int

    manufacturer_id: Optional[int] = Field(
        foreign_key="manufacturer.id", default=None
    )
    library_type_id: Optional[int] = Field(
        foreign_key="librarytype.id", default=None
    )

    description: str
    datasheet: str
    # Stored as text, not as a numeric column.
    price: str
    stock: int
class LibraryType(SQLModel, table=True):
    """Lookup table of library type names, referenced by ``Part.library_type_id``."""

    # Left as None so the database assigns the key on insert.
    id: Optional[int] = Field(primary_key=True, default=None)
    name: str
class Manufacturer(SQLModel, table=True):
    """Lookup table of manufacturer names, referenced by ``Part.manufacturer_id``."""

    # Left as None so the database assigns the key on insert.
    id: Optional[int] = Field(primary_key=True, default=None)
    name: str
class Package(SQLModel, table=True):
    """Lookup table of package names, referenced by ``Part.package_id``."""

    # Left as None so the database assigns the key on insert.
    id: Optional[int] = Field(primary_key=True, default=None)
    name: str
class FirstCat(SQLModel, table=True):
    """Lookup table of first-level category names, referenced by ``Part.first_cat_id``."""

    # Left as None so the database assigns the key on insert.
    id: Optional[int] = Field(primary_key=True, default=None)
    name: str
class SecondCat(SQLModel, table=True):
    """Lookup table of second-level category names, referenced by ``Part.second_cat_id``."""

    # Left as None so the database assigns the key on insert.
    id: Optional[int] = Field(primary_key=True, default=None)
    name: str
class DBSync:
    """Import the parts CSV into the database, creating or updating rows.

    Constructing an instance runs the whole job: it builds the engine,
    creates any missing tables, and then processes the CSV row by row.
    Progress and totals are reported with ``print``.

    Args:
        db_url: Connection URL handed to ``create_engine``. Defaults to the
            local ``jlcpcb`` Postgres database.
        csv_path: Path of the CSV file to read. Defaults to ``parts.csv`` in
            the current working directory.
    """

    # Names given to the CSV columns, in file order.
    CSV_FIELDS = (
        "lcsc_part",
        "first_cat",
        "second_cat",
        "mfr_part",
        "package",
        "solder_joints",
        "manufacturer",
        "library_type",
        "description",
        "datasheet",
        "price",
        "stock",
    )

    def __init__(
        self,
        db_url="postgresql://postgres:jlcpcbpw@localhost:5432/jlcpcb",
        csv_path="parts.csv",
    ):
        self.csv_path = csv_path
        # Pass echo=True to create_engine to log the emitted SQL.
        self.engine = create_engine(db_url)
        self.create_db()
        self.counters = {
            "relations": {"created": 0},
            "parts": {"found": 0, "updated": 0, "created": 0},
        }
        self.read_data()

    def create_db(self):
        """Create every table registered on ``SQLModel.metadata``."""
        SQLModel.metadata.create_all(self.engine)

    def read_data(self):
        """Read the CSV and pass each data row to ``create_or_update``.

        The first line is treated as a header and skipped. A progress report
        is printed every 1000 rows, and the total runtime at the end.
        """
        with open(self.csv_path, newline="", encoding="iso-8859-1") as csvfile:
            reader = csv.DictReader(csvfile, fieldnames=self.CSV_FIELDS)
            # Skip the header line; the default keeps an empty file from
            # raising StopIteration.
            next(reader, None)
            start = time.time()
            for n, row in enumerate(reader):
                if n % 1000 == 0:
                    elapsed = int(time.time() - start)
                    parts = self.counters["parts"]
                    relations = self.counters["relations"]
                    print(f"Processed {n} rows so far, run for {elapsed} seconds now")
                    print(
                        f"Parts: {parts['found']} found, "
                        f"{parts['updated']} updated, "
                        f"{parts['created']} created"
                    )
                    print(f"Relations: {relations['created']} created")
                self.create_or_update(row)
            end = time.time()
            print(f"Total runtime was {end-start}")

    def create_or_update(self, data):
        """Insert or update the part described by one CSV row.

        Missing category, package, manufacturer and library type rows are
        created first. The part is then looked up by ``lcsc_part``:

        * not found: a new ``Part`` is inserted (counter ``created``);
        * found with every column equal to the row: left alone (``found``);
        * found but different: its columns are overwritten (``updated``).

        Args:
            data: Mapping with the keys listed in ``CSV_FIELDS``.
        """
        with Session(self.engine) as session:
            first_cat = self._get_or_create(
                session, FirstCat, "First Category", data.get("first_cat")
            )
            second_cat = self._get_or_create(
                session, SecondCat, "Second Category", data.get("second_cat")
            )
            package = self._get_or_create(
                session, Package, "Package", data.get("package")
            )
            manufacturer = self._get_or_create(
                session, Manufacturer, "Manufacturer", data.get("manufacturer")
            )
            library_type = self._get_or_create(
                session, LibraryType, "LibraryType", data.get("library_type")
            )

            values = self._part_values(
                data,
                first_cat.id,
                second_cat.id,
                package.id,
                manufacturer.id,
                library_type.id,
            )

            existing = session.exec(
                select(Part).where(Part.lcsc_part == values["lcsc_part"])
            ).first()
            if existing is None:
                print(f"Part {values['lcsc_part']} not found, create it now")
                session.add(Part(**values))
                session.commit()
                self.counters["parts"]["created"] += 1
                return

            # The comparison is done in the database, as before, so the CSV
            # strings are compared against the stored column values there.
            unchanged = session.exec(
                select(Part).where(
                    *[getattr(Part, column) == value for column, value in values.items()]
                )
            ).first()
            if unchanged is not None:
                self.counters["parts"]["found"] += 1
                return

            # Update the row found by lcsc_part. The exact-match query above
            # returns None exactly when an update is needed, so its result
            # must not be used as the update target.
            changes = {
                column: value
                for column, value in values.items()
                if column != "lcsc_part"
            }
            self._apply_values(existing, changes)
            session.add(existing)
            session.commit()
            self.counters["parts"]["updated"] += 1

    def _get_or_create(self, session, model, label, name):
        """Return the ``model`` row called ``name``, inserting it if missing."""
        row = session.exec(select(model).where(model.name == name)).first()
        if row is None:
            print(f"{label} {name} not found, create it now")
            row = model(name=name)
            session.add(row)
            session.commit()
            self.counters["relations"]["created"] += 1
        return row

    @staticmethod
    def _part_values(
        data,
        first_cat_id,
        second_cat_id,
        package_id,
        manufacturer_id,
        library_type_id,
    ):
        """Map one CSV row plus resolved foreign keys to ``Part`` column values."""
        return {
            "lcsc_part": data.get("lcsc_part"),
            "first_cat_id": first_cat_id,
            "second_cat_id": second_cat_id,
            "mfr_part": data.get("mfr_part"),
            "package_id": package_id,
            "solder_joints": data.get("solder_joints"),
            "manufacturer_id": manufacturer_id,
            "library_type_id": library_type_id,
            "description": data.get("description"),
            "datasheet": data.get("datasheet"),
            "price": data.get("price"),
            "stock": data.get("stock"),
        }

    @staticmethod
    def _apply_values(part, values):
        """Set every ``values`` entry as an attribute on ``part`` and return it."""
        for column, value in values.items():
            setattr(part, column, value)
        return part
# Script entry point: constructing DBSync runs the complete import.
if __name__ == "__main__":
    DBSync()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment