Skip to content

Instantly share code, notes, and snippets.

@simon-mo
Created December 18, 2020 19:37
Show Gist options
  • Select an option

  • Save simon-mo/bcc385a2d577b1353b8b3e93b384b55e to your computer and use it in GitHub Desktop.

Select an option

Save simon-mo/bcc385a2d577b1353b8b3e93b384b55e to your computer and use it in GitHub Desktop.
import ray
import numpy as np
import time
import asyncio
@ray.remote
class Upstream:
def __init__(self):
self.q = asyncio.Queue()
async def run_forever(self):
while True:
# assume we are getting data from camera
data = np.zeros((3, 224, 224))
# do some processing
data *= 2
time.sleep(1)
# put the data to the queue
await self.q.put(data)
# yield the control to the other method
await asyncio.sleep(0)
async def get_new_result(self):
return await self.q.get()
@ray.remote
class Downstream:
def __init__(self, upstream_actor):
self.upstream_actor = upstream_actor
async def run_forever(self):
while True:
upstream_ref = self.upstream_actor.get_new_result.remote()
# we will show the data is zero-copied
upstream_data = await upstream_ref
upstream_data_copy = await upstream_ref
print("1st array's memory pointer", upstream_data.ctypes.data)
print("2nd array's memory pointer", upstream_data_copy.ctypes.data)
# do some processing
time.sleep(1)
# send to more downstream
...
if __name__ == "__main__":
ray.init()
upstream = Upstream.remote()
# no need to set downstream as threaded because it just runs a single funciton
downstream = Downstream.remote(upstream)
upstream_ref = upstream.run_forever.remote()
downstream_ref = downstream.run_forever.remote()
ray.get(downstream_ref)
diff --git a/threaded.py b/async.py
index b218924..bc0b986 100644
--- a/threaded.py
+++ b/async.py
@@ -1,15 +1,15 @@
import ray
import numpy as np
import time
-from queue import Queue
+import asyncio
@ray.remote
class Upstream:
def __init__(self):
- self.q = Queue()
+ self.q = asyncio.Queue()
- def run_forever(self):
+ async def run_forever(self):
while True:
# assume we are getting data from camera
data = np.zeros((3, 224, 224))
@@ -19,10 +19,13 @@ class Upstream:
time.sleep(1)
# put the data to the queue
- self.q.put(data)
+ await self.q.put(data)
- def get_new_result(self):
- return self.q.get(block=True)
+ # yield the control to the other method
+ await asyncio.sleep(0)
+
+ async def get_new_result(self):
+ return await self.q.get()
@ray.remote
@@ -30,13 +33,13 @@ class Downstream:
def __init__(self, upstream_actor):
self.upstream_actor = upstream_actor
- def run_forever(self):
+ async def run_forever(self):
while True:
upstream_ref = self.upstream_actor.get_new_result.remote()
# we will show the data is zero-copied
- upstream_data = ray.get(upstream_ref)
- upstream_data_copy = ray.get(upstream_ref)
+ upstream_data = await upstream_ref
+ upstream_data_copy = await upstream_ref
print("1st array's memory pointer", upstream_data.ctypes.data)
print("2nd array's memory pointer", upstream_data_copy.ctypes.data)
@@ -50,7 +53,7 @@ class Downstream:
if __name__ == "__main__":
ray.init()
- upstream = Upstream.options(max_concurrency=2).remote()
+ upstream = Upstream.remote()
# no need to set downstream as threaded because it just runs a single funciton
downstream = Downstream.remote(upstream)
import ray
import numpy as np
import time
from queue import Queue
@ray.remote
class Upstream:
def __init__(self):
self.q = Queue()
def run_forever(self):
while True:
# assume we are getting data from camera
data = np.zeros((3, 224, 224))
# do some processing
data *= 2
time.sleep(1)
# put the data to the queue
self.q.put(data)
def get_new_result(self):
return self.q.get(block=True)
@ray.remote
class Downstream:
def __init__(self, upstream_actor):
self.upstream_actor = upstream_actor
def run_forever(self):
while True:
upstream_ref = self.upstream_actor.get_new_result.remote()
# we will show the data is zero-copied
upstream_data = ray.get(upstream_ref)
upstream_data_copy = ray.get(upstream_ref)
print("1st array's memory pointer", upstream_data.ctypes.data)
print("2nd array's memory pointer", upstream_data_copy.ctypes.data)
# do some processing
time.sleep(1)
# send to more downstream
...
if __name__ == "__main__":
ray.init()
upstream = Upstream.options(max_concurrency=2).remote()
# no need to set downstream as threaded because it just runs a single funciton
downstream = Downstream.remote(upstream)
upstream_ref = upstream.run_forever.remote()
downstream_ref = downstream.run_forever.remote()
ray.get(downstream_ref)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment