Skip to content

Instantly share code, notes, and snippets.

@RyanKung
Created December 7, 2017 17:24
Show Gist options
  • Save RyanKung/658f89ceca0904f09f570e2b566ce91a to your computer and use it in GitHub Desktop.
Save RyanKung/658f89ceca0904f09f570e2b566ce91a to your computer and use it in GitHub Desktop.
actor-pika.py
import pika
import json
import asyncio
from pulsar import send
from pulsar.async.consts import ACTOR_STATES
from pulsar.apps import Application
from pulsar import command
@command(ack=False)
async def fire(request, event, **kw):
if request.actor.state is not ACTOR_STATES.STOPPING:
request.actor.fire_event(event, **kw)
class Measurement:
name = 'test_measure'
rate = 1
async def fetch(self):
return {
'hello': 'world'
}
async def mapper(self, datum: dict):
datum.update({'world': 'hello'})
return datum
async def measure(self, actor, exc=None):
while actor.state is not ACTOR_STATES.STOPPING:
await asyncio.sleep(self.rate)
datum = await self.mapper(
await self.fetch()
)
await send(
actor.monitor,
'fire',
'spout',
data=datum
)
class Entanglement:
name = 'test_entangle'
rate = 1
def __init__(
self,
exchange: str,
*args,
**kwargs
):
self.exchange = exchange
async def mapper(self, datum):
return datum
async def connect(self):
self.connection = pika.BlockingConnection()
self.channel = self.connection.channel()
self.queue = self.channel.queue_declare(exclusive=True).method.queue
async def handler(self, channel, method, properties, body):
await send(self.actor.monitor, 'fire', 'spout', body)
async def entangle(self, actor, exc=None):
await self.connect()
self.channel.queue_bind(
exchange=self.exchange,
queue=self.queue
)
# Check pika.adepter.process_data_events
while actor.state is not ACTOR_STATES.STOPPING:
await asyncio.sleep(self.rate)
body: bytes = self.channel.basic_get(self.queue)[2]
if not body:
continue
datum = await self.mapper(body.decode())
await send(
actor.monitor,
'fire',
'spout',
data=datum
)
self.connection.close()
class Monitor(Application):
def __init__(
self,
name: str,
exchange: str,
exchange_type: str,
*args,
**kwargs
):
self.name = name
self.exchange: str = exchange
self.exchange_type: str = exchange_type
super().__init__(*args, **kwargs)
self._measurements = []
self._measuring = []
self._entanglements = []
self._entangleing = []
def spout(
self,
monitor,
data,
**kwargs
):
assert self.channel.basic_publish(
exchange=self.exchange,
routing_key='',
body=json.dumps(data)
)
return
def connect_channel(self):
self.connection = pika.BlockingConnection()
self.channel = self.connection.channel()
self.channel.exchange_declare(
exchange=self.exchange,
exchange_type=self.exchange_type
)
async def monitor_start(self, monitor, exc=None):
monitor.bind_event('spout', self.spout)
self.connect_channel()
self._measuring: list = [
self.spawn_measuring_actor(monitor, m)
for m in self._measurements
]
self._entangling: list = [
self.spawn_entangling_actor(monitor, e)
for e in self._entanglements
]
async def monitor_stopping(self, monitor, exc=None):
self.connection.close()
def actor_info(self):
return {
'test': 'test'
}
def spawn_measuring_actor(self, monitor, m: Measurement):
actor = monitor.spawn(
name=m.name,
start=m.measure
)
return actor
def spawn_entangling_actor(self, monitor, e: Entanglement):
actor = monitor.spawn(
name=e.name,
start=e.entangle
)
return actor
def measure(self, m: Measurement):
self._measurements.append(m)
def entangle(self, e: Entanglement):
self._entanglements.append(e)
if __name__ == '__main__':
monitor = Monitor(
name='pikachu_monitor',
exchange_type='fanout',
exchange='test',
workers=2
)
monitor.measure(Measurement())
monitor.entangle(Entanglement('test'))
monitor.start()
@RyanKung
Copy link
Author

RyanKung commented Dec 7, 2017

requirements:

pika==0.11.0
pulsar==1.6.4

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment