Skip to content

Instantly share code, notes, and snippets.

View mrmamongo's full-sized avatar
🗿
To code or not to code?

mrmamongo

🗿
To code or not to code?
View GitHub Profile
import queue
import threading
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from threading import Event
from time import sleep
class BaseThread(threading.Thread):
"""Convenience class for creating stoppable threads."""
@mrmamongo
mrmamongo / thread.py
Created September 20, 2023 06:26
Convenience class for stoppable threads
class BaseThread(threading.Thread):
    """Daemon thread base class that carries a stop-signalling event.

    Each instance is created as a daemon thread and owns a private
    ``threading.Event`` stored in ``_stopped_event``; the event starts
    out cleared.
    """

    def __init__(self):
        """Initialise the thread as a daemon and create the stop event."""
        # The original called ``threading.Thread.init(self)``, which does not
        # exist and raised AttributeError; the real initialiser is __init__.
        # ``daemon=True`` replaces the legacy ``hasattr``/``setDaemon`` dance:
        # every Python 3 Thread has the ``daemon`` attribute, and
        # ``setDaemon`` is deprecated.
        super().__init__(daemon=True)
        self._stopped_event = threading.Event()
package main
import (
"encoding/json"
"fmt"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
"time"
)
import asyncio
import logging
import os
from application.common.exceptions import ApplicationException
from src.app import Application
from config import Config
logger = logging.getLogger(__name__)
import asyncio
import aiohttp
async def insert_doc(index: str, document: dict, host: str = "localhost", port: int = 9308) -> None:
    """Insert a single document into an index via an HTTP ``/insert`` call.

    Sends a POST request to ``http://{host}:{port}/insert`` with the JSON
    body ``{"index": index, "doc": document}``.

    Args:
        index: Name of the target index.
        document: Document body to insert; must be JSON-serialisable.
        host: Host name of the search server.
        port: HTTP port of the search server.

    Raises:
        RuntimeError: If the server answers with any status other than 200.
            The message includes the status code and the response body.
    """
    url = f"http://{host}:{port}/insert"
    payload = {"index": index, "doc": document}
    async with aiohttp.ClientSession(headers={"Content-Type": "application/json"}) as session:
        async with session.post(url=url, json=payload) as resp:
            if resp.status != 200:
                # Reaching this branch means the server DID respond, so the
                # old "manticore is not available" text was misleading.
                # Report what the server actually said instead.
                details = await resp.text(errors="replace")
                raise RuntimeError(
                    f"Error while inserting doc: manticore responded with HTTP {resp.status}: {details}"
                )
@mrmamongo
mrmamongo / nats_microframework.py
Created May 22, 2023 12:05
A scratch implementation of a NATS micro-framework with handlers
import asyncio
import functools
import inspect
import logging
from typing import Type, TypeVar, Any
import aiofiles
import nats
import nats.js as njs
from nats.aio.msg import Msg
import asyncio
import nats.errors
from nats import connect
from nats.aio.msg import Msg
PRIORITY = "high"
async def main():
import asyncio
import enum
import logging
import nats
import nats.errors
from nats.aio.msg import Msg
from pydantic import BaseModel, ValidationError
import asyncio
import logging
import nats
from pydantic import BaseModel
class User:
id: int = 1
name: str = "John Doe"