Skip to content

Instantly share code, notes, and snippets.

@Bouni
Created September 3, 2021 15:09
Show Gist options
  • Select an option

  • Save Bouni/6abb2137aa36bc26de22f37fef22de0e to your computer and use it in GitHub Desktop.

Select an option

Save Bouni/6abb2137aa36bc26de22f37fef22de0e to your computer and use it in GitHub Desktop.
Read the JLCPCB library CSV and write it asynchronously to a Postgres DB
import asyncio
import csv
import time
from typing import Optional
from slugify import slugify
from sqlalchemy.ext.asyncio import create_async_engine
from sqlmodel import Field, Session, SQLModel, create_engine, select
from sqlmodel.ext.asyncio.session import AsyncSession
class Part(SQLModel, table=True):
    """Table model for one part row imported from the parts CSV.

    Category, package, manufacturer and library type are stored as optional
    foreign keys into their own lookup tables; the remaining columns hold
    values taken from the CSV row.
    """

    # Primary key; defaults to None on new, not-yet-persisted instances.
    id: Optional[int] = Field(default=None, primary_key=True)
    # Part number that create_or_update() uses to look up an existing row.
    lcsc_part: str
    # Optional reference to firstcat.id.
    first_cat_id: Optional[int] = Field(default=None, foreign_key="firstcat.id")
    # Optional reference to secondcat.id.
    second_cat_id: Optional[int] = Field(default=None, foreign_key="secondcat.id")
    mfr_part: str
    # Optional reference to package.id.
    package_id: Optional[int] = Field(default=None, foreign_key="package.id")
    solder_joints: int
    # Optional reference to manufacturer.id.
    manufacturer_id: Optional[int] = Field(default=None, foreign_key="manufacturer.id")
    # Optional reference to librarytype.id.
    library_type_id: Optional[int] = Field(default=None, foreign_key="librarytype.id")
    description: str
    datasheet: str
    # Kept as text; the importer stores the CSV value without parsing it.
    price: str
    stock: int
class LibraryType(SQLModel, table=True):
    """Lookup table of library type names referenced by Part.library_type_id."""

    # Primary key; defaults to None on new, not-yet-persisted instances.
    id: Optional[int] = Field(default=None, primary_key=True)
    # Name that create_or_update() matches against the CSV "library_type" value.
    name: str
class Manufacturer(SQLModel, table=True):
    """Lookup table of manufacturer names referenced by Part.manufacturer_id."""

    # Primary key; defaults to None on new, not-yet-persisted instances.
    id: Optional[int] = Field(default=None, primary_key=True)
    # Name that create_or_update() matches against the CSV "manufacturer" value.
    name: str
class Package(SQLModel, table=True):
    """Lookup table of package names referenced by Part.package_id."""

    # Primary key; defaults to None on new, not-yet-persisted instances.
    id: Optional[int] = Field(default=None, primary_key=True)
    # Name that create_or_update() matches against the CSV "package" value.
    name: str
class FirstCat(SQLModel, table=True):
    """Lookup table of first-level category names referenced by Part.first_cat_id."""

    # Primary key; defaults to None on new, not-yet-persisted instances.
    id: Optional[int] = Field(default=None, primary_key=True)
    # Name that create_or_update() matches against the CSV "first_cat" value.
    name: str
class SecondCat(SQLModel, table=True):
    """Lookup table of second-level category names referenced by Part.second_cat_id."""

    # Primary key; defaults to None on new, not-yet-persisted instances.
    id: Optional[int] = Field(default=None, primary_key=True)
    # Name that create_or_update() matches against the CSV "second_cat" value.
    name: str
async def _get_or_create_by_name(session, model, name, label, counters):
    """Return the ``model`` row whose ``name`` equals *name*, inserting it if absent.

    Args:
        session: Async session used for the lookup and the insert.
        model: Lookup table class exposing ``id`` and ``name`` columns.
        name: Value to match against ``model.name``.
        label: Human-readable table label used in the progress message.
        counters: Nested counter dict; ``counters["relations"]["created"]`` is
            incremented when a new row is inserted.

    Returns:
        The existing or newly created row, with its ``id`` loaded.
    """
    row = (
        await session.exec(select(model).where(model.name == name))
    ).first()
    if row is None:
        print(f"{label} {name} not found, create it now")
        row = model(name=name)
        session.add(row)
        await session.commit()
        # commit() may expire the instance; reload it explicitly so that the
        # caller can read ``row.id`` without triggering implicit (sync) I/O.
        await session.refresh(row)
        counters["relations"]["created"] += 1
    return row


async def create_or_update(session, data, counters):
    """Insert or update the part described by one CSV row.

    The five lookup rows (first category, second category, package,
    manufacturer, library type) are fetched by name and created when missing.
    The part itself is then looked up by ``lcsc_part`` and is created, updated
    in place when any stored column differs from the CSV values, or left
    untouched when it already matches.

    Args:
        session: Async session used for all queries and commits.
        data: Mapping for one CSV row with the keys ``lcsc_part``,
            ``first_cat``, ``second_cat``, ``mfr_part``, ``package``,
            ``solder_joints``, ``manufacturer``, ``library_type``,
            ``description``, ``datasheet``, ``price`` and ``stock``.
        counters: Nested dict mutated in place: ``counters["relations"]["created"]``
            and exactly one of ``counters["parts"]["created"]``,
            ``["updated"]`` or ``["found"]`` are incremented.

    Raises:
        ValueError / TypeError: if ``solder_joints`` or ``stock`` cannot be
            converted with ``int()``.
    """
    first_cat = await _get_or_create_by_name(
        session, FirstCat, data.get("first_cat"), "First Category", counters
    )
    second_cat = await _get_or_create_by_name(
        session, SecondCat, data.get("second_cat"), "Second Category", counters
    )
    package = await _get_or_create_by_name(
        session, Package, data.get("package"), "Package", counters
    )
    manufacturer = await _get_or_create_by_name(
        session, Manufacturer, data.get("manufacturer"), "Manufacturer", counters
    )
    library_type = await _get_or_create_by_name(
        session, LibraryType, data.get("library_type"), "LibraryType", counters
    )

    lcsc_part = data.get("lcsc_part")
    # Desired column values for the part, keyed by Part attribute name.
    values = {
        "first_cat_id": first_cat.id,
        "second_cat_id": second_cat.id,
        "mfr_part": data.get("mfr_part"),
        "package_id": package.id,
        "solder_joints": int(data.get("solder_joints")),
        "manufacturer_id": manufacturer.id,
        "library_type_id": library_type.id,
        "description": data.get("description"),
        "datasheet": data.get("datasheet"),
        "price": data.get("price"),
        "stock": int(data.get("stock")),
    }

    part = (
        await session.exec(select(Part).where(Part.lcsc_part == lcsc_part))
    ).first()

    if part is None:
        print(f"Part {lcsc_part} not found, create it now")
        session.add(Part(lcsc_part=lcsc_part, **values))
        await session.commit()
        counters["parts"]["created"] += 1
        return

    # Compare against the row that was just loaded instead of issuing a second
    # exact-match query: this keeps a reference to the existing part so that it
    # can actually be updated when something differs.
    changed = {
        column: value
        for column, value in values.items()
        if getattr(part, column) != value
    }
    if not changed:
        counters["parts"]["found"] += 1
        return

    for column, value in changed.items():
        setattr(part, column, value)
    session.add(part)
    await session.commit()
    counters["parts"]["updated"] += 1
async def main():
    """Import ``parts.csv`` into the Postgres database and print progress.

    Creates all SQLModel tables if needed, then feeds every CSV data row
    (the first row is treated as a header and skipped) to
    ``create_or_update``. A progress summary is printed every 1000 rows and
    the total runtime at the end. The engine is disposed on exit, including
    when the import fails.
    """
    # NOTE(review): credentials are hard-coded in the URL; pass echo=True to
    # create_async_engine to log the emitted SQL while debugging.
    engine = create_async_engine(
        "postgresql+asyncpg://postgres:jlcpcbpw@localhost:5432/jlcpcb"
    )
    counters = {
        "relations": {"created": 0},
        "parts": {"found": 0, "updated": 0, "created": 0},
    }
    try:
        async with engine.begin() as conn:
            await conn.run_sync(SQLModel.metadata.create_all)

        # expire_on_commit=False keeps attributes (e.g. primary keys) readable
        # after commit() without implicit lazy loads, which are not allowed
        # under asyncio.
        async with AsyncSession(engine, expire_on_commit=False) as session:
            with open("parts.csv", newline="", encoding="iso-8859-1") as csvfile:
                reader = csv.DictReader(
                    csvfile,
                    fieldnames=[
                        "lcsc_part",
                        "first_cat",
                        "second_cat",
                        "mfr_part",
                        "package",
                        "solder_joints",
                        "manufacturer",
                        "library_type",
                        "description",
                        "datasheet",
                        "price",
                        "stock",
                    ],
                )
                # Skip the header row; the default avoids StopIteration
                # (a RuntimeError inside a coroutine) on an empty file.
                next(reader, None)
                start = time.time()
                for n, row in enumerate(reader):
                    if n % 1000 == 0:
                        elapsed = int(time.time() - start)
                        print(f"Processed {n} rows so far, run for {elapsed} seconds now")
                        print(f"Parts: {counters['parts']['found']} found, {counters['parts']['updated']} updated, {counters['parts']['created']} created")
                        print(f"Relations: {counters['relations']['created']} created")
                    await create_or_update(session, row, counters)
                end = time.time()
                print(f"Total runtime was {int(end-start)} seconds")
    finally:
        # Close pooled connections while the event loop is still running.
        await engine.dispose()
# Script entry point: run main() to completion on a new asyncio event loop.
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment