Skip to content

Instantly share code, notes, and snippets.

View cassioeskelsen's full-sized avatar
🏠
Working from home

Cassio Rogerio Eskelsen cassioeskelsen

🏠
Working from home
  • Unico IDTech
  • Blumenau, SC, Brazil
  • 04:54 (UTC -03:00)
  • X @sricanesh
View GitHub Profile
async def main() -> dict:
async with redis.client() as conn:
keys = await conn.keys("votos_*")
for key in keys:
if key != 'votos_bd':
print("Votos de " + key.split("_")[1] + " - " + await conn.get(key))
async def main() -> dict:
async with redis.client() as conn:
while True:
chunk = []
for _ in range(0, 100):
item = await conn.lpop("votos_bd")
if item is not None:
chunk = chunk + json.loads(item)
else:
break
async def main():
global total_votes
async with redis.client() as conn:
while True:
chunk = []
votes = {}
for _ in range(0, 1000):
item = await conn.lpop("votos")
if item is not None:
obj = json.loads(item)
@app.route("/vote", methods=['POST'])
async def vote(request):
choice = randrange(3)
async with redis.conn as conn:
await conn.rpush("votos", json.dumps(
{"ip": request.ip, "user-agent": request.headers['user-agent'], "body": f"choice={choice}"}))
return response.json(
{'message': 'Voto Recebido'},
status=202
import logging
import os
log_level = logging.DEBUG
if os.getenv('LOG_LEVEL', "DEBUG") == "DEBUG":
log_level = logging.DEBUG
elif os.getenv('LOG_LEVEL') == "INFO":
log_level = logging.INFO
elif os.getenv('LOG_LEVEL') == "ERROR":
#!make
export PYTHON=python
default: run_api
run_api:
$(PYTHON) -m src.xpto.api.main
build_api_c:
FROM python:3.10-slim
ARG SOURCE_FOLDER
WORKDIR /src
RUN apt-get update && apt-get install -y git tzdata gcc
ENV TZ America/Sao_Paulo
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt
import httpx as httpx
import uvicorn
from fastapi import FastAPI
from fastapi.encoders import jsonable_encoder
from src.xpto.api.config.api_settings import ApiSettings
from src.xpto.common.models.order import Order
app = FastAPI()
api_settings = ApiSettings()
from src.xpto.common.config.common_settings import CommonSettings
class InvoicesSettings(CommonSettings):
mongodb_conn_string: str
class Config:
env_file = 'settings/invoices-dev.env'
fields = {
'mongodb_conn_string': {
from pydantic import BaseSettings, Field
class CommonSettings(BaseSettings):
environment: str = Field(default='homolog')
http_port: int
class Config:
env_file_encoding = 'utf-8'
fields = {