Skip to content

Instantly share code, notes, and snippets.

@allenhumphreys
Created June 23, 2021 15:47
Show Gist options
  • Save allenhumphreys/baa7d00c0144a690c483a31f36a34978 to your computer and use it in GitHub Desktop.
Save allenhumphreys/baa7d00c0144a690c483a31f36a34978 to your computer and use it in GitHub Desktop.
Python decorator to inject values into an Azure Function trigger entry point
def service_bus_client(_fnc=None, conn_str: str = "", topic_name: str = ""):
    """Decorate an async function so it receives a Service Bus topic client.

    Works both bare (``@service_bus_client``) and with arguments
    (``@service_bus_client(conn_str=..., topic_name=...)``).

    One client is created per decorated function, at decoration time, via
    ``AsyncServiceBusTopicClient.from_connection_string`` and is passed to
    every call of the wrapped coroutine as the keyword argument
    ``topic_client``. The wrapped function must therefore accept a parameter
    with exactly that name.

    Any parameter annotated with ``AsyncServiceBusTopicClient`` is removed
    from the wrapper's reported ``__signature__``, so code that introspects
    the signature (presumably the Azure Functions host - confirm) does not
    see the injected argument.

    Args:
        _fnc: The function being decorated when used without parentheses;
            ``None`` when the decorator is called with arguments.
        conn_str: Connection string handed to ``from_connection_string``.
        topic_name: Topic name handed to ``from_connection_string``.

    Returns:
        The wrapped coroutine function, or a decorator producing one when
        ``_fnc`` is ``None``.
    """

    def decorate(decorated_func: Callable):
        # Built once here and shared by every invocation of the wrapper.
        topic_client = AsyncServiceBusTopicClient.from_connection_string(
            conn_str=conn_str,
            topic_name=topic_name,
        )
        client_type_name = AsyncServiceBusTopicClient.__name__

        def is_client_annotation(annotation) -> bool:
            # Annotations are strings under `from __future__ import
            # annotations` (possibly module-qualified, hence the suffix
            # match). Otherwise they are the class object itself, or
            # `inspect.Parameter.empty` for unannotated parameters, neither
            # of which has `.endswith`.
            if isinstance(annotation, str):
                return annotation.endswith(client_type_name)
            return annotation is AsyncServiceBusTopicClient

        @wraps(decorated_func)
        async def wrapper(*args, **kwargs):
            return await decorated_func(*args, topic_client=topic_client, **kwargs)

        # `wraps` sets `__wrapped__`, so this is the decorated function's
        # signature; drop the injected client parameter from it.
        sig = inspect.signature(wrapper)
        exposed_parameters = [
            param
            for param in sig.parameters.values()
            if not is_client_annotation(param.annotation)
        ]
        wrapper.__signature__ = sig.replace(parameters=exposed_parameters)
        return wrapper

    if _fnc is None:
        return decorate
    return decorate(_fnc)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment