Created
January 19, 2025 08:59
-
-
Save synodriver/91336dd4c72c5c8014d4920da12f4590 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import asyncio | |
from siokcp.asyncio import open_kcp_connection | |
async def cb(reader, writer: asyncio.StreamWriter):
    """Echo a single chunk back to the peer, then close the stream.

    Reads at most 10 bytes from ``reader``, prints them, writes the same
    bytes to ``writer`` and shuts the writer down.
    """
    chunk = await reader.read(10)
    print(chunk)

    # Send the chunk straight back and wait for the write buffer to flush.
    writer.write(chunk)
    await writer.drain()

    # Close our side and wait until the close has completed.
    writer.close()
    await writer.wait_closed()
    print("Connection closed")
async def main():
    """Run the demo KCP client against the local echo server.

    Connects to 127.0.0.1:11000, sends ``b"gugu"`` five times and prints
    each reply, then sends ``b"\\0"`` to ask the server to close. The
    writer is always closed before returning, even if an exchange fails.
    """
    # NOTE(review): the second argument (10) is presumably the KCP
    # conversation id and ``print`` a log callback -- confirm against the
    # siokcp documentation.
    reader, writer = await open_kcp_connection(("127.0.0.1", 11000), 10, print)
    try:
        for _ in range(5):
            writer.write(b"gugu")
            await writer.drain()
            print(await reader.read(100))
        writer.write(b"\0")  # ask server to close
        await writer.drain()
    finally:
        # Release the connection instead of leaving it to loop teardown.
        writer.close()
        await writer.wait_closed()


if __name__ == "__main__":
    asyncio.run(main())
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import asyncio | |
from loguru import logger | |
from siokcp.asyncio import start_kcp_server | |
from siokcp._kcp import ( | |
IKCP_LOG_IN_ACK, | |
IKCP_LOG_IN_DATA, | |
IKCP_LOG_IN_PROBE, | |
IKCP_LOG_IN_WINS, | |
IKCP_LOG_INPUT, | |
IKCP_LOG_OUT_ACK, | |
IKCP_LOG_OUT_DATA, | |
IKCP_LOG_OUT_PROBE, | |
IKCP_LOG_OUT_WINS, | |
IKCP_LOG_OUTPUT, | |
IKCP_LOG_RECV, | |
IKCP_LOG_SEND, | |
) | |
async def cb(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    """Echo everything received on one connection until asked to stop.

    Turns on all of the KCP log flags imported from ``siokcp._kcp`` for
    this connection, then echoes each chunk (up to 100 bytes per read)
    back to the peer. The loop ends when the peer sends exactly ``b"\\0"``
    or when the stream reaches EOF; the writer is then closed.
    """
    # Enable every imported log category on the underlying KCP connection.
    writer.transport.connection.logmask = (
        IKCP_LOG_OUTPUT
        | IKCP_LOG_INPUT
        | IKCP_LOG_SEND
        | IKCP_LOG_RECV
        | IKCP_LOG_IN_DATA
        | IKCP_LOG_IN_ACK
        | IKCP_LOG_IN_PROBE
        | IKCP_LOG_IN_WINS
        | IKCP_LOG_OUT_DATA
        | IKCP_LOG_OUT_ACK
        | IKCP_LOG_OUT_PROBE
        | IKCP_LOG_OUT_WINS
    )
    while True:
        data = await reader.read(100)
        # StreamReader.read() returns b"" at EOF; without this check the
        # loop would spin forever echoing empty chunks once the peer is gone.
        if not data or data == b"\0":
            break
        print("recv:", data)
        writer.write(data)
        await writer.drain()
    writer.close()
    await writer.wait_closed()
    print("client disconnected")
async def main():
    """Start the KCP echo server on 0.0.0.0:11000 and run until cancelled.

    ``cb`` is registered as the per-connection handler and ``logger.info``
    is passed as the third argument to ``start_kcp_server``.
    """
    bind_addr = ("0.0.0.0", 11000)
    await start_kcp_server(cb, bind_addr, logger.info)
    print("kcp server start", bind_addr)

    # Nothing ever resolves this future, so awaiting it parks main() until
    # the task is cancelled (for example on Ctrl+C).
    never_done = asyncio.get_running_loop().create_future()
    try:
        await never_done
    except (KeyboardInterrupt, asyncio.CancelledError):
        pass


if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment