Skip to content

Instantly share code, notes, and snippets.

@SF-300
SF-300 / sdi.py
Last active February 10, 2023 10:25
Standalone FastAPI-like dependency injector POC
import asyncio
import contextlib
import inspect
from inspect import Parameter, Signature
from dataclasses import dataclass
from contextlib import AsyncExitStack, AbstractContextManager, AbstractAsyncContextManager
from typing import Callable, ParamSpec, TypeVar
__all__ = "Depends", "inject_and_run"
@SF-300
SF-300 / fastapi_utils.py
Last active July 25, 2024 14:13
Mounting ASGI subapps through FastAPI's router
import unittest.mock
from fastapi import FastAPI as _FastAPI, APIRouter as _APIRouter, routing
__all__ = "FastAPI", "APIRouter"
class APIRouter(_APIRouter):
def include_router(self, router, **kwargs) -> None:
prefix = kwargs.get("prefix", "")
@SF-300
SF-300 / TestBinaryComparator.scala
Last active August 29, 2015 14:19
Akka reactive-streams ByteString comparator junction for an arbitrary number of inputs. Emits a stream of boolean true values until the first mismatch occurs or the main input terminates, then emits a single false value and terminates.
import akka.util.ByteString
import akka.stream._
import FanInShape.Name
import FanInShape.Init
class BinaryComparatorPorts(n: Int, _init: Init[Boolean] = Name("BinaryComparator")) extends UniformFanInShape[ByteString, Boolean](n, _init) {
// comparison target; other ports are used for possible duplicates
val src = newInlet[ByteString]("src")
protected override def construct(init: Init[Boolean]) = new BinaryComparatorPorts(n, init)