Skip to content

Instantly share code, notes, and snippets.

@Bouni
Created September 3, 2021 15:10
Show Gist options
  • Select an option

  • Save Bouni/616504817fc984d5d1ca85f63529b494 to your computer and use it in GitHub Desktop.

Select an option

Save Bouni/616504817fc984d5d1ca85f63529b494 to your computer and use it in GitHub Desktop.
Read JLCPCB library CSV and write it to a Postgres DB
from typing import Optional
from slugify import slugify
import time
import csv
from sqlmodel import Field, SQLModel, create_engine, Session, select
class Part(SQLModel, table=True):
    """One part from the JLCPCB library CSV.

    Category, package, manufacturer and library type are stored as foreign
    keys into their own lookup tables; the remaining columns hold the values
    of the CSV columns with the same names.
    """

    id: Optional[int] = Field(default=None, primary_key=True)
    # The importer looks every CSV row up by this column, so index it
    # explicitly instead of relying on version-dependent defaults.
    lcsc_part: str = Field(index=True)
    first_cat_id: Optional[int] = Field(default=None, foreign_key="firstcat.id")
    second_cat_id: Optional[int] = Field(default=None, foreign_key="secondcat.id")
    mfr_part: str
    package_id: Optional[int] = Field(default=None, foreign_key="package.id")
    solder_joints: int
    manufacturer_id: Optional[int] = Field(default=None, foreign_key="manufacturer.id")
    library_type_id: Optional[int] = Field(default=None, foreign_key="librarytype.id")
    description: str
    datasheet: str
    # Kept as text: the importer stores the CSV price cell unparsed.
    price: str
    stock: int
class LibraryType(SQLModel, table=True):
    """Lookup table of library type names referenced by ``Part.library_type_id``."""

    id: Optional[int] = Field(default=None, primary_key=True)
    # Rows are looked up by name during the import, so index the column.
    name: str = Field(index=True)
class Manufacturer(SQLModel, table=True):
    """Lookup table of manufacturer names referenced by ``Part.manufacturer_id``."""

    id: Optional[int] = Field(default=None, primary_key=True)
    # Rows are looked up by name during the import, so index the column.
    name: str = Field(index=True)
class Package(SQLModel, table=True):
    """Lookup table of package names referenced by ``Part.package_id``."""

    id: Optional[int] = Field(default=None, primary_key=True)
    # Rows are looked up by name during the import, so index the column.
    name: str = Field(index=True)
class FirstCat(SQLModel, table=True):
    """Lookup table of first-level category names referenced by ``Part.first_cat_id``."""

    id: Optional[int] = Field(default=None, primary_key=True)
    # Rows are looked up by name during the import, so index the column.
    name: str = Field(index=True)
class SecondCat(SQLModel, table=True):
    """Lookup table of second-level category names referenced by ``Part.second_cat_id``."""

    id: Optional[int] = Field(default=None, primary_key=True)
    # Rows are looked up by name during the import, so index the column.
    name: str = Field(index=True)
class DBSync:
    """Import the JLCPCB parts CSV into the database.

    Instantiating the class performs the whole import: it creates the engine,
    creates any missing tables and then reads the CSV row by row, creating or
    updating one ``Part`` per row. Progress and final counters are printed to
    stdout.

    Args:
        db_url: Database URL handed to ``create_engine``.
        csv_path: Path of the CSV file to import.
    """

    # Column order of the CSV file; used as the DictReader field names.
    CSV_FIELDS = (
        "lcsc_part",
        "first_cat",
        "second_cat",
        "mfr_part",
        "package",
        "solder_joints",
        "manufacturer",
        "library_type",
        "description",
        "datasheet",
        "price",
        "stock",
    )

    # NOTE(review): the default URL embeds a password; pass ``db_url``
    # explicitly for anything other than a local throwaway database.
    def __init__(
        self,
        db_url="postgresql://postgres:jlcpcbpw@localhost:5432/jlcpcb",
        csv_path="parts.csv",
    ):
        self.csv_path = csv_path
        self.engine = create_engine(db_url)  # , echo=True
        self.create_db()
        self.counters = {
            "relations": {"created": 0},
            "parts": {"found": 0, "updated": 0, "created": 0},
        }
        # (model class, name) -> primary key, so each lookup row is queried
        # at most once per run instead of once per CSV row.
        self._relation_ids = {}
        self.read_data()

    def create_db(self) -> None:
        """Create all tables registered on ``SQLModel.metadata`` that do not exist yet."""
        SQLModel.metadata.create_all(self.engine)

    def read_data(self) -> None:
        """Read the CSV file and feed every data row to ``create_or_update``.

        The first row is treated as a header and skipped. A progress report is
        printed every 1000 rows and the total runtime at the end.
        """
        with open(self.csv_path, newline="", encoding="iso-8859-1") as csvfile:
            reader = csv.DictReader(csvfile, fieldnames=list(self.CSV_FIELDS))
            # Skip the header row; the default keeps an empty file from
            # raising StopIteration.
            next(reader, None)
            start = time.time()
            for n, row in enumerate(reader):
                if n % 1000 == 0:
                    elapsed = int(time.time() - start)
                    print(f"Processed {n} rows so far, run for {elapsed} seconds now")
                    print(f"Parts: {self.counters['parts']['found']} found, {self.counters['parts']['updated']} updated, {self.counters['parts']['created']} created")
                    print(f"Relations: {self.counters['relations']['created']} created")
                self.create_or_update(row)
            end = time.time()
            print(f"Total runtime was {end-start}")

    def create_or_update(self, data) -> None:
        """Create the part described by one CSV row, or update it if it changed.

        Missing lookup rows (categories, package, manufacturer, library type)
        are created first. The part is then looked up by ``lcsc_part``:
        a missing part is inserted, an existing part whose stored values
        differ from the row is updated, and an identical part only bumps the
        ``found`` counter.

        Args:
            data: Mapping of CSV field name to cell value for one row.

        Raises:
            ValueError, TypeError: If ``solder_joints`` or ``stock`` cannot be
                converted to ``int``.
        """
        relations = (
            ("first_cat", FirstCat, "First Category"),
            ("second_cat", SecondCat, "Second Category"),
            ("package", Package, "Package"),
            ("manufacturer", Manufacturer, "Manufacturer"),
            ("library_type", LibraryType, "LibraryType"),
        )
        with Session(self.engine) as session:
            relation_ids = {
                key: self._relation_id(session, model, label, data.get(key))
                for key, model, label in relations
            }
            values = self._part_values(data, relation_ids)
            lcsc_part = data.get("lcsc_part")

            part = session.exec(
                select(Part).where(Part.lcsc_part == lcsc_part)
            ).first()
            if part is None:
                print(f"Part {lcsc_part} not found, create it now")
                session.add(Part(lcsc_part=lcsc_part, **values))
                session.commit()
                self.counters["parts"]["created"] += 1
                return

            # Compare against the row that was actually loaded. The previous
            # code re-queried with every column and then assigned attributes
            # on the resulting None, which crashed on any changed part.
            changed = {
                column: value
                for column, value in values.items()
                if getattr(part, column) != value
            }
            if not changed:
                self.counters["parts"]["found"] += 1
                return

            for column, value in changed.items():
                setattr(part, column, value)
            session.add(part)
            session.commit()
            self.counters["parts"]["updated"] += 1

    def _relation_id(self, session, model, label, name):
        """Return the id of the ``model`` row called ``name``, creating it if needed.

        ``label`` is only used in the message printed when a row is created.
        """
        key = (model, name)
        cached = self._relation_ids.get(key)
        if cached is not None:
            return cached

        row = session.exec(select(model).where(model.name == name)).first()
        if row is None:
            print(f"{label} {name} not found, create it now")
            row = model(name=name)
            session.add(row)
            session.commit()
            self.counters["relations"]["created"] += 1
        self._relation_ids[key] = row.id
        return row.id

    @staticmethod
    def _part_values(data, relation_ids):
        """Build the ``Part`` column values (everything except ``lcsc_part``) for a row.

        The CSV delivers every cell as text, so the integer columns are
        converted here; otherwise they could never compare equal to the
        integers loaded from the database.
        """
        return {
            "first_cat_id": relation_ids["first_cat"],
            "second_cat_id": relation_ids["second_cat"],
            "mfr_part": data.get("mfr_part"),
            "package_id": relation_ids["package"],
            "solder_joints": int(data.get("solder_joints")),
            "manufacturer_id": relation_ids["manufacturer"],
            "library_type_id": relation_ids["library_type"],
            "description": data.get("description"),
            "datasheet": data.get("datasheet"),
            "price": data.get("price"),
            "stock": int(data.get("stock")),
        }
if __name__ == "__main__":
    # Constructing DBSync runs the complete import from its __init__.
    DBSync()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment