Skip to content

Instantly share code, notes, and snippets.

@secemp9
Created November 28, 2022 17:20
Show Gist options
  • Save secemp9/96dd0ad46e2e02928c9bb30c8440e01c to your computer and use it in GitHub Desktop.
Save secemp9/96dd0ad46e2e02928c9bb30c8440e01c to your computer and use it in GitHub Desktop.
pickle + queue/deque inside a file
import pickle
import collections
import queue
def f():
return "hello" # it's a test
class PickleQueue:
filename = "queue.pickle"
default_type = collections.deque()
def __init__(self, lock=False, default_type=default_type):
print(lock)
with open('queue.pickle', 'wb') as handle:
pickle.dump(default_type, handle, protocol=pickle.HIGHEST_PROTOCOL)
pass
def put_pickle_inside_file(a):
with open('queue.pickle', 'wb') as handle:
pickle.dump(a, handle, protocol=pickle.HIGHEST_PROTOCOL)
def queue(self):
with open('queue.pickle', 'rb') as handle:
return pickle.load(handle)
def put(self, a=None):
if a is None:
return
else:
with open('queue.pickle', 'rb') as handle:
try:
b = pickle.load(handle)
b.append(a)
except EOFError:
pass
with open('queue.pickle', 'wb') as handle:
pickle.dump(b, handle, protocol=pickle.HIGHEST_PROTOCOL)
def get(self):
with open('queue.pickle', 'rb') as handle:
try:
b = pickle.load(handle)
wow = b.popleft()
except EOFError:
pass
except IndexError:
pass
with open('queue.pickle', 'wb') as handle:
pickle.dump(b, handle, protocol=pickle.HIGHEST_PROTOCOL)
return wow
def size(self):
with open('queue.pickle', 'rb') as handle:
try:
b = pickle.load(handle)
return len(b)
except EOFError:
pass
test1 = PickleQueue()
test2 = queue.Queue()
test1.put(f)
test2.put(f)
print(test1.get())
print(test2.get())
print("test1", test1)
print("test2", test2)
test1.put(f)
test2.put(f)
print("test1", test1.queue)
print("test2", test2.queue)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment