Skip to content

Instantly share code, notes, and snippets.

View cassioeskelsen's full-sized avatar
🏠
Working from home

Cassio Rogerio Eskelsen cassioeskelsen

🏠
Working from home
  • Unico IDTech
  • Blumenau, SC, Brazil
  • 15:31 (UTC -03:00)
  • X @sricanesh
View GitHub Profile
apiVersion: apps/v1
kind: Deployment
metadata:
name: keda-python-rabbit
namespace: default
labels:
app: keda-python-rabbit
spec:
selector:
matchLabels:
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "-m", "app.rabbit_consumer"]
@cassioeskelsen
cassioeskelsen / rabbit_consumer.py
Last active May 23, 2021 17:15
exemplo de consumidor de mensagens do rabbit
from time import sleep
from kombu import Connection, Exchange, Queue, Consumer, eventloop
from app.utils.logger import logger
exchange = Exchange('keda_demo', type='direct')
queue = Queue('keda_demo', exchange, routing_key='keda_demo')
def handle_message(body, message):
logger.debug(f'Received message: {body!r}')
@cassioeskelsen
cassioeskelsen / rabbit_publisher.py
Last active May 23, 2021 17:15
Publicação de mensagens de exemplo no Rabbit
import sys
from kombu import Connection, Producer, Exchange, Queue
exchange = Exchange('kombu_demo', type='direct')
messages = 1
if len(sys.argv) > 1:
try:
messages = int(sys.argv[1])
except:
class RavenDBDocumentStoreConnection(metaclass=Singleton):
def __init__(self, connection_string, database_name):
self.store = DocumentStore(urls=[connection_string], database=database_name)
self.store.initialize()
import threading
lock = threading.Lock()
class Singleton(type):
_instances = {}
def __call__(cls, *args, **kwargs):
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class Transaction:
id: str
account_id: str
date: datetime
operation_type: str # credit/debit
operation_description: str
import datetime
from typing import Optional
from fastapi import FastAPI
from pyravendb.store.document_store import DocumentStore
from src.models import Romaneio, Destinatario, Transportador, Item
app = FastAPI(title="Romaneios")
server = "http://192.168.0.156:8080" # altere para o seu servidor
store = None
from typing import List
class Destinatario:
def __init__(self, nome, endereco, cidade, uf):
self.nome = nome
self.endereco = endereco
self.cidade = cidade
self.uf = uf
{"numero":3,"data_hora_carga":"2021-04-07T08:25:00.000Z","destinatario":{"nome":"xxxx","endereco":"xxxx","cidade":"xxxx","uf":"xx"},"transportador":{"empresa":"Transportadora YY","nome_motorista":"Pedro","placa":"FPL-3433"},"itens":[{"qt_volume":12,"descricao":"Celular XPTO"}],"updates":[],"@metadata":{"@collection":"romaneios"}}