Last active
March 16, 2025 11:25
-
-
Save LeeeeT/ab62db218539048e5a0aa08d38e45be0 to your computer and use it in GitHub Desktop.
top 5 sins
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
import asyncio | |
from collections.abc import Callable | |
from dataclasses import dataclass | |
from typing import Protocol | |
import felis | |
from felis import lazy_coroutine_either_list, lazy_coroutine_either, lazy_coroutine, lazy, either, option | |
def acache[T](function: felis.LazyCoroutine[T]) -> felis.LazyCoroutine[T]: | |
result: felis.Option[T] = None | |
async def wrapper() -> T: | |
nonlocal result | |
match result: | |
case None: | |
result = option.Some(value := await function()) | |
case option.Some(value): | |
pass | |
return value | |
return wrapper | |
class PostponeConstructor[From, To](Protocol):
    """Structural type of the decorator returned by ``postpone``.

    Calling it with a function that returns ``From`` yields a function with
    the same parameters that returns ``To``.
    """

    def __call__[**P](self, function: Callable[P, From]) -> Callable[P, To]: ...
def postpone[From, To](constructor: Callable[[felis.Lazy[From]], To]) -> PostponeConstructor[From, To]: | |
def decorator[**P](function: Callable[P, From]) -> Callable[P, To]: | |
def wrapper(*args: P.args, **kwargs: P.kwargs) -> To: | |
return constructor(lambda: function(*args, **kwargs)) | |
return wrapper | |
return decorator | |
@dataclass(frozen=True) | |
class Lazy[T]: | |
value: felis.Lazy[T] | |
def map[NewT](self, function: Callable[[T], NewT]) -> Lazy[NewT]: | |
return Lazy(lazy.map(function)(self)) | |
def bind[NewT](self, function: Callable[[T], felis.Lazy[NewT]]) -> Lazy[NewT]: | |
return Lazy(lazy.bind(self)(function)) | |
def to_coroutine(self) -> LazyCoroutine[T]: | |
async def wrapper(): | |
return self() | |
return LazyCoroutine(wrapper) | |
def __call__(self) -> T: | |
return self.value() | |
@dataclass(frozen=True) | |
class LazyCoroutine[T]: | |
value: felis.LazyCoroutine[T] | |
def map[NewT](self, function: Callable[[T], NewT]) -> LazyCoroutine[NewT]: | |
return LazyCoroutine(lazy_coroutine.map(function)(self)) | |
def bind[NewT](self, function: Callable[[T], felis.LazyCoroutine[NewT]]) -> LazyCoroutine[NewT]: | |
return LazyCoroutine(lazy_coroutine.bind(self)(function)) | |
def cache(self) -> LazyCoroutine[T]: | |
return LazyCoroutine(acache(self)) | |
def outer(self) -> Lazy[felis.Coroutine[T]]: | |
return Lazy(self) | |
def __call__(self) -> felis.Coroutine[T]: | |
return self.value() | |
@dataclass(frozen=True) | |
class LazyCoroutineEither[L, R]: | |
value: felis.LazyCoroutineEither[L, R] | |
def map[NewR](self, function: Callable[[R], NewR]) -> LazyCoroutineEither[L, NewR]: | |
return LazyCoroutineEither(lazy_coroutine_either.map(function)(self)) | |
def bind[NewR](self, function: Callable[[R], felis.LazyCoroutineEither[L, NewR]]) -> LazyCoroutineEither[L, NewR]: | |
return LazyCoroutineEither(lazy_coroutine_either.bind(self)(function)) | |
def default(self, value: felis.LazyCoroutine[R]) -> LazyCoroutine[R]: | |
return LazyCoroutine(lazy_coroutine_either.default(value)(self)) | |
def add(self, addend: felis.LazyCoroutineEither[L, R]) -> LazyCoroutineEither[L, R]: | |
return LazyCoroutineEither(lazy_coroutine_either.add(addend)(self)) | |
def cache(self) -> LazyCoroutineEither[L, R]: | |
return LazyCoroutineEither(acache(self)) | |
def outer(self) -> LazyCoroutine[felis.Either[L, R]]: | |
return LazyCoroutine(self) | |
def __call__(self) -> felis.Coroutine[felis.Either[L, R]]: | |
return self.value() | |
@dataclass(frozen=True) | |
class LazyCoroutineEitherList[L, R]: | |
value: felis.LazyCoroutineEitherList[L, R] | |
def map[NewR](self, function: Callable[[R], NewR]) -> LazyCoroutineEitherList[L, NewR]: | |
return LazyCoroutineEitherList(lazy_coroutine_either_list.map(function)(self)) | |
def bind[NewR](self, function: Callable[[R], felis.LazyCoroutineEitherList[L, NewR]]) -> LazyCoroutineEitherList[L, NewR]: | |
return LazyCoroutineEitherList(lazy_coroutine_either_list.bind(self)(function)) | |
def cache(self) -> LazyCoroutineEitherList[L, R]: | |
return LazyCoroutineEitherList(acache(self)) | |
def outer(self) -> LazyCoroutineEither[L, felis.List[R]]: | |
return LazyCoroutineEither(self) | |
def __call__(self) -> felis.Coroutine[felis.Either[L, felis.List[R]]]: | |
return self.value() | |
async def main():
    """Demo: build three cached lazy pipelines and print their awaited results."""

    @dataclass(frozen=True)
    class User:
        pass

    @dataclass(frozen=True)
    class Post:
        views: int

    @LazyCoroutineEitherList[str, User]
    async def get_users() -> felis.Either[str, felis.List[User]]:
        print("Fetching users")
        return either.Left("Error fetching users")
        # return either.Right([User(), User()])

    @postpone(LazyCoroutineEitherList[str, Post])
    async def get_posts(limit: int) -> felis.Either[str, felis.List[Post]]:
        print("Fetching posts", f"{limit=}")
        return either.Left("Error fetching posts")
        # return either.Right([Post(1), Post(2)])

    @Lazy
    def answer_of_life() -> int:
        print("Computing for 7.5 million years")
        return 42

    # Map each post to its view count, then sum the list one layer out.
    approximate_members = (
        get_posts(limit=5)
        .map(lambda post: post.views)
        .outer()
        .map(sum)
        .cache()
    )

    # Number of users, combined with the post-based estimate via ``add``.
    members_count = (
        get_users
        .outer()
        .map(len)
        .add(approximate_members)
        .cache()
    )

    count_with_default = (
        members_count
        .default(answer_of_life.to_coroutine())
        .cache()
    )

    # Await in the same left-to-right order as the printed columns.
    with_default = await count_with_default()
    members = await members_count()
    approximate = await approximate_members()
    print(with_default, members, approximate)
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import collections.abc | |
from collections.abc import Callable | |
from dataclasses import dataclass | |
from typing import Any | |
def curry(function):
    """Split off the first positional argument of ``function`` so it is supplied last.

    ``curry(f)(*rest)(first)`` evaluates ``f(first, *rest)``. Stacking the
    decorator several times peels further leading parameters the same way.
    """

    def take_rest(*rest):
        def take_first(first):
            return function(first, *rest)

        return take_first

    return take_rest
def acache(function):
    """Memoise the first awaited result of the coroutine function ``function``.

    The first call that completes stores its result; every later call returns
    that stored result without calling ``function`` again, whatever arguments
    it is given. A private sentinel marks "not computed yet", so a ``None``
    result is cached as well.
    """
    unset = object()
    cached = unset

    async def wrapper(*args, **kwargs):
        nonlocal cached
        if cached is not unset:
            return cached
        cached = await function(*args, **kwargs)
        return cached

    return wrapper
# [M : * -> *] -> | |
# ([From : *] -> [To : *] -> (From -> To) -> M From -> M To) -> | |
# ([T : *] -> M (M T) -> M T) -> | |
# [From : *] -> [To : *] -> (From -> M To) -> M From -> M To | |
@curry
@curry
@curry
def bind(m_value, function, join, map):
    """Derive monadic bind from a monad's ``map`` and ``join``.

    Because of the three ``curry`` layers it is applied as
    ``bind(map)(join)(function)(m_value)``.
    """
    nested = map(function)(m_value)
    return join(nested)
# [M : * -> *] -> | |
# ([T : *] -> T -> M T) -> | |
# ([From : *] -> [To : *] -> (From -> M To) -> M From -> M To) -> | |
# [From : *] -> [Intermediate : *] -> [To : *] -> (From -> M Intermediate) -> (Intermediate -> M To) -> From -> M To | |
@curry
@curry
@curry
@curry
def compose(value, second, first, m_bind, m_pure):
    """Chain two monadic functions: run ``first`` on ``value``, then bind ``second``.

    Because of the four ``curry`` layers it is applied as
    ``compose(m_pure)(m_bind)(first)(second)(value)``. ``m_pure`` is accepted
    but not used by the body.
    """
    continue_with = m_bind(second)
    return continue_with(first(value))
# [From : *] -> [To : *] -> (From -> To) -> From -> To | |
@curry
def identity_map(value, function):
    """Map for the identity monad: apply ``function`` directly to ``value``.

    Applied as ``identity_map(function)(value)`` because of ``curry``.
    """
    mapped = function(value)
    return mapped
# [T : *] -> T -> T | |
def identity_pure(value):
    """Pure for the identity monad: return ``value`` unchanged."""
    return value
# [T : *] -> T -> T | |
def identity_join(value):
    """Join for the identity monad: return ``value`` unchanged."""
    return value
# [From : *] -> [To : *] -> (From -> To) -> From -> To
# Identity-monad bind, assembled from identity_map and identity_join.
identity_bind = bind(identity_map)(identity_join)
# [From : *] -> [Intermediate : *] -> [To : *] -> (From -> Intermediate) -> (Intermediate -> To) -> From -> To
# Identity-monad composition, assembled from identity_pure and identity_bind.
identity_compose = compose(identity_pure)(identity_bind)
class Result:
    """Base class providing method-style access to ``result_map`` and ``result_bind``."""

    def map(self, function):
        """Return ``result_map(function)`` applied to this instance."""
        mapper = result_map(function)
        return mapper(self)

    def bind(self, function):
        """Return ``result_bind(function)`` applied to this instance."""
        binder = result_bind(function)
        return binder(self)
@dataclass(frozen=True)
class Err(Result):
    # Failure variant of Result; ``error`` holds the failure payload.
    error: ...
@dataclass(frozen=True)
class Ok(Result):
    # Success variant of Result; ``value`` holds the successful payload.
    value: ...
# [E : *] -> [From : *] -> [To : *] -> (From -> To) -> Result E From -> Result E To | |
@curry
def result_map(result_value, function):
    """Apply ``function`` inside an ``Ok``; rebuild an ``Err`` with the same error.

    Applied as ``result_map(function)(result_value)``. Anything that is
    neither ``Ok`` nor ``Err`` yields ``None``.
    """
    if isinstance(result_value, Ok):
        return Ok(function(result_value.value))
    if isinstance(result_value, Err):
        return Err(result_value.error)
    return None
# [E : *] -> [T : *] -> T -> Result E T | |
def result_pure(value):
    """Pure for Result: wrap ``value`` in ``Ok``."""
    return Ok(value)
# [M : * -> *] -> | |
# ([T : *] -> T -> M T) -> | |
# ([From : *] -> [To : *] -> (From -> M To) -> M From -> M To) -> | |
# [E : *] -> [T : *] -> M (Result E (M (Result E T))) -> M (Result E T) | |
@curry
@curry
def result_t_join(m_result_m_result_value, m_bind, m_pure):
    """Join for Result layered over the monad described by ``m_pure`` and ``m_bind``.

    Applied as ``result_t_join(m_pure)(m_bind)(m_result_m_result_value)``.
    """

    # Result E (M (Result E T)) -> M (Result E T)
    def collapse(outer_result):
        if isinstance(outer_result, Err):
            # Re-lift the outer error into the base monad.
            return m_pure(Err(outer_result.error))
        if isinstance(outer_result, Ok):
            # The payload is already an M (Result E T).
            return outer_result.value
        return None

    return m_bind(collapse)(m_result_m_result_value)
# [M : * -> *] -> | |
# ([T : *] -> T -> M T) -> | |
# ([From : *] -> [To : *] -> (From -> M To) -> M From -> M To) -> | |
# [E : *] -> [T : *] -> M T -> M (Result E T) -> M T | |
@curry
@curry
@curry
def result_t_default(m_result_value, m_default, m_bind, m_pure):
    """Replace an ``Err`` with ``m_default``; unwrap an ``Ok`` via ``m_pure``.

    Applied as ``result_t_default(m_pure)(m_bind)(m_default)(m_result_value)``.
    """

    def pick(result_value):
        if isinstance(result_value, Err):
            return m_default
        if isinstance(result_value, Ok):
            return m_pure(result_value.value)
        return None

    return m_bind(pick)(m_result_value)
# [M : * -> *] -> | |
# ([T : *] -> T -> M T) -> | |
# ([From : *] -> [To : *] -> (From -> M To) -> M From -> M To) -> | |
# [E : *] -> [T : *] -> M (Result E T) -> M (Result E T) -> M (Result E T) | |
@curry
@curry
@curry
def result_t_add(m_result_augend, m_result_addend, m_bind, m_pure):
    """Keep the augend when it is ``Ok``; otherwise fall back to the addend.

    Applied as
    ``result_t_add(m_pure)(m_bind)(m_result_addend)(m_result_augend)``.
    """

    def choose(result_augend):
        if isinstance(result_augend, Err):
            return m_result_addend
        if isinstance(result_augend, Ok):
            return m_pure(Ok(result_augend.value))
        return None

    return m_bind(choose)(m_result_augend)
# Result operations specialised to the identity monad (a bare Result value).
# [E : *] -> [T : *] -> Result E (Result E T) -> Result E T
result_join = result_t_join(identity_pure)(identity_bind)
# [E : *] -> [From : *] -> [To : *] -> (From -> Result E To) -> Result E From -> Result E To
result_bind = bind(result_map)(result_join)
# [E : *] -> [From : *] -> [Intermediate : *] -> [To : *] -> (From -> Result E Intermediate) -> (Intermediate -> Result E To) -> From -> Result E To
result_compose = compose(result_pure)(result_bind)
# [E : *] -> [T : *] -> T -> Result E T -> T
result_default = result_t_default(identity_pure)(identity_bind)
# [E : *] -> [T : *] -> Result E T -> Result E T -> Result E T
result_add = result_t_add(identity_pure)(identity_bind)
class List(list):
    """``list`` subclass providing method-style access to ``list_map`` and ``list_bind``."""

    def map(self, function):
        """Return a ``List`` built from ``list_map(function)`` applied to this list."""
        mapped = list_map(function)(self)
        return List(mapped)

    def bind(self, function):
        """Return a ``List`` built from ``list_bind(function)`` applied to this list."""
        bound = list_bind(function)(self)
        return List(bound)
# [From : *] -> [To : *] -> (From -> To) -> List From -> List To | |
@curry | |
def list_map(list_value, function): | |
match list_value: | |
case []: | |
return [] | |
case [head, *tail]: | |
return [function(head), *list_map(function)(tail)] | |
# [T : *] -> T -> List T | |
def list_pure(value):
    """Pure for List: wrap ``value`` in a one-element list."""
    return [value]
# [M : * -> *] -> | |
# ([T : *] -> T -> M T) -> | |
# ([From : *] -> [To : *] -> (From -> M To) -> M From -> M To) -> | |
# [T : *] -> M (List (M (List T))) -> M (List T) | |
@curry
@curry
def list_t_join(m_list_m_list_value, m_bind, m_pure):
    """Join for List layered over the monad described by ``m_pure`` and ``m_bind``.

    Applied as ``list_t_join(m_pure)(m_bind)(m_list_m_list_value)``. The
    inner lists are concatenated left to right inside the base monad.
    """

    # M (List T) -> M (List T) -> M (List T)
    def concat(m_first, m_second):
        # ``m_bind`` takes the function first and the monadic value second.
        # Taking ``m_second`` as a parameter pins it per iteration, so a lazy
        # base monad does not see only the loop's final element.
        return m_bind(
            lambda first: m_bind(lambda second: m_pure(first + second))(m_second)
        )(m_first)

    # List (M (List T)) -> M (List T)
    def list_m_list_binder(list_m_list_value):
        m_list = m_pure([])
        for m_list_value in list_m_list_value:
            m_list = concat(m_list, m_list_value)
        return m_list

    return m_bind(list_m_list_binder)(m_list_m_list_value)
# List operations specialised to the identity monad (a bare list).
# [T : *] -> List (List T) -> List T
list_join = list_t_join(identity_pure)(identity_bind)
# [From : *] -> [To : *] -> (From -> List To) -> List From -> List To
list_bind = bind(list_map)(list_join)
# [From : *] -> [Intermediate : *] -> [To : *] -> (From -> List Intermediate) -> (Intermediate -> List To) -> From -> List To
list_compose = compose(list_pure)(list_bind)
@dataclass(frozen=True)
class Coroutine:
    """Awaitable wrapper giving method-style access to ``coroutine_map`` and ``coroutine_bind``."""

    # The wrapped awaitable (for example a coroutine object).
    value: ...

    def map(self, function):
        """Return a ``Coroutine`` wrapping ``coroutine_map(function)`` applied to this object."""
        return Coroutine(coroutine_map(function)(self))

    def bind(self, function):
        """Return a ``Coroutine`` wrapping ``coroutine_bind(function)`` applied to this object."""
        return Coroutine(coroutine_bind(function)(self))

    def __await__(self):
        """Delegate awaiting to the wrapped awaitable."""
        # ``__await__`` must return an iterator. Declaring it ``async def``
        # made it return a coroutine, so ``await Coroutine(...)`` raised
        # TypeError.
        return self.value.__await__()
# [From : *] -> [To : *] -> (From -> To) -> Coroutine From -> Coroutine To | |
@curry
async def coroutine_map(coroutine_value, function):
    """Await ``coroutine_value`` and return ``function`` applied to the result.

    Applied as ``coroutine_map(function)(coroutine_value)``, which yields a
    coroutine.
    """
    awaited = await coroutine_value
    return function(awaited)
# [T : *] -> T -> Coroutine T | |
async def coroutine_pure(value):
    """Pure for Coroutine: a coroutine that returns ``value`` when awaited."""
    return value
# [T : *] -> Coroutine (Coroutine T) -> Coroutine T | |
async def coroutine_join(coroutine_coroutine_value):
    """Flatten an awaitable of an awaitable by awaiting both layers in turn."""
    inner = await coroutine_coroutine_value
    result = await inner
    return result
# [From : *] -> [To : *] -> (From -> Coroutine To) -> Coroutine From -> Coroutine To
# Coroutine bind, assembled from coroutine_map and coroutine_join.
coroutine_bind = bind(coroutine_map)(coroutine_join)
# [From : *] -> [Intermediate : *] -> [To : *] -> (From -> Coroutine Intermediate) -> (Intermediate -> Coroutine To) -> From -> Coroutine To
# Coroutine composition, assembled from coroutine_pure and coroutine_bind.
coroutine_compose = compose(coroutine_pure)(coroutine_bind)
@dataclass(frozen=True)
class Lazy:
    """Method-chaining wrapper around a zero-argument callable."""

    # The wrapped zero-argument callable.
    value: ...

    def __call__(self):
        """Invoke the wrapped callable and return its result."""
        return self.value()

    def map(self, function):
        """Return a ``Lazy`` wrapping ``lazy_map(function)`` applied to this object."""
        mapped = lazy_map(function)(self)
        return Lazy(mapped)

    def bind(self, function):
        """Return a ``Lazy`` wrapping ``lazy_bind(function)`` applied to this object."""
        bound = lazy_bind(function)(self)
        return Lazy(bound)

    def to_coroutine(self):
        """Return a ``Lazy`` wrapping ``lazy_to_coroutine`` applied to this object."""
        as_coroutine = lazy_to_coroutine(self)
        return Lazy(as_coroutine)
# [From : *] -> [To : *] -> (From -> To) -> Lazy From -> Lazy To | |
@curry
def lazy_map(lazy_value, function):
    """Return a thunk that forces ``lazy_value`` and applies ``function`` to the result.

    Applied as ``lazy_map(function)(lazy_value)``. Nothing runs until the
    returned thunk is called.
    """

    def deferred():
        return function(lazy_value())

    return deferred
# [T : *] -> T -> Lazy T | |
def lazy_pure(value):
    """Pure for Lazy: return a thunk that yields ``value`` each time it is called."""

    def constant():
        return value

    return constant
# [M : * -> *] -> | |
# ([T : *] -> T -> M T) -> | |
# ([From : *] -> [To : *] -> (From -> M To) -> M From -> M To) -> | |
# [T : *] -> Lazy (M (Lazy (M T))) -> Lazy (M T) | |
@curry
@curry
def lazy_t_join(lazy_m_lazy_m_value, m_bind, m_pure):
    """Join for Lazy layered over the monad described by ``m_pure`` and ``m_bind``.

    Applied as ``lazy_t_join(m_pure)(m_bind)(lazy_m_lazy_m_value)``. The
    result is a thunk; the outer thunk is forced only when it is called.
    ``m_pure`` is accepted but not used by the body.
    """

    def joined():
        def force(lazy_m_value):
            return lazy_m_value()

        return m_bind(force)(lazy_m_lazy_m_value())

    return joined
# Lazy operations specialised to the identity monad (a bare thunk).
# [T : *] -> Lazy (Lazy T) -> Lazy T
lazy_join = lazy_t_join(identity_pure)(identity_bind)
# [From : *] -> [To : *] -> (From -> Lazy To) -> Lazy From -> Lazy To
lazy_bind = bind(lazy_map)(lazy_join)
# [From : *] -> [Intermediate : *] -> [To : *] -> (From -> Lazy Intermediate) -> (Intermediate -> Lazy To) -> From -> Lazy To
lazy_compose = compose(lazy_pure)(lazy_bind)
# [T : *] -> Lazy T -> Lazy (Coroutine T) | |
def lazy_to_coroutine(lazy_value):
    """Turn a thunk into a coroutine function that forces it when awaited."""

    async def force_when_awaited():
        result = lazy_value()
        return result

    return force_when_awaited
def postpone(constructor):
    """Build a decorator that defers a call and wraps it with ``constructor``.

    The decorated function is not executed when called. Instead its arguments
    are captured in a zero-argument thunk, and that thunk is passed to
    ``constructor``; whatever ``constructor`` returns is the call's result.

    The wrapper keeps the decorated function's ``__name__``, ``__doc__`` and
    related metadata via ``functools.wraps``.
    """
    # Local import: the module header does not import functools.
    from functools import wraps

    def decorator(function):
        @wraps(function)
        def wrapper(*args, **kwargs):
            return constructor(lambda: function(*args, **kwargs))

        return wrapper

    return decorator
@dataclass(frozen=True)
class LazyCoroutine:
    """Method-chaining wrapper around a zero-argument callable returning a coroutine."""

    # The wrapped zero-argument callable.
    value: ...

    def __call__(self):
        """Invoke the wrapped callable and return what it produces."""
        return self.value()

    def map(self, function):
        """Return a ``LazyCoroutine`` wrapping ``lazy_coroutine_map(function)`` applied to this object."""
        mapped = lazy_coroutine_map(function)(self)
        return LazyCoroutine(mapped)

    def bind(self, function):
        """Return a ``LazyCoroutine`` wrapping ``lazy_coroutine_bind(function)`` applied to this object."""
        bound = lazy_coroutine_bind(function)(self)
        return LazyCoroutine(bound)

    def cache(self):
        """Return a copy whose underlying callable is this object passed through ``acache``."""
        memoised = acache(self)
        return LazyCoroutine(memoised)

    def outer(self):
        """Return the same underlying callable wrapped as a plain ``Lazy``."""
        return Lazy(self.value)
# [From : *] -> [To : *] -> (From -> To) -> Lazy (Coroutine From) -> Lazy (Coroutine To)
lazy_coroutine_map = identity_compose(coroutine_map)(lazy_map)


# [T : *] -> T -> Lazy (Coroutine T)
def lazy_coroutine_pure(value):
    """Pure for Lazy (Coroutine T): a thunk producing a fresh coroutine per call.

    Composing ``coroutine_pure`` with ``lazy_pure`` created the coroutine
    object eagerly and returned that same object on every call, so the thunk
    could only be awaited once. Creating the coroutine inside the thunk keeps
    it reusable and lazy.
    """
    return lambda: coroutine_pure(value)


# [T : *] -> Lazy (Coroutine (Lazy (Coroutine T))) -> Lazy (Coroutine T)
lazy_coroutine_join = lazy_t_join(coroutine_pure)(coroutine_bind)
# [From : *] -> [To : *] -> (From -> Lazy (Coroutine To)) -> Lazy (Coroutine From) -> Lazy (Coroutine To)
lazy_coroutine_bind = bind(lazy_coroutine_map)(lazy_coroutine_join)
# [From : *] -> [Intermediate : *] -> [To : *] -> (From -> Lazy (Coroutine Intermediate)) -> (Intermediate -> Lazy (Coroutine To)) -> From -> Lazy (Coroutine To)
lazy_coroutine_compose = compose(lazy_coroutine_pure)(lazy_coroutine_bind)
@dataclass(frozen=True)
class LazyCoroutineResult:
    """Method-chaining wrapper around a zero-argument callable returning a coroutine of ``Result``."""

    # The wrapped zero-argument callable.
    value: ...

    def __call__(self):
        """Invoke the wrapped callable and return what it produces."""
        return self.value()

    def map(self, function):
        """Return a ``LazyCoroutineResult`` wrapping ``lazy_coroutine_result_map(function)`` applied to this object."""
        mapped = lazy_coroutine_result_map(function)(self)
        return LazyCoroutineResult(mapped)

    def bind(self, function):
        """Return a ``LazyCoroutineResult`` wrapping ``lazy_coroutine_result_bind(function)`` applied to this object."""
        bound = lazy_coroutine_result_bind(function)(self)
        return LazyCoroutineResult(bound)

    def default(self, value):
        """Return a ``LazyCoroutine`` wrapping ``lazy_coroutine_result_default(value)`` applied to this object."""
        with_default = lazy_coroutine_result_default(value)(self)
        return LazyCoroutine(with_default)

    def add(self, addend):
        """Return a ``LazyCoroutineResult`` wrapping ``lazy_coroutine_result_add(addend)`` applied to this object."""
        combined = lazy_coroutine_result_add(addend)(self)
        return LazyCoroutineResult(combined)

    def cache(self):
        """Return a copy whose underlying callable is this object passed through ``acache``."""
        memoised = acache(self)
        return LazyCoroutineResult(memoised)

    def outer(self):
        """Return the same underlying callable wrapped as a ``LazyCoroutine``."""
        return LazyCoroutine(self.value)
# Result operations layered over the Lazy (Coroutine ...) monad.
# [E : *] -> [From : *] -> [To : *] -> (From -> To) -> Lazy (Coroutine (Result E From)) -> Lazy (Coroutine (Result E To))
lazy_coroutine_result_map = identity_compose(result_map)(lazy_coroutine_map)
# [E : *] -> [T : *] -> T -> Lazy (Coroutine (Result E T))
lazy_coroutine_result_pure = identity_compose(result_pure)(lazy_coroutine_pure)
# [E : *] -> [T : *] -> Lazy (Coroutine (Result E (Lazy (Coroutine (Result E T))))) -> Lazy (Coroutine (Result E T))
lazy_coroutine_result_join = result_t_join(lazy_coroutine_pure)(lazy_coroutine_bind)
# [E : *] -> [From : *] -> [To : *] -> (From -> Lazy (Coroutine (Result E To))) -> Lazy (Coroutine (Result E From)) -> Lazy (Coroutine (Result E To))
lazy_coroutine_result_bind = bind(lazy_coroutine_result_map)(lazy_coroutine_result_join)
# [E : *] -> [From : *] -> [Intermediate : *] -> [To : *] -> (From -> Lazy (Coroutine (Result E Intermediate))) -> (Intermediate -> Lazy (Coroutine (Result E To))) -> From -> Lazy (Coroutine (Result E To))
lazy_coroutine_result_compose = compose(lazy_coroutine_result_pure)(lazy_coroutine_result_bind)
# [E : *] -> [T : *] -> Lazy (Coroutine T) -> Lazy (Coroutine (Result E T)) -> Lazy (Coroutine T)
lazy_coroutine_result_default = result_t_default(lazy_coroutine_pure)(lazy_coroutine_bind)
# [E : *] -> [T : *] -> Lazy (Coroutine (Result E T)) -> Lazy (Coroutine (Result E T)) -> Lazy (Coroutine (Result E T))
lazy_coroutine_result_add = result_t_add(lazy_coroutine_pure)(lazy_coroutine_bind)
@dataclass(frozen=True)
class LazyCoroutineResultList:
    """Method-chaining wrapper around a zero-argument callable returning a coroutine of ``Result`` of list."""

    # The wrapped zero-argument callable.
    value: ...

    def __call__(self):
        """Invoke the wrapped callable and return what it produces."""
        return self.value()

    def map(self, function):
        """Return a ``LazyCoroutineResultList`` wrapping ``lazy_coroutine_result_list_map(function)`` applied to this object."""
        mapped = lazy_coroutine_result_list_map(function)(self)
        return LazyCoroutineResultList(mapped)

    def bind(self, function):
        """Return a ``LazyCoroutineResultList`` wrapping ``lazy_coroutine_result_list_bind(function)`` applied to this object."""
        bound = lazy_coroutine_result_list_bind(function)(self)
        return LazyCoroutineResultList(bound)

    def cache(self):
        """Return a copy whose underlying callable is this object passed through ``acache``."""
        memoised = acache(self)
        return LazyCoroutineResultList(memoised)

    def outer(self):
        """Return the same underlying callable wrapped as a ``LazyCoroutineResult``."""
        return LazyCoroutineResult(self.value)
# List operations layered over the Lazy (Coroutine (Result E ...)) monad.
# [E : *] -> [From : *] -> [To : *] -> (From -> To) -> Lazy (Coroutine (Result E (List From))) -> Lazy (Coroutine (Result E (List To)))
lazy_coroutine_result_list_map = identity_compose(list_map)(lazy_coroutine_result_map)
# [E : *] -> [T : *] -> T -> Lazy (Coroutine (Result E (List T)))
lazy_coroutine_result_list_pure = identity_compose(list_pure)(lazy_coroutine_result_pure)
# [E : *] -> [T : *] -> Lazy (Coroutine (Result E (List (Lazy (Coroutine (Result E (List T))))))) -> Lazy (Coroutine (Result E (List T)))
lazy_coroutine_result_list_join = list_t_join(lazy_coroutine_result_pure)(lazy_coroutine_result_bind)
# [E : *] -> [From : *] -> [To : *] -> (From -> Lazy (Coroutine (Result E (List To)))) -> Lazy (Coroutine (Result E (List From))) -> Lazy (Coroutine (Result E (List To)))
lazy_coroutine_result_list_bind = bind(lazy_coroutine_result_list_map)(lazy_coroutine_result_list_join)
# [E : *] -> [From : *] -> [Intermediate : *] -> [To : *] -> (From -> Lazy (Coroutine (Result E (List Intermediate)))) -> (Intermediate -> Lazy (Coroutine (Result E (List To)))) -> From -> Lazy (Coroutine (Result E (List To)))
lazy_coroutine_result_list_compose = compose(lazy_coroutine_result_list_pure)(lazy_coroutine_result_list_bind)
async def main():
    """Demo: build three cached lazy pipelines and print their awaited results."""

    @dataclass(frozen=True)
    class User:
        pass

    @dataclass(frozen=True)
    class Post:
        views: int

    @LazyCoroutineResultList
    async def get_users():
        print("Fetching users")
        return Err("Error fetching users")
        # return Ok([User(), User()])

    @postpone(LazyCoroutineResultList)
    async def get_posts(limit):
        print("Fetching posts", f"{limit=}")
        return Err("Error fetching posts")
        # return Ok([Post(1), Post(2)])

    @Lazy
    def answer_of_life():
        print("Computing for 7.5 million years")
        return 42

    # Map each post to its view count, then sum the list one layer out.
    approximate_members = (
        get_posts(limit=5)
        .map(lambda post: post.views)
        .outer()
        .map(sum)
        .cache()
    )

    # Number of users, combined with the post-based estimate via ``add``.
    members_count = (
        get_users
        .outer()
        .map(len)
        .add(approximate_members)
        .cache()
    )

    count_with_default = (
        members_count
        .default(answer_of_life.to_coroutine())
        .cache()
    )

    # Await in the same left-to-right order as the printed columns.
    with_default = await count_with_default()
    members = await members_count()
    approximate = await approximate_members()
    print(with_default, members, approximate)
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment