Skip to content

Instantly share code, notes, and snippets.

@behitek
Created August 26, 2021 02:23
Show Gist options
  • Save behitek/1db4ded7dabc4f1ecf26632e9f6ddbc8 to your computer and use it in GitHub Desktop.
Save behitek/1db4ded7dabc4f1ecf26632e9f6ddbc8 to your computer and use it in GitHub Desktop.
import time
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
x = 1
while True:
print('Send message to queue!')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World {}!'.format(x))
x += 1
time.sleep(0.1)
import threading
import time
from datetime import datetime
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
res = channel.queue_declare(queue='hello', passive=True)
is_process_job = False
SLEEP_TIME = 5
IS_IDLE = False
def callback(ch, method, properties, body):
global is_process_job
is_process_job = True
print(" [x] Received %r" % body)
time.sleep(5)
t = threading.Thread(target=worker_timer, daemon=True)
t.start()
is_process_job = False
def worker_timer():
print('Worker', datetime.now())
def timer_task(res_obj):
while True:
time.sleep(SLEEP_TIME)
global is_process_job
print('Debug', res_obj.method.message_count)
if not is_process_job and res_obj.method.message_count == 0:
print('Timer', datetime.now())
else:
print('Timer', 'skipping...')
if __name__ == '__main__':
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
t = threading.Thread(target=timer_task, args=(res,), daemon=True)
t.start()
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment