Skip to content

Instantly share code, notes, and snippets.

@Gsantomaggio
Created March 3, 2025 13:32
Show Gist options
  • Save Gsantomaggio/3baff80b8b74ce7a41719698099ed34f to your computer and use it in GitHub Desktop.
Save Gsantomaggio/3baff80b8b74ce7a41719698099ed34f to your computer and use it in GitHub Desktop.
# type: ignore
from rabbitmq_amqp_python_client import ( # PosixSSlConfigurationContext,; PosixClientCert,
AddressHelper,
AMQPMessagingHandler,
Connection,
Environment,
Event,
ExchangeSpecification,
ExchangeToQueueBindingSpecification,
Message,
OutcomeState,
QuorumQueueSpecification,
)
# Number of messages main() publishes to the exchange.
MESSAGES_TO_PUBLISH = 100
def create_connection(environment: Environment) -> Connection:
    """Obtain a connection from *environment*, dial it, and return it."""
    conn = environment.connection()
    conn.dial()
    return conn
def main() -> None:
    """Declare an exchange with two bound quorum queues and publish to it.

    Steps, in order:

    1. Connect to the broker at ``amqp://guest:guest@localhost:5672/``.
    2. Declare the ``orders`` exchange and the ``goods_warehouse`` and
       ``sales_office`` quorum queues.
    3. Bind both queues to the exchange with binding key ``#``.
    4. Publish ``MESSAGES_TO_PUBLISH`` messages to the exchange using the
       same key and print the outcome the broker reports for each one.

    The exchange, queues and bindings are left in place on exit; only the
    publisher and the connection are closed.
    """
    exchange_name = "orders"
    queue_goods_warehouse = "goods_warehouse"
    queue_sales_office = "sales_office"
    routing_key = "#"
    queue_names = (queue_goods_warehouse, queue_sales_office)

    print("connection to amqp server")
    environment = Environment(uri="amqp://guest:guest@localhost:5672/")
    connection = create_connection(environment)
    try:
        management = connection.management()

        print("declaring exchange and queue")
        management.declare_exchange(ExchangeSpecification(name=exchange_name))
        for queue_name in queue_names:
            management.declare_queue(QuorumQueueSpecification(name=queue_name))

        print("binding queue to exchange")
        for queue_name in queue_names:
            # The binding name returned by bind() is not needed here because
            # this script never unbinds.
            management.bind(
                ExchangeToQueueBindingSpecification(
                    source_exchange=exchange_name,
                    destination_queue=queue_name,
                    binding_key=routing_key,
                )
            )

        addr = AddressHelper.exchange_address(exchange_name, routing_key)

        print("create a publisher and publish a test message")
        publisher = connection.publisher(addr)
        try:
            print(f"publishing {MESSAGES_TO_PUBLISH} messages")
            for _ in range(MESSAGES_TO_PUBLISH):
                print("publishing")
                status = publisher.publish(Message(body="test"))
                if status.remote_state == OutcomeState.ACCEPTED:
                    print("message accepted")
                elif status.remote_state == OutcomeState.RELEASED:
                    print("message not routed")
                elif status.remote_state == OutcomeState.REJECTED:
                    print("message rejected")
        finally:
            # Close the publisher even when a publish call raises.
            publisher.close()
    finally:
        # Release the broker connection on every exit path.
        connection.close()
if __name__ == "__main__":
    # Run the demo only when executed as a script, not when imported.
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment