Last active
June 5, 2023 16:56
-
-
Save synodriver/1f3c6be897d0dd3e3a169672c3434867 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Copyright (c) 2008-2023 synodriver <[email protected]> | |
""" | |
import asyncio | |
import os | |
import sys | |
import threading | |
import time | |
from threading import Thread | |
from typing import Dict, Tuple | |
import pygame | |
pygame.init() | |
class Game: | |
def __init__(self, fps: int = 60, size: Tuple[int, int] = (600, 400)): | |
self.update_duration = 1 / fps | |
self.size = size | |
self.ball = pygame.image.load("./PYG02-ball.gif") # https://python123.io/PY15/PYG02-ball.gif | |
self._event_queue = asyncio.Queue() | |
self.closed = False | |
self.speed = [1, 1] | |
self._key_waiters: Dict[int, asyncio.Future] = {} | |
self._loop = asyncio.get_running_loop() | |
self._startup_waiter = self._loop.create_future() | |
self._get_event_task = self._loop.run_in_executor(None, self.__get_event) | |
async def startup(self): | |
await self._startup_waiter | |
def __get_event(self): | |
""" | |
should run in executor | |
:return: | |
""" | |
self.screen = pygame.display.set_mode(self.size) | |
pygame.display.set_caption("实验") | |
self._startup_waiter.set_result(None) | |
while True: | |
event = pygame.event.wait(1000) | |
if event != pygame.NOEVENT: | |
asyncio.run_coroutine_threadsafe(self._event_queue.put(event), self._loop) | |
if self.closed: # 这里不能设置守护线程,因此必须让这个线程推出,不然整个进程都要等他 | |
break | |
async def change_speed(self, index: int, size: int, waiter: asyncio.Future): | |
while not waiter.done() and not waiter.cancelled(): | |
self.speed[index] += size | |
await asyncio.sleep(5 * self.update_duration) | |
async def handle_event(self): | |
while True: | |
event: pygame.event.Event = await self._event_queue.get() | |
if event.type in (pygame.QUIT, pygame.WINDOWCLOSE): | |
break | |
elif event.type == pygame.KEYDOWN: | |
fut = self._loop.create_future() | |
if event.key == pygame.K_LEFT: | |
self._key_waiters[pygame.K_LEFT] = fut | |
asyncio.create_task(self.change_speed(0, -1, fut)) | |
# self.speed[0] -= 1 | |
elif event.key == pygame.K_RIGHT: | |
self._key_waiters[pygame.K_RIGHT] = fut | |
asyncio.create_task(self.change_speed(0, 1, fut)) | |
# self.speed[0] += 1 | |
elif event.key == pygame.K_UP: | |
self._key_waiters[pygame.K_UP] = fut | |
asyncio.create_task(self.change_speed(1, -1, fut)) | |
# self.speed[1] -= 1 | |
elif event.key == pygame.K_DOWN: | |
self._key_waiters[pygame.K_DOWN] = fut | |
asyncio.create_task(self.change_speed(1, 1, fut)) | |
# self.speed[1] += 1 | |
elif event.type == pygame.KEYUP: | |
fut = self._key_waiters.pop(event.key) | |
fut.set_result(None) | |
else: | |
print(event) | |
async def update_screen(self): | |
while True: | |
await asyncio.sleep(self.update_duration) | |
pygame.display.update() | |
async def handle_timer(self): | |
ball_rect = self.ball.get_rect() | |
while True: | |
ball_rect = ball_rect.move(self.speed[0], self.speed[1]) # 框才能动 | |
if ball_rect.left < 0 or ball_rect.right > self.size[0]: | |
self.speed[0] = -self.speed[0] | |
if ball_rect.top < 0 or ball_rect.bottom > self.size[1]: | |
self.speed[1] = -self.speed[1] | |
self.screen.fill((0, 0, 0)) | |
self.screen.blit(self.ball, ball_rect) # 显示动的效果 | |
await asyncio.sleep(self.update_duration) | |
async def run(self): | |
# poller = Poller(self.size, self._event_queue, asyncio.get_running_loop()) | |
# poller.start() | |
event_task = asyncio.create_task(self.handle_event()) | |
update_screen_task = asyncio.create_task(self.update_screen()) | |
timer_task = asyncio.create_task(self.handle_timer()) | |
tasks = [event_task, update_screen_task, timer_task] | |
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) | |
self.closed = True | |
# poller.closed = True | |
# poller.join() | |
# | |
# for task in tasks: | |
# task.cancel() | |
# try: | |
# await task | |
# except asyncio.CancelledError: | |
# pass | |
# print("loop") | |
# print(time.time()) | |
async def main(): | |
game = Game(60, (600, 400)) | |
await game.startup() | |
await game.run() | |
print("exit") | |
# os._exit(0) | |
if __name__ == "__main__": | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment