Skip to content

Instantly share code, notes, and snippets.

"""
Start process with different options
- detached
- with new console
- with custom envs
Example:
# python start_process.py --new-console --env PYTHONPATH=/my/path python myscript.py
"""
import subprocess
import os
@paulwinex
paulwinex / docker-compose.yml
Created December 26, 2024 16:40
Kafka priority queues
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
from contextlib import asynccontextmanager
from typing import Any, Optional
from fastapi import FastAPI, HTTPException, Depends
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker, selectinload, Mapped, mapped_column, relationship, declarative_base
from sqlalchemy import ForeignKey, JSON, select
from sqlalchemy.types import Enum
from pydantic import BaseModel
import enum
import json
import os
import time
import orjson
import ujson
file = 'data.json'
with open(file, 'r') as f:
file_data = f.read()
data = json.loads(file_data)
@paulwinex
paulwinex / main.py
Created December 11, 2024 14:51
fastapi app structure example
from fastapi import FastAPI, APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy import String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.future import select
from sqlalchemy.orm import mapped_column, Mapped
from typing import TypeVar, Generic, Annotated, List
from contextlib import asynccontextmanager
from pydantic import BaseModel, ConfigDict
"""
Requirements
python = "^3.11"
uvicorn = "^0.32.0"
SQLAlchemy = "^2.0.36"
asyncpg = "^0.30.0"
fastapi = "^0.115.4"
"""
import hou
class Connector:
container_node_type = ""
input_node_type = ""
input_connector_name = ""
def __init__(self, out_node):
self.out_node = out_node
from typing import List, Optional
import aiosqlite
from fastapi import FastAPI, HTTPException, Depends
from sqlalchemy import Column, Integer, String, Table, ForeignKey, select, UniqueConstraint
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_scoped_session, async_sessionmaker
from sqlalchemy.orm import relationship, mapped_column, Mapped, backref, DeclarativeBase, selectinload
from pydantic import BaseModel as BaseSchema, ConfigDict
from contextlib import asynccontextmanager
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, Table
from sqlalchemy.orm import sessionmaker, relationship, backref, declared_attr, Mapped
from sqlalchemy.orm import declarative_base, Mapped, mapped_column
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.sql.expression import and_
from sqlalchemy import select, func, union_all, text
import asyncio
from sqlalchemy.ext.asyncio import (
AsyncSession,
create_async_engine,
import asyncio
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.orm import declarative_base, relationship, selectin_polymorphic
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import Mapped
from sqlalchemy.future import select
from sqlalchemy.ext.asyncio import (
AsyncSession,
create_async_engine,
async_sessionmaker,