Skip to content

Instantly share code, notes, and snippets.

@agoose77
Created August 8, 2016 14:22
Show Gist options
  • Save agoose77/ecf95064ab54f93d37ff0860081fc1c1 to your computer and use it in GitHub Desktop.
Save agoose77/ecf95064ab54f93d37ff0860081fc1c1 to your computer and use it in GitHub Desktop.
import time
from collections import deque
class Future:
@property
def is_done(self):
raise NotImplementedError
def __iter__(self):
while True:
if self.is_done:
return
yield
class Signal(Future):
is_done = False
def signal(self):
self.is_done = True
def wait(interval):
started = time.clock()
while True:
waited = time.clock() - started
if waited >= interval:
return waited
yield
def go_to_object(this, obj, range):
while True:
this.worldPosition += (obj.worldPosition-this.worldPosition).normalized() * 0.1
dist = this.getDistanceTo(obj)
if dist <= range:
return
yield
class EventLoop:
def __init__(self):
self._tasks = deque()
def run(self, task):
self._tasks.append(task)
def update(self):
to_do = len(self._tasks)
for i in range(to_do):
task = self._tasks.popleft()
try:
next(task)
except StopIteration:
break
self._tasks.append(task)
def some_other_task():
print("Waiting in some other task")
# Wait for 2 seconds
yield from wait(2)
# Return result to caller
return 12
def some_task(signal):
yield from wait(2)
print("2 s elapsed")
# Await the result of subtask
result = yield from some_other_task()
from bge import logic
owner = logic.getCurrentController().owner
yield from go_to_object(owner, owner.scene.objects['Cube.001'], 1)
print(result)
signal.signal()
def parallel_task(signal):
yield from signal
print("Signalled!")
loop = EventLoop()
signal = Signal()
loop.run(some_task(signal))
loop.run(parallel_task(signal))
def update(cont):
loop.update()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment