Created
May 2, 2024 04:16
-
-
Save RyanBalfanz/23d0ad7bf304fc0ca2bd92d1e8f7720d to your computer and use it in GitHub Desktop.
Monads in Python with Type Hint Annotations
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3.12 | |
# -*- coding: utf-8 -*- | |
import typing | |
from dataclasses import dataclass, field | |
@dataclass | |
class Functor[T]: | |
value: T | |
def fmap[R](self, f: typing.Callable[[T], R]): | |
return Functor(f(self.value)) | |
@dataclass | |
class Maybe[T](Functor): | |
value: T | |
@typing.override | |
def fmap[R](self, f: typing.Callable[[T], R]): | |
if self.value is None: | |
return Maybe(None) | |
return Maybe(f(self.value)) | |
@dataclass | |
class Result[T](Functor): | |
value: T | |
exceptions: typing.Tuple[typing.Type[Exception]] = field( | |
repr=False, default=(Exception,) | |
) | |
@typing.override | |
def fmap[R](self, f: typing.Callable[[T], R]): | |
if isinstance(self.value, Exception): | |
return self | |
try: | |
return Result(f(self.value)) | |
except self.exceptions as e: | |
return Result(e) | |
def is_err(self): | |
return isinstance(self.value, self.exceptions) | |
def is_ok(self): | |
return not self.is_err() | |
if __name__ == "__main__": | |
# Maybe protects against NPEs (Null Pointer Exceptions). | |
print(Maybe(None).fmap(lambda x: x.foo)) | |
# Maybe does not protect against exceptions. | |
try: | |
# AttributeError: 'function' object has no attribute 'foo' | |
print(Maybe(object()).fmap(lambda x: x.foo)) | |
except AttributeError: | |
pass | |
else: | |
raise AssertionError("Expected AttributeError") | |
# Result protects against exceptions. | |
print(Result(0).fmap(lambda x: 1 / x)) | |
# Here Result protects against ZeroDivisionError specifically. | |
print(Result(0, exceptions=(ZeroDivisionError,)).fmap(lambda x: 1 / x)) | |
# class SomeOtherException(Exception): ... | |
# def raise_some_other_exception(o: typing.Any) -> typing.NoReturn: | |
# raise SomeOtherException(id(o)) | |
# o = object() | |
# es = (ZeroDivisionError,) | |
# try: | |
# print( | |
# Result(0, exceptions=es).fmap( | |
# lambda _: raise_some_other_exception(o) | |
# ) | |
# ) | |
# except es as e: | |
# raise AssertionError("Expected SomeOtherException") | |
# except Exception as e: | |
# assert e.args[0] == id(o), e.args | |
# Result protects against NPEs, but only because of the exception handling. | |
@dataclass | |
class Thing[T]: | |
value: T | |
def __post_init__(self): | |
del self.value | |
try: | |
print((lambda x: x.value)(Thing(1))) | |
assert False, "This should assertion should not be reached." | |
except AttributeError: | |
pass | |
else: | |
raise AssertionError("Expected AttributeError") | |
print(Result(None).fmap(lambda x: x.foo)) | |
print(Result(object()).fmap(lambda x: x.foo)) | |
# Inspect the result object to check if the result is an exception. | |
r = Result(None).fmap(lambda x: x.value) | |
assert not r.is_ok() and r.is_err() | |
assert type(r.value) is AttributeError, type(r.value) | |
# print(Maybe([1,2,3]).fmap(lambda x: [x, x+1])) |
Author
RyanBalfanz
commented
May 19, 2024
•
import abc
import json
import sys
import typing
from dataclasses import dataclass
@dataclass
class Maybe[T](abc.ABC):
def fmap[**_, R](self, f: typing.Callable[[T], R]) -> "Maybe[R]": ...
def bind[**_, R](self, f: typing.Callable[[T], "Maybe[R]"]) -> "Maybe[R]": ...
@dataclass
class Just[T](Maybe[T]):
value: T
@typing.override
def fmap[**_, R](self, f: typing.Callable[[T], R]) -> Maybe[R]:
try:
return Just(f(self.value))
except Exception as e:
print(e, file=sys.stderr)
return Nothing()
@typing.override
def bind[**_, R](self, f: typing.Callable[[T], Maybe[R]]) -> Maybe[R]:
try:
return f(self.value)
except Exception as e:
print(e, file=sys.stderr)
return Nothing()
@dataclass
class Nothing[T](Maybe[T]):
value: T | None = None
@typing.override
def fmap[**_, R](self, f: typing.Callable[[T], R]):
return self
@typing.override
def bind[**_, R](self, f: typing.Callable[[T], Maybe[R]]):
return self
@dataclass
class Data[T]:
value: T
def fmap[R](self, f: typing.Callable[[T], R]):
return Data(f(self.value))
# @dataclass
# class IData[T](Data[Iterable[T]]):
# value: Iterable[T]
# # @typing.overload
# def fmap(self, f: typing.Callable):
# return (Data(value=f(self.value)) for v in self.value)
# R = typing.TypeVar("R")
@dataclass
class Pipeline[T]:
functions: typing.Iterable[typing.Callable]
def run(self, d: T):
df = Data(d)
for f in self.functions:
df = df.fmap(f)
return df
if __name__ == "__main__":
d = Data(1).fmap(lambda a: a + 1)
p = Pipeline[int](functions=(lambda a: a * 2, lambda a: a * 2, lambda a: a * 2))
print(p.run(1))
print(Just("-123").fmap(int))
print(
Just(None)
.fmap(lambda _: int(input("Insert number a: ")))
.fmap(lambda x: x / int(input("Insert number b: ")))
)
print(Nothing(value=1))
def tfmap[**T, S](f: typing.Callable[T, S], i: typing.Iterable):
return tuple(map(f, i))
print(
Just("123")
.fmap(lambda x: tuple(map(int, x)))
.fmap(lambda x: tuple(map(lambda a: a**2, x)))
.fmap(lambda x: tfmap(lambda a: a**2, x))
)
class JD(typing.TypedDict):
objects: typing.Collection[dict]
s = json.dumps(JD({"objects": [{"foo": 1}, {"foo": "2"}, {"foo": 3}]}))
print(
ss := Just(s)
.fmap(lambda s: JD(json.loads(s)))
.fmap(lambda x: x["objects"])
.fmap(lambda x: tfmap(lambda o: o["foo"], x))
.fmap(lambda x: tfmap(lambda o: {"fooPlusOne": o + 1}, x))
.fmap(lambda x: JD({"objects": x}))
)
# p = Pipeline(
# (
# lambda s: JD(json.loads(s)),
# lambda x: x["objects"],
# lambda x: tfmap(lambda o: o["foo"], x),
# lambda x: tfmap(lambda o: {"fooPlusOne": o + 1}, x),
# lambda x: JD({"objects": x}),
# )
# )
# print(p.run(s))
import httpx
print(
ss := Just("https://my-json-server.typicode.com/typicode/demo/posts")
.fmap(lambda url: httpx.get(url))
.fmap(lambda s: s.json())
# .fmap(lambda s: s.json() if not isinstance(s, Nothing) else [{"id": 1, "title": str(s)}])
.fmap(lambda posts: (p["id"] for p in posts))
.fmap(
lambda ids: (
Just((i, id)).fmap(lambda iid: (10 / iid[0], 10 / iid[1]))
for i, id in enumerate(ids)
)
)
.fmap(lambda titles_upper: tuple(titles_upper))
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment