Created
December 7, 2017 17:24
-
-
Save RyanKung/658f89ceca0904f09f570e2b566ce91a to your computer and use it in GitHub Desktop.
actor-pika.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pika | |
import json | |
import asyncio | |
from pulsar import send | |
from pulsar.async.consts import ACTOR_STATES | |
from pulsar.apps import Application | |
from pulsar import command | |
@command(ack=False)
async def fire(request, event, **kw):
    """Fire ``event`` on the actor that received this command.

    Nothing happens when the receiving actor is already in the STOPPING
    state. Any extra keyword arguments are forwarded unchanged to
    ``fire_event``.
    """
    # NOTE(review): ack=False presumably means the sender does not wait for
    # a reply -- confirm against the pulsar command documentation.
    target = request.actor
    if target.state is ACTOR_STATES.STOPPING:
        return
    target.fire_event(event, **kw)
class Measurement:
    """Periodic producer: fetch a datum, map it, and send it to the monitor.

    ``name`` is used as the actor name when the monitor spawns this
    measurement; ``rate`` is the number of seconds slept between samples.
    """

    name = 'test_measure'
    rate = 1

    async def fetch(self):
        """Return the raw sample (a fixed demo payload here)."""
        sample = {'hello': 'world'}
        return sample

    async def mapper(self, datum: dict):
        """Add the demo key to ``datum`` in place and return the same object."""
        extra = {'world': 'hello'}
        datum.update(extra)
        return datum

    async def measure(self, actor, exc=None):
        """Sample in a loop until ``actor`` reaches the STOPPING state.

        Each iteration sleeps ``rate`` seconds, builds a datum via
        ``fetch`` then ``mapper``, and sends it to ``actor.monitor`` through
        the 'fire' command as a 'spout' event.
        """
        while True:
            if actor.state is ACTOR_STATES.STOPPING:
                break
            await asyncio.sleep(self.rate)
            raw = await self.fetch()
            datum = await self.mapper(raw)
            await send(actor.monitor, 'fire', 'spout', data=datum)
class Entanglement:
    """Polling consumer: read messages from an exchange and forward them.

    An exclusive queue is declared and bound to ``exchange``; every
    ``rate`` seconds one message is fetched with ``basic_get``, decoded,
    passed through ``mapper`` and sent to the monitor as a 'spout' event.
    ``name`` is used as the actor name when the monitor spawns this object.
    """

    name = 'test_entangle'
    rate = 1

    def __init__(
            self,
            exchange: str,
            *args,
            **kwargs
    ):
        """Remember the exchange to bind to.

        Extra positional and keyword arguments are accepted and ignored.
        """
        self.exchange = exchange
        # Populated by connect() / entangle(); None until then so cleanup
        # code can tell whether a connection was ever opened.
        self.connection = None
        self.channel = None
        self.queue = None
        self.actor = None

    async def mapper(self, datum):
        """Transform a decoded message before forwarding (identity here)."""
        return datum

    async def connect(self):
        """Open a blocking connection and declare an exclusive queue."""
        self.connection = pika.BlockingConnection()
        self.channel = self.connection.channel()
        self.queue = self.channel.queue_declare(exclusive=True).method.queue

    async def handler(self, channel, method, properties, body):
        """Forward ``body`` to the monitor as a 'spout' event.

        Requires ``self.actor`` to be set, which ``entangle`` does on entry.
        The payload is passed as the ``data`` keyword because the 'fire'
        command accepts only ``event`` positionally.
        """
        await send(self.actor.monitor, 'fire', 'spout', data=body)

    async def entangle(self, actor, exc=None):
        """Poll the queue until ``actor`` is stopping, then close the connection.

        Messages are acknowledged after they have been forwarded; messages
        with an empty body are acknowledged and skipped. The connection is
        closed even when an error interrupts the loop.
        """
        self.actor = actor
        try:
            await self.connect()
            self.channel.queue_bind(
                exchange=self.exchange,
                queue=self.queue
            )
            # TODO: check pika's adapter process_data_events as an
            # alternative to polling with basic_get.
            while actor.state is not ACTOR_STATES.STOPPING:
                await asyncio.sleep(self.rate)
                method, _properties, body = self.channel.basic_get(self.queue)
                if method is None:
                    # Queue was empty on this poll.
                    continue
                if body:
                    datum = await self.mapper(body.decode())
                    await send(
                        actor.monitor,
                        'fire',
                        'spout',
                        data=datum
                    )
                # basic_get was issued without no_ack, so the broker keeps
                # the message pending until it is acknowledged explicitly.
                self.channel.basic_ack(delivery_tag=method.delivery_tag)
        finally:
            if self.connection is not None and self.connection.is_open:
                self.connection.close()
class Monitor(Application):
    """Pulsar application that publishes 'spout' events to a RabbitMQ exchange.

    Measurements and entanglements registered with ``measure`` / ``entangle``
    are each spawned as an actor when the monitor starts. Data they send
    back through the 'spout' event is JSON-encoded and published to
    ``exchange``.
    """

    def __init__(
            self,
            name: str,
            exchange: str,
            exchange_type: str,
            *args,
            **kwargs
    ):
        """Store the exchange settings and initialise the registries.

        Remaining positional and keyword arguments are passed to
        ``Application.__init__``.
        """
        # Assigned before super().__init__ as in the original ordering.
        # NOTE(review): presumably the base class picks up self.name --
        # confirm against pulsar before reordering.
        self.name = name
        self.exchange: str = exchange
        self.exchange_type: str = exchange_type
        super().__init__(*args, **kwargs)
        self._measurements = []
        self._measuring = []
        self._entanglements = []
        # Same attribute that monitor_start assigns.
        self._entangling = []

    def spout(
            self,
            monitor,
            data,
            **kwargs
    ):
        """Publish ``data`` as JSON to the configured exchange.

        Raises:
            RuntimeError: if ``basic_publish`` reports the message as
                not delivered (returns ``False``).
        """
        # The publish must not live inside an ``assert``: assertions are
        # removed under ``python -O``, which would drop the publish itself.
        delivered = self.channel.basic_publish(
            exchange=self.exchange,
            routing_key='',
            body=json.dumps(data)
        )
        # Only an explicit False counts as failure, so a client version
        # that returns None from basic_publish is not treated as an error.
        if delivered is False:
            raise RuntimeError(
                'message could not be published to exchange %r'
                % (self.exchange,)
            )
        return

    def connect_channel(self):
        """Open a blocking connection and declare the exchange."""
        self.connection = pika.BlockingConnection()
        self.channel = self.connection.channel()
        self.channel.exchange_declare(
            exchange=self.exchange,
            exchange_type=self.exchange_type
        )

    async def monitor_start(self, monitor, exc=None):
        """Bind the 'spout' event, connect, and spawn all registered actors."""
        monitor.bind_event('spout', self.spout)
        self.connect_channel()
        self._measuring = [
            self.spawn_measuring_actor(monitor, m)
            for m in self._measurements
        ]
        self._entangling = [
            self.spawn_entangling_actor(monitor, e)
            for e in self._entanglements
        ]

    async def monitor_stopping(self, monitor, exc=None):
        """Close the broker connection if one was opened."""
        # connect_channel may never have run (or may have failed), in
        # which case there is no connection attribute to close.
        connection = getattr(self, 'connection', None)
        if connection is not None and connection.is_open:
            connection.close()

    def actor_info(self):
        """Return a static placeholder info dictionary."""
        return {
            'test': 'test'
        }

    def spawn_measuring_actor(self, monitor, m: Measurement):
        """Spawn an actor named ``m.name`` whose start hook is ``m.measure``."""
        actor = monitor.spawn(
            name=m.name,
            start=m.measure
        )
        return actor

    def spawn_entangling_actor(self, monitor, e: Entanglement):
        """Spawn an actor named ``e.name`` whose start hook is ``e.entangle``."""
        actor = monitor.spawn(
            name=e.name,
            start=e.entangle
        )
        return actor

    def measure(self, m: Measurement):
        """Register a measurement to be spawned when the monitor starts."""
        self._measurements.append(m)

    def entangle(self, e: Entanglement):
        """Register an entanglement to be spawned when the monitor starts."""
        self._entanglements.append(e)
if __name__ == '__main__':
    # Demo wiring: one measurement and one entanglement, both using the
    # fanout exchange 'test'.
    monitor = Monitor(
        workers=2,
        exchange='test',
        exchange_type='fanout',
        name='pikachu_monitor',
    )
    # Register the producer first, then the consumer, then run.
    monitor.measure(Measurement())
    monitor.entangle(Entanglement('test'))
    monitor.start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
requirements:
pika==0.11.0
pulsar==1.6.4