Skip to content

Instantly share code, notes, and snippets.

@johanste
Created April 11, 2019 16:08
Show Gist options
  • Save johanste/ef943ff4e73e53166044d337115b2731 to your computer and use it in GitHub Desktop.
Save johanste/ef943ff4e73e53166044d337115b2731 to your computer and use it in GitHub Desktop.
Experimenting with common sync and async code for wrapped autogenerated clients...
import asyncio
import time
class GeneratedSyncClient:
    """Stand-in for an autogenerated client whose operations block the caller."""

    def mapper(self, result, mappings):
        """Return *result*, or raise the exception mapped to its timeout.

        Args:
            result: Operation result; must support ``result['timeout']``.
            mappings: Container keyed by timeout value whose values are
                zero-argument callables (e.g. exception classes) producing
                the exception to raise.

        Returns:
            The same *result* object when its timeout has no entry in
            *mappings*.

        Raises:
            Whatever ``mappings[result['timeout']]()`` constructs, when the
            timeout is present in *mappings*.
        """
        timeout = result['timeout']
        if timeout in mappings:
            # The mapped callable is invoked with no arguments, so the raised
            # exception carries no message.
            raise mappings[timeout]()
        return result

    def do_operation(self, name, timeout):
        """Block for *timeout* seconds, then echo the arguments back.

        Args:
            name: Value stored under the ``'name'`` key of the result.
            timeout: Seconds passed to ``time.sleep``; also stored under the
                ``'timeout'`` key of the result.

        Returns:
            A dict ``{'name': name, 'timeout': timeout}``.
        """
        time.sleep(timeout)
        return {
            'name': name,
            'timeout': timeout,
        }
class GeneratedAsyncClient:
    """Stand-in for an autogenerated client whose operations are coroutines."""

    async def mapper(self, result, mappings):
        """Await *result*, then return it or raise the mapped exception.

        Args:
            result: An awaitable (such as the coroutine returned by
                ``do_operation``) that resolves to an object supporting
                ``['timeout']``.
            mappings: Container keyed by timeout value whose values are
                zero-argument callables (e.g. exception classes) producing
                the exception to raise.

        Returns:
            The awaited result when its timeout has no entry in *mappings*.

        Raises:
            Whatever ``mappings[res['timeout']]()`` constructs, when the
            awaited result's timeout is present in *mappings*.
        """
        # Unlike the sync client, the operation has not run yet: awaiting
        # here is what actually drives it to completion.
        res = await result
        timeout = res['timeout']
        if timeout in mappings:
            raise mappings[timeout]()
        return res

    async def do_operation(self, name, timeout):
        """Sleep asynchronously for *timeout* seconds, then echo the arguments.

        Args:
            name: Value stored under the ``'name'`` key of the result.
            timeout: Seconds passed to ``asyncio.sleep``; also stored under
                the ``'timeout'`` key of the result.

        Returns:
            A dict ``{'name': name, 'timeout': timeout}``.
        """
        await asyncio.sleep(timeout)
        return {
            'name': name,
            'timeout': timeout,
        }
class WrappingClient:
    """Single wrapper that works over either a sync or an async client.

    The wrapper never awaits anything itself; it only composes the wrapped
    client's ``do_operation`` and ``mapper``. Whether ``do_operation`` here
    yields a finished result or an awaitable therefore depends entirely on
    what the wrapped client's ``mapper`` returns.
    """

    # Timeout value -> exception class raised by the wrapped client's mapper.
    DEFAULT_MAPPINGS = {
        3: ValueError,
        5: ValueError,
    }

    def __init__(self, wrapped, mappings=None):
        """Store the client to delegate to.

        Args:
            wrapped: Object exposing ``do_operation(name, timeout)`` and
                ``mapper(result, mappings)``.
            mappings: Optional timeout -> exception-factory table. When
                omitted, ``DEFAULT_MAPPINGS`` is used.
        """
        self.wrapped = wrapped
        # Copy so later changes to the caller's dict (or to the class
        # default) do not affect this instance.
        self.mappings = dict(
            self.DEFAULT_MAPPINGS if mappings is None else mappings
        )

    def do_operation(self, name, timeout):
        """Run the wrapped operation and pass its outcome through the mapper.

        Args:
            name: Forwarded to the wrapped client's ``do_operation``.
            timeout: Forwarded to the wrapped client's ``do_operation``.

        Returns:
            Whatever the wrapped client's ``mapper`` returns. For a client
            whose ``mapper`` is ``async def`` this is a coroutine the caller
            must await.
        """
        return self.wrapped.mapper(
            self.wrapped.do_operation(name, timeout),
            # Hand the mapper a fresh dict on every call so it can never
            # mutate this instance's table.
            dict(self.mappings),
        )
async def arun():
    """Exercise ``WrappingClient`` over the async client and print the result.

    ``WrappingClient.do_operation`` is a plain ``def``, but with an async
    wrapped client it returns the coroutine produced by the async ``mapper``,
    which is why its return value is awaited here.
    """
    client = WrappingClient(GeneratedAsyncClient())
    # Timeout 0 is not in the wrapper's default mappings, so no exception is
    # raised and the echoed dict is printed.
    result = await client.do_operation('hello', 0)
    print(result)
def run():
    """Exercise ``WrappingClient`` over the sync client and print the result.

    Raises:
        ValueError: With the wrapper's default mappings, timeout 3 is mapped
            to ``ValueError``, so the ``print`` below is never reached.
    """
    client = WrappingClient(GeneratedSyncClient())
    # Blocks for 3 seconds in time.sleep before the mapper raises.
    result = client.do_operation('hello', 3)
    print(result)
# Demo driver: run the same wrapper over the async client, then the sync one.
# Each run is isolated in its own try/except so a failure in one does not
# prevent the other from running.
try:
    print('async')
    asyncio.run(arun())
except Exception as e:
    print(f'Failed {e}')
try:
    print('sync')
    run()
except Exception as e:
    # The mapped exception is constructed with no arguments, so when it is
    # the cause, str(e) is empty and this prints just "Failed ".
    print(f'Failed {e}')
@YijunXieMS
Copy link

Nice code. Python is so flexible that a function that looks synchronous can actually be async, as long as something in the underlying call chain is async. I didn't realize this kind of infectious async worked in Python.
We can utilize this to have a single wrapper and greatly reduce the amount of duplicate code.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment