Created
April 22, 2025 20:20
-
-
Save arthurcolle/6a5d5c4f19e6ff0ef1e55f5c0967715d to your computer and use it in GitHub Desktop.
GIC backend main.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import httpx | |
import os | |
import redis | |
from fastapi import FastAPI, Request | |
from pydantic import BaseModel, BaseConfig | |
import httpx | |
import asyncio | |
# Base URL of the upstream mail API; override via the FAKEMAIL env var.
API = os.getenv("FAKEMAIL", "https://1fef-216-158-152-64.ngrok-free.app")

# Public URL of this service's webhook; override via WEBHOOK_PUBLIC_URL.
# NOTE(review): not referenced by the code visible in this file - confirm usage.
WEBHOOK_URL = os.getenv(
    "WEBHOOK_PUBLIC_URL",
    "https://260d-2600-4040-4930-d500-ad7a-1287-fa48-5d5d.ngrok-free.app",
)

# Module-level Redis client shared by every handler (a `global` statement is
# unnecessary at module scope). Ran locally for speed to delivery.
r = redis.Redis(host="localhost", port=6379, db=0)

# Held by the webhook handler so only one delivery is processed at a time.
lock = asyncio.Lock()
class Hook(BaseModel):
    """Request body accepted by the ``/webhook`` endpoint."""

    # History identifier carried by the notification; the webhook handler
    # stores it in Redis under ``last_history_id`` after processing.
    history_id: int
app = FastAPI()


# last_history_id - stored in redis
@app.on_event("startup")
async def init():
    """Register a watch with the mail API and seed the history cursor.

    POSTs to ``{API}/watch`` at application startup. When the JSON response
    contains ``history_id``, that value is stored in Redis under
    ``last_history_id`` so the webhook handler knows where to start listing
    emails from. The raw response payload is printed in every case.
    """
    # Use the client as a context manager so its connection pool is closed;
    # a bare ``httpx.AsyncClient()`` that is never closed leaks connections.
    async with httpx.AsyncClient() as client:
        payload_json = (await client.post(f"{API}/watch")).json()

    if 'history_id' in payload_json:
        history_id = payload_json['history_id']
        print(f"history id: {history_id}")
        r.set('last_history_id', history_id)
        print("[redis] set hid")
    print(payload_json)
@app.post("/webhook")
async def webhook_process(h: Hook):
    """Classify every email that arrived since the stored history cursor.

    Under the module-level lock, reads ``last_history_id`` from Redis
    (defaulting to 0), lists email IDs from ``{API}/emails`` starting at that
    cursor, fetches each email, sends its subject and body to
    ``{API}/classify`` and stores the resulting label in Redis under
    ``classification:<email id>``. Finally the cursor is advanced to
    ``h.history_id``.

    If the listing response has no ``email_ids`` key the handler logs the
    failure and returns without touching the cursor, so a later delivery
    starts again from the same point.

    Args:
        h: Webhook payload carrying the new ``history_id``.
    """
    async with lock, httpx.AsyncClient() as c:
        last = int(r.get('last_history_id') or 0)
        print('last digit: ', last)

        listing = (await c.get(f"{API}/emails", params={"from_history_id": last})).json()
        if 'email_ids' not in listing:
            # Previously execution fell through and iterated over the keys of
            # the error payload as if they were email IDs, then advanced the
            # cursor. Stop here instead.
            print("critical failure, the end")
            return
        email_ids = listing['email_ids']

        print('num IDs: ', len(email_ids))
        print("ids: ", email_ids)

        for listed_id in email_ids:
            email = (await c.get(f"{API}/email/{listed_id}")).json()
            print(email)

            # The ID reported by the email payload itself is used as the key.
            email_id = email['id']
            email_body = email["body"]
            email_subject = email["subject"]

            cls = (await c.post(f"{API}/classify", json={"subject": email_subject, "body": email_body})).json()
            print(cls)
            print("--------")

            classification_result = cls["classification"]
            r.set(f"classification:{email_id}", classification_result)

        # Advance the cursor only after every listed email has been handled.
        r.set("last_history_id", h.history_id)
@app.get("/results/{eid}")
async def results(eid: str):
    """Return the stored classification for one email.

    The path placeholder is named ``eid`` to match the function parameter;
    with a differently named placeholder FastAPI treats ``eid`` as a required
    query parameter and the path segment is ignored.

    Args:
        eid: Email ID taken from the URL path.

    Returns:
        A dict with ``email_id`` and ``classification`` keys.

    Raises:
        HTTPException: 404 when no classification is stored for ``eid``.
    """
    # Local import keeps this block self-contained; fastapi is already a
    # dependency of this module.
    from fastapi import HTTPException

    classification_result = r.get(f"classification:{eid}")
    if classification_result is None:
        # A plain ``Exception`` surfaces as a 500; HTTPException yields a 404.
        raise HTTPException(status_code=404, detail="email / classification not found")

    # redis-py returns bytes unless the client was built with
    # decode_responses=True; decode so the JSON body holds a plain string.
    if isinstance(classification_result, bytes):
        classification_result = classification_result.decode()

    return {"email_id": eid, "classification": classification_result}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment