Skip to content

Instantly share code, notes, and snippets.

@synodriver
Created January 19, 2025 08:59
Show Gist options
  • Save synodriver/91336dd4c72c5c8014d4920da12f4590 to your computer and use it in GitHub Desktop.
Save synodriver/91336dd4c72c5c8014d4920da12f4590 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
import asyncio
from siokcp.asyncio import open_kcp_connection
async def cb(reader, writer: asyncio.StreamWriter):
    """Echo one chunk back to the peer, then close the stream.

    Reads at most 10 bytes from ``reader``, prints them, writes the same
    bytes to ``writer``, and shuts the writer down. Nothing else in the
    visible client code references this handler.
    """
    chunk = await reader.read(10)
    print(chunk)

    # Send the chunk straight back and wait for the write buffer to flush.
    writer.write(chunk)
    await writer.drain()

    # Close, and wait until the close has completed before reporting it.
    writer.close()
    await writer.wait_closed()
    print("Connection closed")
async def main():
    """Run the client demo against a KCP server on 127.0.0.1:11000.

    Sends ``b"gugu"`` five times, printing whatever comes back after each
    send, then sends a single NUL byte, which the companion server treats
    as a request to close the connection. The writer is always closed
    before returning, even if one of the sends or reads fails.
    """
    # NOTE(review): 10 is presumably the KCP conversation id and ``print``
    # the log callback -- confirm against siokcp's open_kcp_connection.
    reader, writer = await open_kcp_connection(("127.0.0.1", 11000), 10, print)
    try:
        for _ in range(5):
            writer.write(b"gugu")
            await writer.drain()
            print(await reader.read(100))
        writer.write(b"\0")  # ask server to close
        await writer.drain()
    finally:
        # Release the transport explicitly instead of leaving it open
        # until the event loop is torn down.
        writer.close()
        await writer.wait_closed()
if __name__ == "__main__":
    # Script entry point: drive the client coroutine to completion.
    asyncio.run(main())
# -*- coding: utf-8 -*-
import asyncio
from loguru import logger
from siokcp.asyncio import start_kcp_server
from siokcp._kcp import (
IKCP_LOG_IN_ACK,
IKCP_LOG_IN_DATA,
IKCP_LOG_IN_PROBE,
IKCP_LOG_IN_WINS,
IKCP_LOG_INPUT,
IKCP_LOG_OUT_ACK,
IKCP_LOG_OUT_DATA,
IKCP_LOG_OUT_PROBE,
IKCP_LOG_OUT_WINS,
IKCP_LOG_OUTPUT,
IKCP_LOG_RECV,
IKCP_LOG_SEND,
)
async def cb(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    """Echo handler passed to ``start_kcp_server``.

    Sets the connection's ``logmask`` to the union of every imported
    ``IKCP_LOG_*`` flag, then echoes each received chunk back to the peer.
    The loop ends when the peer sends a single NUL byte or when the stream
    reaches EOF; the writer is closed afterwards.
    """
    writer.transport.connection.logmask = (
        IKCP_LOG_OUTPUT
        | IKCP_LOG_INPUT
        | IKCP_LOG_SEND
        | IKCP_LOG_RECV
        | IKCP_LOG_IN_DATA
        | IKCP_LOG_IN_ACK
        | IKCP_LOG_IN_PROBE
        | IKCP_LOG_IN_WINS
        | IKCP_LOG_OUT_DATA
        | IKCP_LOG_OUT_ACK
        | IKCP_LOG_OUT_PROBE
        | IKCP_LOG_OUT_WINS
    )
    while True:
        data = await reader.read(100)
        # An empty read means EOF: every later read would also return b""
        # immediately, so without this check the loop would spin forever
        # when the peer goes away without sending the NUL sentinel.
        if not data or data == b"\0":
            break
        print("recv:", data)
        writer.write(data)
        await writer.drain()
    writer.close()
    await writer.wait_closed()
    print("client disconnected")
async def main():
    """Start the KCP server on 0.0.0.0:11000 and park until interrupted.

    ``cb`` is handed to ``start_kcp_server`` as the handler and
    ``logger.info`` as its third argument (presumably the log sink --
    confirm against siokcp). Cancellation or a keyboard interrupt while
    waiting is swallowed so the coroutine returns quietly.
    """
    address = ("0.0.0.0", 11000)
    await start_kcp_server(cb, address, logger.info)
    print("kcp server start", address)

    # Nothing ever resolves this future, so awaiting it keeps the
    # coroutine (and therefore the event loop) alive.
    never_done = asyncio.get_running_loop().create_future()
    try:
        await never_done
    except (KeyboardInterrupt, asyncio.CancelledError):
        pass
if __name__ == "__main__":
    # Script entry point: run the server coroutine until it returns.
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment