Last active
September 9, 2022 11:42
-
-
Save selimslab/24bb989f97b61d77ff58e71672d530ce to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from flask.json import JSONEncoder | |
import datetime | |
class CustomJSONEncoder(JSONEncoder):
    """JSON encoder that adds support for serializing datetimes."""

    def default(self, o):
        """Return a JSON-serializable stand-in for ``o``.

        ``datetime.datetime`` instances (subclasses included) are formatted
        as ``"%Y-%m-%d %H:%M:%S.%f"`` strings; every other object is handed
        to the parent encoder's ``default``.
        """
        # isinstance (rather than an exact type comparison) so that
        # datetime subclasses are serialized as well.
        if isinstance(o, datetime.datetime):
            return o.strftime("%Y-%m-%d %H:%M:%S.%f")
        return super().default(o)
# Directory containing this config module; the paths below are anchored to it.
basedir = os.path.abspath(os.path.dirname(__file__))

# SQLite database file and the SQLAlchemy URI that points at it.
DB_PATH = os.path.join(basedir, "tasks.sqlite")
SQLALCHEMY_DATABASE_URI = "sqlite:///" + DB_PATH

# Path of the "db_repository" directory next to this module.
SQLALCHEMY_MIGRATE_REPO = os.path.join(basedir, "db_repository")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import os | |
from dataclasses import dataclass | |
from flask import Flask, request, jsonify, render_template, make_response | |
from flask_sqlalchemy import SQLAlchemy | |
from sqlalchemy import Column, Integer, Float, DateTime, String | |
from flask_restful import Api, Resource | |
from config import DB_PATH, CustomJSONEncoder | |
# Application wiring: Flask app, configuration, JSON encoder, ORM and REST API.
app = Flask(__name__)
# Load settings from config.py.
app.config.from_pyfile("config.py")
# Serialize datetimes through the custom encoder.
# NOTE(review): the ``json_encoder`` attribute is deprecated in newer Flask
# releases -- confirm the Flask version this project is pinned to.
app.json_encoder = CustomJSONEncoder
db = SQLAlchemy(app)
api = Api(app)
# Dataclasses used for simpler serialization
@dataclass
class Log(db.Model):
    """Log entry with a timestamp, a type and a free-text description."""

    # Dataclass field declarations.
    id: int
    timestamp: datetime.datetime
    type: str
    description: str

    # Column definitions backing the fields above.
    id = Column(Integer, primary_key=True, autoincrement=True)
    timestamp = Column(DateTime)
    type = Column(String)
    description = Column(String)
@dataclass
class Task(db.Model):
    """Task record with a timestamp, a temperature and a duration."""

    # Dataclass field declarations.
    id: int
    timestamp: datetime.datetime
    temperature: float
    duration: str

    # Column definitions backing the fields above.
    id = Column(Integer, primary_key=True)
    timestamp = Column(DateTime)
    temperature = Column(Float)
    duration = Column(String)
class LogListView(Resource):
    """Resource that renders every stored ``Log`` row as an HTML page."""

    def get(self):
        """Render ``logs.html`` with all log entries and return it with status 200."""
        headers = {"Content-Type": "text/html"}
        logs = Log.query.all()
        return make_response(render_template("logs.html", logs=logs), 200, headers)
# Shows a list of all tasks, and lets you POST to add new tasks.
class TaskListView(Resource):
    """Collection resource: HTML listing of tasks and creation of new ones."""

    def get(self):
        """Render ``index.html`` with all tasks and record the access as a ``Log`` row."""
        headers = {"Content-Type": "text/html"}
        tasks = Task.query.all()
        log = Log(timestamp=datetime.datetime.now(), type="get", description="all")
        db.session.add(log)
        db.session.commit()
        return make_response(render_template("index.html", tasks=tasks), 200, headers)

    def post(self):
        """Create a task from the JSON request body.

        Reads ``id``, ``timestamp`` (format ``"%Y-%m-%d %H:%M:%S.%f"``),
        ``temperature`` and ``duration`` from the body and stores a ``Task``.

        Returns a JSON body ``{"id": ..., "status": "201 created"}`` on
        success. Responds with HTTP 400 when the body is not a JSON object
        or the timestamp is missing or malformed.
        """
        # silent=True yields None instead of raising on a non-JSON body.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            return make_response(
                jsonify({"status": "400 bad request", "error": "expected a JSON object"}),
                400,
            )

        task_id = payload.get("id")
        try:
            # TypeError: timestamp missing / not a string; ValueError: wrong format.
            timestamp = datetime.datetime.strptime(
                payload.get("timestamp"), "%Y-%m-%d %H:%M:%S.%f"
            )
        except (TypeError, ValueError):
            return make_response(
                jsonify(
                    {
                        "id": task_id,
                        "status": "400 bad request",
                        "error": "timestamp must match %Y-%m-%d %H:%M:%S.%f",
                    }
                ),
                400,
            )

        task = Task(
            id=task_id,
            timestamp=timestamp,
            temperature=payload.get("temperature"),
            duration=payload.get("duration"),
        )
        db.session.add(task)
        db.session.commit()
        # The HTTP status stays 200 (only the body says "201 created") for
        # backward compatibility with existing clients.
        return jsonify({"id": task_id, "status": "201 created"})
class TaskDetailView(Resource):
    """Resource for a single task addressed by its id."""

    def get(self, task_id):
        """Return the task with ``task_id`` as JSON and record the access as a ``Log`` row.

        Responds with HTTP 404 when no such task exists.
        """
        log = Log(
            timestamp=datetime.datetime.now(), type="get", description=str(task_id)
        )
        db.session.add(log)
        db.session.commit()

        task = Task.query.get(task_id)
        if task is None:
            return make_response(
                jsonify({"id": task_id, "status": "404 not found"}), 404
            )
        return jsonify(task)

    def put(self, task_id=None):
        """Placeholder: updating a task is not implemented yet (HTTP 501).

        ``task_id`` is accepted because the URL rule passes it as a keyword
        argument; without it the request fails with a ``TypeError``.
        """
        return make_response(jsonify({"status": "501 not implemented"}), 501)

    def delete(self, task_id=None):
        """Placeholder: deleting a task is not implemented yet (HTTP 501).

        ``task_id`` is accepted because the URL rule passes it as a keyword
        argument; without it the request fails with a ``TypeError``.
        """
        return make_response(jsonify({"status": "501 not implemented"}), 501)
# URL routing: single task, task collection, and the log listing.
api.add_resource(TaskDetailView, "/tasks/<task_id>")
api.add_resource(TaskListView, "/tasks/")
api.add_resource(LogListView, "/logs/")
if __name__ == "__main__":
    # create_all() only creates tables that are missing, so it is safe to run
    # on every start. Guarding it on the database file's existence would skip
    # table creation whenever the file exists but holds no tables.
    # The explicit application context is needed by newer Flask-SQLAlchemy
    # releases and is harmless on older ones.
    with app.app_context():
        db.create_all()
    # NOTE(review): debug=True enables the interactive debugger -- development only.
    app.run(debug=True)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv | |
from typing import Iterator | |
import requests | |
import itertools | |
from tqdm import tqdm | |
def row_generator(csv_path: str) -> Iterator[dict]:
    """
    Read a csv file row by row

    If the file cannot be opened or read, the error is printed and the
    generator simply ends without yielding further rows.

    :param csv_path: path of the csv file; its first line supplies the keys
    :return: dicts of csv rows
    """
    try:
        # newline="" lets the csv module handle line endings itself, so
        # newlines embedded in quoted fields are preserved unchanged.
        with open(csv_path, "r", newline="") as csv_file:
            yield from csv.DictReader(csv_file)
    except OSError as e:
        # OSError covers both FileNotFoundError and IOError.
        print(e)
def post_csv_rows_the_api(
    api_endpoint: str, data_generator: Iterator[dict], timeout: float = 10.0
) -> None:
    """
    POST every row to the API as a JSON body

    Each failed response is printed with its status code and reason. The
    final message reports success only when no row failed.

    :param api_endpoint: URL that receives one POST request per row
    :param data_generator: iterable of row dicts
    :param timeout: seconds to wait for each request before giving up
    """
    print("importing..")
    failed = 0
    for row in tqdm(data_generator):
        # Without a timeout an unresponsive server would block the import forever.
        r = requests.post(api_endpoint, json=row, timeout=timeout)
        # r.ok accepts any non-error status, so both 200 and 201 count as success.
        if not r.ok:
            failed += 1
            print(r.status_code, r.reason)
    if failed:
        print(f"IMPORT FINISHED WITH {failed} FAILED ROW(S)")
    else:
        print("IMPORT SUCCESSFUL!")
def test_post_csv_rows_the_api():
    """ Create 3 example rows on DB """
    # Materialize the slice first: printing straight from the islice iterator
    # would exhaust it and leave nothing to post.
    sample_rows = list(itertools.islice(ROW_GENERATOR, 3))
    for row in sample_rows:
        print(row)
    post_csv_rows_the_api(API_ENDPOINT, iter(sample_rows))
if __name__ == "__main__":
    # Target endpoint and input file for the import.
    API_ENDPOINT = "http://127.0.0.1:5000/tasks/"
    DATA_PATH = "task_data.csv"
    ROW_GENERATOR = row_generator(DATA_PATH)
    # Uncomment to post only the first three rows as a smoke test:
    # test_post_csv_rows_the_api()
    post_csv_rows_the_api(API_ENDPOINT, ROW_GENERATOR)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment