Created
June 7, 2020 06:37
-
-
Save rednafi/ee2731cefcedfa0366f88d65aabd8006 to your computer and use it in GitHub Desktop.
It seems aioredis is no longer maintained, so this version uses the synchronous redis-py client, wrapped with asgiref's sync_to_async so that it can be awaited.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import os | |
import sys | |
from datetime import timedelta | |
import httpx | |
import redis | |
from dotenv import load_dotenv | |
from fastapi import FastAPI | |
from asgiref.sync import sync_to_async | |
import asyncio | |
load_dotenv() | |
@sync_to_async
def redis_connect() -> redis.Redis:
    """Build a Redis client from environment settings and verify it with a ping.

    The connection parameters are read from the ``HOST``, ``REDIS_PORT`` and
    ``REDIS_PASSWORD`` environment variables. ``REDIS_PORT`` falls back to
    6379 when it is unset or empty, instead of failing inside ``int()``.

    Returns:
        A ``redis.Redis`` client (database 0, 5 second socket timeout) that
        has answered a ping.

    Raises:
        SystemExit: exit status 1, after printing ``AuthenticationError``,
            when the ping raises ``redis.AuthenticationError``.
        redis.ConnectionError: when the ping does not return ``True``;
            previously this case silently returned ``None``.
    """
    client = redis.Redis(
        host=os.environ.get("HOST"),
        port=int(os.environ.get("REDIS_PORT") or 6379),
        password=os.environ.get("REDIS_PASSWORD"),
        db=0,
        socket_timeout=5,
    )

    # Only the ping is guarded: it is the call expected to surface bad credentials.
    try:
        ping = client.ping()
    except redis.AuthenticationError:
        print("AuthenticationError")
        sys.exit(1)

    if ping is not True:
        # Fail loudly here rather than handing ``None`` to callers that
        # expect a usable client.
        raise redis.ConnectionError("Redis did not acknowledge the ping")
    return client
async def get_routes_from_api(coordinates: str) -> dict:
    """Request an optimized trip for *coordinates* from the Mapbox API.

    Args:
        coordinates: Coordinate list placed in the URL path, for example
            ``"90.3866,23.7182;90.3742,23.7461"``.

    Returns:
        The decoded JSON body of the response.
    """
    base_url = "https://api.mapbox.com/optimized-trips/v1/mapbox/driving"

    # Hand the query parameters to httpx instead of interpolating them into
    # the URL, so that the values are percent-encoded.
    query = {
        "geometries": "geojson",
        "access_token": os.environ.get("MAPBOX_ACCESS_TOKEN"),
    }

    async with httpx.AsyncClient() as client:
        response = await client.get(f"{base_url}/{coordinates}", params=query)
        return response.json()
@sync_to_async
def get_routes_from_cache(client: redis.Redis, key: str) -> str:
    """Look up *key* in Redis and return whatever ``client.get`` yields.

    The caller treats a ``None`` result as a cache miss.
    """
    return client.get(key)
@sync_to_async
def set_routes_to_cache(client: redis.Redis, key: str, value: str) -> bool:
    """Store *value* under *key* in Redis with a 3600 second expiry.

    Returns:
        The result of ``client.setex``.
    """
    time_to_live = timedelta(seconds=3600)
    return client.setex(key, time_to_live, value=value)
async def route_optima(coordinates: str) -> dict:
    """Return the route data for *coordinates*, preferring the Redis cache.

    The cache is consulted first, keyed by *coordinates*. On a miss the data
    is fetched from the Mapbox API and, when the response reports
    ``"code": "Ok"``, written back to the cache.

    Args:
        coordinates: Coordinate string used both as the cache key and as the
            API request argument.

    Returns:
        The route payload as a dict. Successful payloads carry a ``"cache"``
        flag: ``True`` when served from Redis, ``False`` when freshly
        fetched. Payloads whose code is not ``"Ok"`` are returned unchanged
        and are not cached.
    """
    # NOTE(review): a new client is created on every call; consider sharing one.
    redis_client = await redis_connect()

    # Cache hit: decode the stored JSON and flag it as cached.
    cached = await get_routes_from_cache(redis_client, key=coordinates)
    if cached is not None:
        data = json.loads(cached)
        data["cache"] = True
        return data

    # Cache miss: ask the Mapbox API.
    data = await get_routes_from_api(coordinates)

    # ``.get`` avoids a KeyError if the response carries no "code" key.
    if data.get("code") == "Ok":
        data["cache"] = False
        # Serialize into a separate value so the dict itself is always what
        # gets returned, whatever the cache write reports.
        await set_routes_to_cache(
            redis_client, key=coordinates, value=json.dumps(data)
        )
    return data
# ASGI application object; the route below is registered on it.
app = FastAPI()


@app.get("/route-optima/{coordinates}")
async def view(coordinates: str) -> dict:
    """This will wrap our original route optimization API and
    incorporate Redis Caching. You'll only expose this API to
    the end user. """
    # Example path value for ``coordinates``: "90.3866,23.7182;90.3742,23.7461"
    routes = await route_optima(coordinates)
    return routes
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment