Created
April 25, 2023 08:10
-
-
Save MaxClerkwell/2176ef050d26ae6276ed4965e0bd4857 to your computer and use it in GitHub Desktop.
FastAPI endpoint to upload a .csv file and write its contents into a database
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv
import io

from fastapi import FastAPI, File, UploadFile, HTTPException
from sqlalchemy import create_engine, Column, Integer, String, Float, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
# Database configuration
# SQLite database file "sensordaten.db", addressed relative to the working directory.
DATABASE_URL = "sqlite:///./sensordaten.db"

# Engine used for table creation and by every session.
engine = create_engine(DATABASE_URL)

# Declarative base class that the table models below inherit from.
Base = declarative_base()
# Tabellen | |
class Sensor(Base):
    """A sensor, stored once per distinct ``sensor_id`` string."""

    __tablename__ = "sensors"

    # Surrogate integer key; Measurement.sensor_id points at this column.
    id = Column(Integer, primary_key=True)
    # Sensor identifier string; the UNIQUE constraint prevents duplicate rows.
    sensor_id = Column(String, unique=True)

    # Measurements belonging to this sensor (other side: Measurement.sensor).
    measurements = relationship("Measurement", back_populates="sensor")
class Measurement(Base):
    """A single humidity reading that belongs to one :class:`Sensor`."""

    __tablename__ = "measurements"

    # Surrogate integer key.
    id = Column(Integer, primary_key=True)
    # Foreign key to sensors.id (the integer key, not the sensor's string ID).
    sensor_id = Column(Integer, ForeignKey("sensors.id"))
    # Timestamp kept as a plain string; no parsing or validation is applied here.
    timestamp = Column(String)
    # Humidity reading as a float.
    humidity = Column(Float)

    # Owning sensor (other side: Sensor.measurements).
    sensor = relationship("Sensor", back_populates="measurements")
# Create the tables
# Runs at import time; create_all only issues CREATE TABLE for tables that do
# not exist yet.
Base.metadata.create_all(bind=engine)

# Session factory bound to the engine. Sessions neither auto-commit nor
# auto-flush, so callers flush and commit explicitly.
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)

# The FastAPI application that the route below is registered on.
app = FastAPI()
@app.post("/uploadcsv/")
async def upload_csv(file: UploadFile = File(...)):
    """Import an uploaded CSV file of humidity readings into the database.

    Every non-blank row must contain exactly three comma-separated fields:
    ``sensor_id, timestamp, humidity``. A ``Sensor`` row is created the first
    time a ``sensor_id`` is seen; each row then becomes one ``Measurement``.
    The import is all-or-nothing: any failure rolls back the whole upload.

    Args:
        file: The uploaded file; its media type must be ``text/csv`` and its
            content must be UTF-8 (an optional BOM is tolerated).

    Returns:
        ``{"status": "success"}`` once all rows have been committed.

    Raises:
        HTTPException: Status 400 if the file is not CSV, is not valid UTF-8,
            contains a malformed row, or the database rejects the data.
    """
    # Compare only the media type so that a header carrying parameters, such
    # as "text/csv; charset=utf-8", is not rejected.
    media_type = (file.content_type or "").split(";", 1)[0].strip().lower()
    if media_type != "text/csv":
        raise HTTPException(status_code=400, detail="File must be in CSV format.")

    try:
        # "utf-8-sig" decodes plain UTF-8 as well, but drops a leading BOM that
        # would otherwise become part of the first sensor ID.
        content = (await file.read()).decode("utf-8-sig")
    except UnicodeDecodeError as e:
        raise HTTPException(
            status_code=400, detail="File must be UTF-8 encoded."
        ) from e

    # newline="" leaves line endings untouched so the csv module can handle
    # them itself, as its documentation recommends.
    csvfile = csv.reader(io.StringIO(content, newline=""), delimiter=",")
    session = SessionLocal()
    try:
        for row in csvfile:
            if not row:
                # Blank lines are parsed as empty rows; they carry no data.
                continue
            if len(row) != 3:
                raise ValueError(
                    f"Line {csvfile.line_num}: expected 3 columns "
                    f"(sensor_id, timestamp, humidity), got {len(row)}"
                )
            sensor_id, timestamp, humidity = row
            try:
                humidity_value = float(humidity)
            except ValueError as e:
                raise ValueError(
                    f"Line {csvfile.line_num}: humidity {humidity!r} is not a number"
                ) from e

            sensor = (
                session.query(Sensor).filter(Sensor.sensor_id == sensor_id).first()
            )
            if not sensor:
                sensor = Sensor(sensor_id=sensor_id)
                session.add(sensor)
                # Autoflush is disabled on this session, so flush explicitly:
                # this assigns sensor.id and makes the new sensor visible to
                # the lookup query on later rows.
                session.flush()

            # No per-row flush is needed here; commit() flushes pending rows.
            session.add(
                Measurement(
                    sensor_id=sensor.id,
                    timestamp=timestamp,
                    humidity=humidity_value,
                )
            )
        session.commit()
    except Exception as e:
        # Request boundary: undo the partial import and report the cause.
        session.rollback()
        raise HTTPException(status_code=400, detail=str(e)) from e
    finally:
        session.close()
    return {"status": "success"}
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when this file is run as a script.
    import uvicorn

    # Serve the app on the loopback interface, port 8000.
    uvicorn.run(app, port=8000, host="127.0.0.1")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment