@slayton
Created January 22, 2024 19:37
Consumer Group
import asyncio

from faststream import FastStream, Logger
from faststream.redis import RedisBroker, StreamSub

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker, logger=None)

chan = "chan"
word_publisher = broker.publisher(chan)


@broker.subscriber(stream=StreamSub(chan, group='group1', consumer='1'))
async def upper(word: str, logger: Logger) -> None:
    upper = word.upper()
    logger.info(f"UPPER:{upper}")


@broker.subscriber(stream=StreamSub(chan, group='group2', consumer='2'))
async def lower(word: str, logger: Logger) -> None:
    lower = word.lower()
    logger.info(f"LOWER:{lower}")


async def send(word) -> None:
    await word_publisher.publish(word)


@app.after_startup
async def run():
    words = ['APPle', 'banaNA']
    await asyncio.gather(*[send(w) for w in words])

slayton commented Jan 22, 2024

faststream run consumer_group:app
2024-01-22 14:35:02,920 INFO     - chan |                 - `Upper` waiting for messages
