Skip to content

Instantly share code, notes, and snippets.

@linzeyang
Created May 16, 2022 04:38
Show Gist options
  • Select an option

  • Save linzeyang/3bbe5fc40c40be90468926550f789006 to your computer and use it in GitHub Desktop.

Select an option

Save linzeyang/3bbe5fc40c40be90468926550f789006 to your computer and use it in GitHub Desktop.
import asyncio
import functools
import threading
from contextlib import asynccontextmanager, contextmanager
from typing import Any, Callable
@contextmanager
def my_cont_mgr(tag: str):
print(f"{tag}: f entering!")
try:
yield True
finally:
print(f"{tag}: f exiting!")
@asynccontextmanager
async def my_asy_cont_mgr(tag: str):
print(f"{tag}: async f entering!")
await asyncio.sleep(0.5)
try:
yield True
finally:
print(f"{tag}: async f exiting!")
await asyncio.sleep(0.3)
class MyContMgr:
def __init__(self, tag: str):
self._tag = tag
def __enter__(self):
print(f"{self._tag}: Entering")
return self
def __exit__(self, exc_type, exc_value, exc_tb):
print(f"{self._tag} - {exc_type} {exc_value} {exc_tb}: Exiting")
async def __aenter__(self):
print(f"{self._tag}: Async Entering")
await asyncio.sleep(0.5)
return self
async def __aexit__(self, exc_type, exc_value, exc_tb):
print(f"{self._tag} - {exc_type} {exc_value} {exc_tb}: Async Exiting")
await asyncio.sleep(0.3)
def behave(self):
print(f"{self._tag}: Correct!")
def on_purpose(self):
raise RuntimeError("Wrong!")
def main():
with MyContMgr("tag1") as mgr:
mgr.behave()
with MyContMgr("tag2") as mgr2:
mgr2.on_purpose()
async def amain():
async with MyContMgr("tag3") as mgr3:
mgr3.behave()
async with MyContMgr("tag4") as mgr4:
mgr4.on_purpose()
def fmain():
with my_cont_mgr("tag5") as v5:
print(f"{v5=}")
with my_cont_mgr("tag6") as v6:
raise RuntimeError(f"{v6} Wrong!")
async def afmain():
async with my_asy_cont_mgr("tag7") as v7:
print(f"{v7=}")
async with my_asy_cont_mgr("tag8") as v8:
raise RuntimeError(f"{v8} Wrong!")
class Indenter:
def __init__(self):
self._indent_level = None
def __enter__(self):
if self._indent_level is None:
self._indent_level = 0
else:
self._indent_level += 1
return self
def __exit__(self, *args):
if self._indent_level is not None:
self._indent_level -= 1
def print(self, text: str):
print(" " * 4 * self._indent_level + text)
def ind_main():
with Indenter() as indent:
indent.print("hi!")
with indent:
indent.print("hello")
with indent:
indent.print("bonjour")
indent.print("hey")
def gen_fibonacci(n: int):
if n < 1:
raise ValueError
a, b = 0, 1
counter = 0
while n > counter:
a, b = b, a + b
yield b
counter += 1
def cache_fibonacci(func):
local_cache = dict()
@functools.wraps(func)
def wrapper(n):
if n not in local_cache:
res = func(n)
local_cache[n] = res
else:
res = local_cache[n]
return res
return wrapper
@cache_fibonacci
def recur_fibonacci(n: int):
if n < 1:
raise ValueError
if n in (1, 2):
return n
return recur_fibonacci(n - 1) + recur_fibonacci(n - 2)
def do_x_times(_func: Callable = None, *, x: int = 1) -> Callable:
if x < 1:
raise ValueError
def inner(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs):
for _ in range(x):
val = func(*args, **kwargs)
return val
return wrapper
if _func:
return inner(_func)
return inner
@do_x_times(x=3)
def greetings(name: str):
print(f"Hello, {name}!")
class Counter:
def __init__(self, func: Callable) -> None:
functools.update_wrapper(self, func)
self._func = func
self._num_called = 0
def __call__(self, *args: Any, **kwargs: Any) -> Any:
self._num_called += 1
print(f"Num of calls to {self._func.__name__}: {self._num_called}")
return self._func(*args, **kwargs)
@Counter
def new_greetings(name: str):
print(f"Hello, {name}!")
def singleton(cls):
cls._instance = None
@functools.wraps(cls)
def wrapper(*args, **kwargs):
if not cls._instance:
cls._instance = cls(*args, **kwargs)
return cls._instance
return wrapper
@singleton
class MyCls:
pass
def factorial(n: int):
if n < 0 or not isinstance(n, int):
raise ValueError
if n in (0, 1, 2):
return n
return functools.reduce(lambda a, b: a * b, range(2, n + 1))
def my_filter(func: Callable, sequence):
return functools.reduce(lambda x, y: x + ([y] if func(y) else []), sequence, [])
if __name__ == "__main__":
# main()
# asyncio.run(amain())
# fmain()
# ind_main()
# asyncio.run(afmain())
# print([i for i in gen_fibonacci(100)])
# print([recur_fibonacci(i) for i in range(1, 100)])
# greetings("joe")
# new_greetings("joe")
# new_greetings("joe")
# assert MyCls() is MyCls()
print(list(my_filter(lambda a: a % 2 == 0, range(10))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment