Skip to content

Instantly share code, notes, and snippets.

View mrmamongo's full-sized avatar
🗿
To code or not to code?

mrmamongo

🗿
To code or not to code?
View GitHub Profile
import asyncio
import logging
import nats
from pydantic import BaseModel
class User:
id: int = 1
name: str = "John Doe"
import asyncio
import enum
import logging
import nats
import nats.errors
from nats.aio.msg import Msg
from pydantic import BaseModel, ValidationError
import asyncio
import nats.errors
from nats import connect
from nats.aio.msg import Msg
PRIORITY = "high"
async def main():
@mrmamongo
mrmamongo / nats_microframework.py
Created May 22, 2023 12:05
This is a micro nats microframework scratch with handlers
import asyncio
import functools
import inspect
import logging
from typing import Type, TypeVar, Any
import aiofiles
import nats
import nats.js as njs
from nats.aio.msg import Msg
import asyncio
import aiohttp
async def insert_doc(index: str, document: dict, host: str = "localhost", port: int = 9308):
async with aiohttp.ClientSession(headers={"Content-Type": "application/json"}) as session:
async with session.post(url=f"http://{host}:{port}/insert", json={"index": index, "doc": document}) as resp:
if resp.status != 200:
raise RuntimeError("Error while inserting doc: manticore is not available")
import asyncio
import logging
import os
from application.common.exceptions import ApplicationException
from src.app import Application
from config import Config
logger = logging.getLogger(__name__)
package main
import (
"encoding/json"
"fmt"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
"time"
)
@mrmamongo
mrmamongo / thread.py
Created September 20, 2023 06:26
Convenience class for stoppable threads
class BaseThread(threading.Thread):
def __init__(self):
threading.Thread.init(self)
if hasattr(self, "daemon"):
self.daemon = True
else:
self.setDaemon(True)
self._stopped_event = threading.Event()
import queue
import threading
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from threading import Event
from time import sleep
class BaseThread(threading.Thread):
"""Convenience class for creating stoppable threads."""
import logging
from dataclasses import dataclass
from typing import Callable, Any, NamedTuple
from telethon import TelegramClient
from telethon.events import NewMessage
from telethon.events.common import EventBuilder
Handler = Callable[[Any], None]