Skip to content

Instantly share code, notes, and snippets.

@serihiro
Created April 29, 2019 01:32
Show Gist options
  • Save serihiro/aa3939051b02dd08bfbcc2fd0905d84a to your computer and use it in GitHub Desktop.
Save serihiro/aa3939051b02dd08bfbcc2fd0905d84a to your computer and use it in GitHub Desktop.
my simple thread safe queue implementation
import os
import sys
import time
import multiprocessing
import threading
import random
class SimpleQueue:
def __init__(self, max_size, response_time=0.1):
self._max_size = max_size
self._queue = []
self._lock = threading.Lock()
self._not_empty_cond = threading.Condition(self._lock)
self._not_full_cond = threading.Condition(self._lock)
self._response_time = response_time
def get(self, timeout=0.1):
with self._lock:
while not self._queue:
self._not_empty_cond.wait(self._response_time)
data = self._queue.pop()
self._not_full_cond.notify()
return data
def put(self, data, timeout=0.1):
with self._lock:
if len(self._queue) >= self._max_size:
while len(self._queue) >= self._max_size:
self._not_full_cond.wait(self._response_time)
else:
self._queue.append(data)
self._not_empty_cond.notify()
def __len__(self):
return len(self._queue)
if __name__ == '__main__':
queue = SimpleQueue(max_size = 1000000)
def generate_data(i):
time.sleep(0.01)
return random.randint(0, 100000)
def enqueue(_queue):
i = 0
while i < 10000:
future = pool.map_async(generate_data, list(range(1,32)))
data = future.get()
_queue.put(data)
print(i)
i += 32
global pool
pool = multiprocessing.Pool(
processes=16
)
t = threading.Thread(target=enqueue, args=[queue])
t.start()
t.join()
print(len(queue))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment