Created
November 10, 2017 02:32
-
-
Save wil3/0e20a898f7d29bc987a2dbe7159d3065 to your computer and use it in GitHub Desktop.
Python 2 Producer/Consumer demonstration
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Introduce producer/consumer model with thread safe queues | |
This demo can be first illustrated as an M/M/K queue in which we have incoming | |
messages at some rate added to a single queue, each consumer thread will | |
continually get and process items from the queue. | |
This is to demonstrate tradeoffs with the queue size, and number | |
of workers to use during processing and how it affects throughput performance. | |
""" | |
__author__ = "William Koch" | |
__email__ = "wfkoch [at] bu.edu" | |
# Import libraries we will be using | |
# This library is for multi-threading | |
import threading | |
# For Python 3 | |
#import queue | |
import Queue | |
#Lets log stuff | |
import logging | |
#Helper stuff | |
import time | |
import random | |
import string | |
import hashlib | |
logging.basicConfig(level=logging.DEBUG) | |
logger = logging.getLogger("main") | |
logger_p = logging.getLogger("Producer") | |
logger_c = logging.getLogger("Consumer") | |
RUNNING = True | |
# Base class shared by the simulated producer and consumer threads.
class SimBase(threading.Thread):
    """Thread holding the state common to every simulation worker.

    Attributes:
        q: the queue shared between producer and consumers.
        rate: rate passed to ``random.expovariate`` by the subclasses when
            drawing wait times.
        count: number of items this worker has handled so far.
        running: per-instance run flag (the visible ``run`` loops poll the
            module-level ``RUNNING`` flag instead).
    """

    def __init__(self, q, rate):
        # Set up the Thread machinery first, then attach simulation state;
        # Thread.__init__ touches none of these attributes.
        super(SimBase, self).__init__()
        self.q, self.rate = q, rate
        self.count = 0
        self.running = True
class SimProducer(SimBase):
    """Thread that enqueues random messages at exponentially distributed gaps.

    Waits between messages are drawn from ``random.expovariate(rate)``, so
    arrivals form a Poisson process. When the shared queue is full the
    message is dropped (and a warning is logged) instead of blocking.
    """

    def __init__(self, q, rate):
        super(SimProducer, self).__init__(q, rate)

    def _generate_message(self):
        """Return a random message: the decimal string of an int in [0, 1000]."""
        return str(random.randint(0, 1000))

    def run(self):
        """Produce messages until the module-level ``RUNNING`` flag is cleared."""
        name = threading.current_thread().name
        while RUNNING:
            message = self._generate_message()
            try:
                # put_nowait checks for space and inserts atomically, so the
                # producer never blocks. A separate full()/put() pair could
                # block if the queue filled up between the two calls.
                self.q.put_nowait(message)
            except Queue.Full:
                # Logger.warn is a deprecated alias (removed in Python 3.13).
                logger_p.warning("{} Q Full! Dropping {}".format(name, message))
            else:
                logger_p.info("{} Producing {}".format(name, message))
                logger_p.debug("{} Q = {}".format(name, self.q.queue))
                self.count += 1
            # Times between events of a Poisson process are exponential.
            wait = random.expovariate(self.rate)
            logger_p.debug("{} Sleeping for = {}".format(name, wait))
            time.sleep(wait)
        logger_p.debug("{} Done".format(name))
class DistributedPOW(SimBase):
    """Consumer thread that computes a proof of work (POW) for each message.

    For every message taken off the shared queue it finds the smallest nonce
    such that the hex SHA-256 digest of ``"{message}{nonce}"`` starts with
    ``difficulty`` zeros. It then simulates handing the result to a slow
    external resource.
    """

    # Seconds to wait on an empty queue before re-checking RUNNING.
    POLL_INTERVAL = 0.1

    def __init__(self, q, rate, difficulty):
        self.difficulty = difficulty
        super(DistributedPOW, self).__init__(q, rate)

    def _pow(self, message):
        """Return ``(h, nonce)`` for the first nonce that gives a valid hash.

        ``h`` is the hex SHA-256 digest of ``"{message}{nonce}"`` encoded as
        UTF-8, so a receiver can verify the pair by recomputing one hash.
        """
        nonce = 0
        while True:
            # Build each candidate from the ORIGINAL message. Feeding the
            # previous candidate back in would make the input grow every
            # iteration, and the returned nonce would no longer reproduce h.
            candidate = "{}{}".format(message, nonce).encode("utf-8")
            h = hashlib.sha256(candidate).hexdigest()
            if self.valid_prefix(h):
                return (h, nonce)
            nonce += 1

    def _send(self, message, h, nonce):
        """Simulate waiting on a resource (network interface, database, ...).

        The arguments are currently unused; only a random exponential delay
        is simulated.

        Computing the POW alone is CPU-bound, so adding threads did not raise
        throughput: the threads are not spread across cores. While a thread
        sleeps here it uses no CPU, so another thread can compute its POW in
        the meantime.
        """
        wait = random.expovariate(self.rate)
        logger_c.debug("{} Sleeping for = {}".format(threading.current_thread().name, wait))
        time.sleep(wait)

    def valid_prefix(self, hash):
        """Return True if ``hash`` starts with ``self.difficulty`` zeros."""
        # The parameter name shadows the builtin but is kept so callers that
        # pass it by keyword keep working.
        return hash.startswith("0" * self.difficulty)

    def run(self):
        """Consume and process messages until ``RUNNING`` is cleared."""
        name = threading.current_thread().name
        while RUNNING:
            try:
                # Wait at most POLL_INTERVAL. A get() with no timeout could
                # hang forever once the producer stops, and a non-blocking
                # get() in a tight loop burns the CPU that the POW threads
                # need.
                message = self.q.get(timeout=self.POLL_INTERVAL)
            except Queue.Empty:
                # Nothing arrived in time; loop to re-check RUNNING.
                continue
            logger_c.info("{} Consuming {}".format(name, message))
            (h, nonce) = self._pow(message)
            self._send(message, h, nonce)
            logger_c.info("{} Created POW ({}, {}, {})".format(name, message, nonce, h))
            self.count += 1
        logger_c.debug("{} Done".format(name))
if __name__ == "__main__":
    random.seed(3)
    BUFF_SIZE = 10      # capacity of the shared queue
    SIM_TIME = 10.0     # seconds to let the simulation run
    difficulty = 3      # leading zeros required in each POW hex digest
    num_consumers = 10

    # Mean seconds between events. Each thread receives the reciprocal as its
    # random.expovariate() rate. If the producer is much faster than the
    # consumers the queue overflows; to cope, either increase the number of
    # workers or adjust these values. (Values previously tried here:
    # producer 0.1, consumer 3.0.)
    producer_rate = 1
    consumer_rate = 1

    q = Queue.Queue(BUFF_SIZE)

    # init
    producer = SimProducer(q, 1.0 / producer_rate)
    consumers = [DistributedPOW(q, 1.0 / consumer_rate, difficulty)
                 for _ in range(num_consumers)]

    # start: record the start time first so the measured window covers
    # every thread's whole lifetime.
    start_time = time.time()
    end_time = start_time + SIM_TIME
    producer.start()
    for w in consumers:
        w.start()

    # run
    try:
        remaining = end_time - time.time()
        while remaining > 0:
            time.sleep(min(1.0, remaining))
            remaining = end_time - time.time()
    finally:
        # Always clear the flag, even if the wait is interrupted (e.g.
        # Ctrl-C). Otherwise the non-daemon worker threads never leave their
        # loops and the interpreter hangs on exit.
        RUNNING = False

    # Clean up and compute throughput.
    # This demonstrates that although threads may not increase parallelism
    # in some applications, they can increase throughput. Play around with
    # the number of consumers and the rates to show this.
    producer.join()
    produce_count = producer.count
    consumer_count = 0
    for w in consumers:
        w.join()
        consumer_count += w.count
    logger.info("Tput P={} C={}".format(produce_count/SIM_TIME, consumer_count/SIM_TIME))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment