This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
async def main() -> None:
    """Print the value stored under every ``votos_*`` key except ``votos_bd``.

    One line is written to stdout per key, in the form
    ``Votos de <name> - <value>``, where ``<name>`` is the part of the key
    after the ``votos_`` prefix. Nothing is returned.
    """
    async with redis.client() as conn:
        for key in await conn.keys("votos_*"):
            if key == 'votos_bd':
                # Matches the pattern but is deliberately excluded.
                # NOTE(review): presumably a queue rather than a counter -- confirm.
                continue
            # Split on the first underscore only, so a name that itself
            # contains underscores is printed in full instead of truncated.
            option = key.split("_", 1)[1]
            count = await conn.get(key)
            # f-string formatting tolerates a non-str reply (e.g. None if the
            # key disappeared between KEYS and GET) instead of raising
            # TypeError as string concatenation would.
            print(f"Votos de {option} - {count}")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
async def main() -> dict: | |
async with redis.client() as conn: | |
while True: | |
chunk = [] | |
for _ in range(0, 100): | |
item = await conn.lpop("votos_bd") | |
if item is not None: | |
chunk = chunk + json.loads(item) | |
else: | |
break |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
async def main(): | |
global total_votes | |
async with redis.client() as conn: | |
while True: | |
chunk = [] | |
votes = {} | |
for _ in range(0, 1000): | |
item = await conn.lpop("votos") | |
if item is not None: | |
obj = json.loads(item) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@app.route("/vote", methods=['POST']) | |
async def vote(request): | |
choice = randrange(3) | |
async with redis.conn as conn: | |
await conn.rpush("votos", json.dumps( | |
{"ip": request.ip, "user-agent": request.headers['user-agent'], "body": f"choice={choice}"})) | |
return response.json( | |
{'message': 'Voto Recebido'}, | |
status=202 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import os | |
log_level = logging.DEBUG | |
if os.getenv('LOG_LEVEL', "DEBUG") == "DEBUG": | |
log_level = logging.DEBUG | |
elif os.getenv('LOG_LEVEL') == "INFO": | |
log_level = logging.INFO | |
elif os.getenv('LOG_LEVEL') == "ERROR": |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!make | |
export PYTHON=python | |
default: run_api | |
run_api: | |
$(PYTHON) -m src.xpto.api.main | |
build_api_c: |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
FROM python:3.10-slim | |
ARG SOURCE_FOLDER | |
WORKDIR /src | |
RUN apt-get update && apt-get install -y git tzdata gcc | |
ENV TZ America/Sao_Paulo | |
COPY requirements.txt requirements.txt | |
RUN pip install -r requirements.txt |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import httpx as httpx | |
import uvicorn | |
from fastapi import FastAPI | |
from fastapi.encoders import jsonable_encoder | |
from src.xpto.api.config.api_settings import ApiSettings | |
from src.xpto.common.models.order import Order | |
app = FastAPI() | |
api_settings = ApiSettings() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from src.xpto.common.config.common_settings import CommonSettings | |
class InvoicesSettings(CommonSettings): | |
mongodb_conn_string: str | |
class Config: | |
env_file = 'settings/invoices-dev.env' | |
fields = { | |
'mongodb_conn_string': { |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pydantic import BaseSettings, Field | |
class CommonSettings(BaseSettings): | |
environment: str = Field(default='homolog') | |
http_port: int | |
class Config: | |
env_file_encoding = 'utf-8' | |
fields = { |