Skip to content

Instantly share code, notes, and snippets.

@dmateos
Created August 5, 2020 10:39
Show Gist options
  • Save dmateos/bc65215b7639d49f26d1beb24aa9f51b to your computer and use it in GitHub Desktop.
Save dmateos/bc65215b7639d49f26d1beb24aa9f51b to your computer and use it in GitHub Desktop.
1 import fastapi
2 import pydantic
3 import typing
4 import redis
5
6
class Blob(pydantic.BaseModel):
    """Request body accepted by the blob upload endpoint."""

    # Key the blob is stored under; the upload handler echoes it back as "id".
    handle: str
    # Blob contents, stored as the value for `handle`.
    data: str
    # NOTE(review): required in the payload but not read anywhere in this
    # file; presumably an expiry for the stored blob -- confirm units/intent.
    lifetime: int
11
12
class BlobException(Exception):
    """Raised by BlobStorage when the underlying client call fails."""


class BlobStorage:
    """Small key/value facade over a Redis client.

    Every exception raised by the client is re-raised as BlobException
    (with the original exception attached as ``__cause__``) so callers only
    have to handle a single exception type.
    """

    def __init__(self, host="redis0.mateos.lan", port=6379):
        """Create the Redis client.

        Args:
            host: Redis server hostname; defaults to the original
                hard-coded host.
            port: Redis server port.
        """
        self.engine = redis.Redis(host=host, port=port)

    def save(self, key, value):
        """Store ``value`` under ``key``.

        Raises:
            BlobException: if the client's ``set`` call raises.
        """
        try:
            self.engine.set(key, value)
        except Exception as err:
            # The message used to say "get" here; it now names the
            # operation that actually failed.
            raise BlobException("could not set value") from err

    def get(self, key):
        """Return whatever the client's ``get`` returns for ``key``.

        Raises:
            BlobException: if the client's ``get`` call raises.
        """
        try:
            return self.engine.get(key)
        except Exception as err:
            # The message used to say "set" here; it now names the
            # operation that actually failed.
            raise BlobException("could not get value") from err
32
33
# FastAPI application that the route decorators below register handlers on.
app = fastapi.FastAPI()
# One storage instance, created at import time and shared by all handlers.
storage = BlobStorage()
36
37
@app.get("/{blob_handle}")
def get_blob(blob_handle: str, q: typing.Optional[str] = None):
    """Return the stored blob for ``blob_handle`` as ``{"data": ...}``.

    Args:
        blob_handle: Key to look up, taken from the URL path.
        q: Optional query parameter; accepted but not used.

    Raises:
        fastapi.HTTPException: 404 when the storage lookup fails or when
            nothing is stored under ``blob_handle``.
    """
    print(blob_handle)
    try:
        data = storage.get(blob_handle)
    except BlobException as err:
        raise fastapi.HTTPException(
            status_code=404, detail="blob not found"
        ) from err
    # The Redis client reports a missing key as None rather than raising, so
    # without this check an unknown handle would answer 200 {"data": null}.
    if data is None:
        raise fastapi.HTTPException(status_code=404, detail="blob not found")
    return {"data": data}
46
47
@app.post("/")
def push_blob(blob: Blob):
    """Store ``blob.data`` under ``blob.handle`` and return ``{"id": handle}``.

    Raises:
        fastapi.HTTPException: 403 with detail "blob exists" whenever the
            storage layer raises BlobException.
    """
    # NOTE(review): nothing here checks whether the handle is already taken,
    # so the 403 "blob exists" response is produced for any storage failure;
    # blob.lifetime is also not passed on -- confirm the intended behaviour.
    try:
        storage.save(blob.handle, blob.data)
    except BlobException:
        raise fastapi.HTTPException(status_code=403, detail="blob exists")
    return {"id": blob.handle}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment