Created
December 18, 2020 19:37
-
-
Save simon-mo/bcc385a2d577b1353b8b3e93b384b55e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import ray | |
| import numpy as np | |
| import time | |
| import asyncio | |
@ray.remote
class Upstream:
    """Async Ray actor that produces arrays and hands them out on request.

    ``run_forever`` fills an in-actor ``asyncio.Queue`` and
    ``get_new_result`` drains it. Both are coroutines, so one can only make
    progress while the other is suspended at an ``await``.
    """

    def __init__(self):
        # Buffer between the producer loop and get_new_result() callers.
        self.q = asyncio.Queue()

    async def run_forever(self):
        """Produce one array per second and enqueue it; never returns."""
        while True:
            # assume we are getting data from camera
            data = np.zeros((3, 224, 224))
            # do some processing
            data *= 2
            # Simulated latency. A blocking time.sleep() here would stall the
            # event loop and keep get_new_result() from running for the whole
            # second, so suspend the coroutine instead.
            await asyncio.sleep(1)
            # put the data to the queue
            await self.q.put(data)
            # put() on an unbounded queue does not suspend, so yield control
            # explicitly; this keeps the consumer method schedulable even if
            # the step above is later replaced by synchronous work.
            await asyncio.sleep(0)

    async def get_new_result(self):
        """Wait until an array is available and return the oldest one."""
        return await self.q.get()
@ray.remote
class Downstream:
    """Async Ray actor that repeatedly pulls results from an upstream actor."""

    def __init__(self, upstream_actor):
        # Actor handle exposing a ``get_new_result`` remote method.
        self.upstream_actor = upstream_actor

    async def run_forever(self):
        """Fetch, inspect and process upstream results in an endless loop."""
        while True:
            upstream_ref = self.upstream_actor.get_new_result.remote()
            # we will show the data is zero-copied: the same object ref is
            # resolved twice and the buffer address of each array is printed
            # so the two can be compared.
            upstream_data = await upstream_ref
            upstream_data_copy = await upstream_ref
            print("1st array's memory pointer", upstream_data.ctypes.data)
            print("2nd array's memory pointer", upstream_data_copy.ctypes.data)
            # do some processing (simulated). Use a non-blocking sleep so the
            # coroutine suspends instead of freezing the actor's event loop.
            await asyncio.sleep(1)
            # send to more downstream
            ...
if __name__ == "__main__":
    ray.init()

    # Create both actors; the consumer receives a handle to the producer.
    upstream = Upstream.remote()
    downstream = Downstream.remote(upstream)

    # Start the two endless loops. The producer's ref is kept but never
    # waited on.
    producer_task = upstream.run_forever.remote()
    consumer_task = downstream.run_forever.remote()

    # Block the driver on the consumer loop so the script stays alive.
    ray.get(consumer_task)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| diff --git a/threaded.py b/async.py | |
| index b218924..bc0b986 100644 | |
| --- a/threaded.py | |
| +++ b/async.py | |
| @@ -1,15 +1,15 @@ | |
| import ray | |
| import numpy as np | |
| import time | |
| -from queue import Queue | |
| +import asyncio | |
| @ray.remote | |
| class Upstream: | |
| def __init__(self): | |
| - self.q = Queue() | |
| + self.q = asyncio.Queue() | |
| - def run_forever(self): | |
| + async def run_forever(self): | |
| while True: | |
| # assume we are getting data from camera | |
| data = np.zeros((3, 224, 224)) | |
| @@ -19,10 +19,13 @@ class Upstream: | |
| time.sleep(1) | |
| # put the data to the queue | |
| - self.q.put(data) | |
| + await self.q.put(data) | |
| - def get_new_result(self): | |
| - return self.q.get(block=True) | |
| + # yield the control to the other method | |
| + await asyncio.sleep(0) | |
| + | |
| + async def get_new_result(self): | |
| + return await self.q.get() | |
| @ray.remote | |
| @@ -30,13 +33,13 @@ class Downstream: | |
| def __init__(self, upstream_actor): | |
| self.upstream_actor = upstream_actor | |
| - def run_forever(self): | |
| + async def run_forever(self): | |
| while True: | |
| upstream_ref = self.upstream_actor.get_new_result.remote() | |
| # we will show the data is zero-copied | |
| - upstream_data = ray.get(upstream_ref) | |
| - upstream_data_copy = ray.get(upstream_ref) | |
| + upstream_data = await upstream_ref | |
| + upstream_data_copy = await upstream_ref | |
| print("1st array's memory pointer", upstream_data.ctypes.data) | |
| print("2nd array's memory pointer", upstream_data_copy.ctypes.data) | |
| @@ -50,7 +53,7 @@ class Downstream: | |
| if __name__ == "__main__": | |
| ray.init() | |
| - upstream = Upstream.options(max_concurrency=2).remote() | |
| + upstream = Upstream.remote() | |
| # no need to set downstream as threaded because it just runs a single funciton | |
| downstream = Downstream.remote(upstream) | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import ray | |
| import numpy as np | |
| import time | |
| from queue import Queue | |
@ray.remote
class Upstream:
    """Ray actor that produces arrays into a thread-safe queue.

    ``run_forever`` is the producer loop and ``get_new_result`` the consumer
    entry point; both use blocking calls on a ``queue.Queue``.
    """

    def __init__(self):
        # Hand-off buffer between run_forever() and get_new_result().
        self.q = Queue()

    def run_forever(self):
        """Generate, process and enqueue one array per iteration, forever."""
        while True:
            # Stand-in for a frame read from a camera.
            frame = np.zeros((3, 224, 224))
            # Stand-in for processing: scale in place, then take one second.
            frame *= 2
            time.sleep(1)
            # Publish the result for get_new_result() callers.
            self.q.put(frame)

    def get_new_result(self):
        """Block until an array is available and return the oldest one."""
        result = self.q.get(block=True)
        return result
@ray.remote
class Downstream:
    """Ray actor that repeatedly pulls results from an upstream actor."""

    def __init__(self, upstream_actor):
        # Actor handle exposing a ``get_new_result`` remote method.
        self.upstream_actor = upstream_actor

    def run_forever(self):
        """Fetch, inspect and process upstream results in an endless loop."""
        while True:
            result_ref = self.upstream_actor.get_new_result.remote()
            # Resolve the same ref twice and print each array's buffer
            # address, to show that the data is zero-copied.
            first = ray.get(result_ref)
            second = ray.get(result_ref)
            print("1st array's memory pointer", first.ctypes.data)
            print("2nd array's memory pointer", second.ctypes.data)
            # Stand-in for one second of processing.
            time.sleep(1)
            # Placeholder: forward the result to further downstream stages.
            ...
if __name__ == "__main__":
    ray.init()

    # The producer actor gets two concurrent slots: one is occupied by the
    # endless run_forever() call, the other serves get_new_result().
    threaded_upstream = Upstream.options(max_concurrency=2)
    upstream = threaded_upstream.remote()

    # The consumer only ever runs a single function, so it does not need to
    # be threaded.
    downstream = Downstream.remote(upstream)

    # Start the two endless loops. The producer's ref is kept but never
    # waited on.
    producer_task = upstream.run_forever.remote()
    consumer_task = downstream.run_forever.remote()

    # Block the driver on the consumer loop so the script stays alive.
    ray.get(consumer_task)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment