Created
September 3, 2017 19:13
-
-
Save loganasherjones/ea5180604eb2b706a51750c8d84330f4 to your computer and use it in GitHub Desktop.
A simple in-memory cache.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
#
# This software may be modified and distributed under the terms
# of the MIT license. See the LICENSE file for details.
import uuid | |
from datetime import datetime | |
class MemoryCache(object):
    """A simple in-memory cache of events, keyed by a generated UUID.

    Every cached entry is a dict with the keys ``event_text``,
    ``pending_delete``, ``entry_date`` and ``id``.

    Intended cycle: call :meth:`get_queued_events` to check events out,
    then either :meth:`delete_queued_events` to drop everything that is
    checked out, or :meth:`requeue_queued_events` to put specific events
    back in the queue.
    """

    def __init__(self, *args, **kwargs):
        """Create an empty cache.

        Positional and keyword arguments are accepted and ignored.
        """
        self._cache = {}

    def add_event(self, event):
        """Store ``event`` under a fresh ``uuid.uuid4()`` id as a queued entry."""
        event_id = uuid.uuid4()
        self._cache[event_id] = {
            "event_text": event,
            "pending_delete": False,
            "entry_date": datetime.now(),
            "id": event_id,
        }

    def get_queued_events(self):
        """Check out and return every entry not already pending deletion.

        Each returned entry is flagged ``pending_delete`` so that a later
        :meth:`delete_queued_events` actually removes it and a repeated
        call does not hand the same event out twice. Without this flag
        nothing in the class ever marks an entry for deletion.

        Returns:
            A list of the live entry dicts (not copies); empty when
            nothing is queued.
        """
        events = [
            entry for entry in self._cache.values()
            if not entry['pending_delete']
        ]
        for entry in events:
            entry['pending_delete'] = True
        return events

    def requeue_queued_events(self, events):
        """Clear the ``pending_delete`` flag on each of the given events.

        Args:
            events: An iterable of entry dicts; each is looked up in the
                cache by its ``id`` key.

        Raises:
            KeyError: If an event's id is not in the cache.
        """
        for event in events:
            self._cache[event['id']]['pending_delete'] = False

    def delete_queued_events(self):
        """Remove every entry currently flagged ``pending_delete``."""
        # Collect the ids first so the dict is not mutated while iterating it.
        ids_to_delete = [
            entry['id'] for entry in self._cache.values()
            if entry['pending_delete']
        ]
        for event_id in ids_to_delete:
            self._cache.pop(event_id)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment