Created
April 25, 2022 01:31
-
-
Save Jessime/2ac24f138fc285701d2ea7f003fa6538 to your computer and use it in GitHub Desktop.
async posting
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import time | |
import asyncio | |
import httpx | |
import random | |
from functools import partial | |
from datetime import datetime, timedelta | |
RANDOM_WORDS = ['Monarch', 'Flagship', 'Russian', 'Thee', 'Species', 'Policies', 'Unique', 'Deutsch', 'Agree', 'Enable'] | |
async def scan_for_favorable_market_conditions(queues): | |
for i in range(10): | |
print(f"Loop: {i}") | |
r = random.randint(0, 10) | |
if r >= 5: | |
msg = f"[{random.choice(RANDOM_WORDS)}_{i}]" | |
stamp = datetime.now() | |
print(f"found condition {msg}. Send for processing.") | |
for q in queues: | |
q.put_nowait((msg, stamp)) | |
await asyncio.sleep(.1) | |
def sign(secret, endpoint="orders", full_path="api/v2/orders/"): | |
return secret+endpoint+full_path | |
def compact_json_dict(data): | |
return json.dumps(data, separators=(',', ':'), ensure_ascii=False) | |
def kucoin_post(client: httpx.AsyncClient, symbol, side, size): | |
url = "http://www.kucoin.com/orders" | |
data = { | |
'side': side, | |
'symbol': symbol, | |
'type': 'market', | |
'size': size | |
} | |
headers = { | |
"KC-API-TIMESTAMP": int(time.time() * 1000), | |
"KC-API-SIGN": sign("asdf") | |
} | |
client.post(url, data=data, headers=headers, timeout=10) | |
async def generic_post_trade_request(trade_type: str, q: asyncio.Queue): | |
async with httpx.AsyncClient() as client: | |
while True: | |
msg, stamp = await q.get() | |
print(f"Got {msg}. Posting {trade_type=}") | |
r = await client.post("http://www.example.com/", data={"data": msg}) | |
time_to_post = (datetime.now() - stamp) / timedelta(milliseconds=1) | |
print(f"{trade_type=}: {r.status_code}. Took {time_to_post=}ms to process {msg=}") | |
q.task_done() | |
async def main(): | |
queues = [asyncio.Queue() for _ in range(3)] | |
scanning_task = asyncio.create_task(scan_for_favorable_market_conditions(queues)) | |
posting_coros = [ | |
partial(generic_post_trade_request, "usd -> btc"), | |
partial(generic_post_trade_request, "btc -> eth"), | |
partial(generic_post_trade_request, "eth -> usd"), | |
] | |
posting_tasks = [] | |
for i in range(1): # Bump to increase workers | |
for coro, q in zip(posting_coros, queues): | |
posting_tasks.append(asyncio.create_task(coro(q))) | |
await scanning_task | |
await asyncio.gather(*[q.join() for q in queues], return_exceptions=True) | |
for task in posting_tasks: | |
task.cancel() | |
await asyncio.gather(*posting_tasks, return_exceptions=True) | |
print("The queues are now empty, and posting tasks have been shutdown.") | |
if __name__ == '__main__': | |
asyncio.run(main(), debug=True) | |
print("Exiting gracefully.") | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment