Skip to content

Instantly share code, notes, and snippets.

@Jessime
Created April 25, 2022 01:31
Show Gist options
  • Save Jessime/2ac24f138fc285701d2ea7f003fa6538 to your computer and use it in GitHub Desktop.
Save Jessime/2ac24f138fc285701d2ea7f003fa6538 to your computer and use it in GitHub Desktop.
async posting
import json
import time
import asyncio
import httpx
import random
from functools import partial
from datetime import datetime, timedelta
RANDOM_WORDS = ['Monarch', 'Flagship', 'Russian', 'Thee', 'Species', 'Policies', 'Unique', 'Deutsch', 'Agree', 'Enable']
async def scan_for_favorable_market_conditions(queues):
for i in range(10):
print(f"Loop: {i}")
r = random.randint(0, 10)
if r >= 5:
msg = f"[{random.choice(RANDOM_WORDS)}_{i}]"
stamp = datetime.now()
print(f"found condition {msg}. Send for processing.")
for q in queues:
q.put_nowait((msg, stamp))
await asyncio.sleep(.1)
def sign(secret, endpoint="orders", full_path="api/v2/orders/"):
return secret+endpoint+full_path
def compact_json_dict(data):
return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
def kucoin_post(client: httpx.AsyncClient, symbol, side, size):
url = "http://www.kucoin.com/orders"
data = {
'side': side,
'symbol': symbol,
'type': 'market',
'size': size
}
headers = {
"KC-API-TIMESTAMP": int(time.time() * 1000),
"KC-API-SIGN": sign("asdf")
}
client.post(url, data=data, headers=headers, timeout=10)
async def generic_post_trade_request(trade_type: str, q: asyncio.Queue):
async with httpx.AsyncClient() as client:
while True:
msg, stamp = await q.get()
print(f"Got {msg}. Posting {trade_type=}")
r = await client.post("http://www.example.com/", data={"data": msg})
time_to_post = (datetime.now() - stamp) / timedelta(milliseconds=1)
print(f"{trade_type=}: {r.status_code}. Took {time_to_post=}ms to process {msg=}")
q.task_done()
async def main():
queues = [asyncio.Queue() for _ in range(3)]
scanning_task = asyncio.create_task(scan_for_favorable_market_conditions(queues))
posting_coros = [
partial(generic_post_trade_request, "usd -> btc"),
partial(generic_post_trade_request, "btc -> eth"),
partial(generic_post_trade_request, "eth -> usd"),
]
posting_tasks = []
for i in range(1): # Bump to increase workers
for coro, q in zip(posting_coros, queues):
posting_tasks.append(asyncio.create_task(coro(q)))
await scanning_task
await asyncio.gather(*[q.join() for q in queues], return_exceptions=True)
for task in posting_tasks:
task.cancel()
await asyncio.gather(*posting_tasks, return_exceptions=True)
print("The queues are now empty, and posting tasks have been shutdown.")
if __name__ == '__main__':
asyncio.run(main(), debug=True)
print("Exiting gracefully.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment