Last active
April 23, 2019 11:39
-
-
Save mstump/147fc4a29bd265f4328dfdd6096a7336 to your computer and use it in GitHub Desktop.
Python Faust example with an expiring cache that reads historical data, using protobuf 3 as the serializer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import calendar | |
import faust | |
import itertools | |
import random | |
import signal | |
import string | |
import time | |
import uuid | |
from contextlib import suppress | |
from datetime import datetime, timedelta | |
from faust import windows | |
from faust.serializers import codecs | |
from google.protobuf.timestamp_pb2 import Timestamp | |
from threading import Thread, RLock | |
from typing import Any | |
import page_views_pb2 | |
def copy_attributes(source, dest):
    """Copy public attribute values from *source* onto *dest* and return *dest*.

    * dict *source*: each key that already exists as an attribute on *dest*
      is copied.
    * any other *source*: names come from ``source._meta.fields`` when
      *source* has a ``_meta`` attribute, otherwise from ``dir(source)``.
      Names starting with ``_`` are skipped, as are read-only properties of
      *dest* (assigning them would raise ``AttributeError``) and callable
      values (copying a bound method would make *dest* call *source*'s
      method instead of its own).
    """
    if isinstance(source, dict):
        for name, value in source.items():
            if hasattr(dest, name):
                setattr(dest, name, value)
        return dest

    names = source._meta.fields if hasattr(source, "_meta") else dir(source)
    for name in names:
        if name.startswith('_'):
            continue
        # Inspect dest's class first so computed read-only properties are
        # neither assigned on dest nor needlessly evaluated on source.
        descriptor = getattr(type(dest), name, None)
        if isinstance(descriptor, property) and descriptor.fset is None:
            continue
        value = getattr(source, name)
        if callable(value):
            continue
        setattr(dest, name, value)
    return dest
class PageView(faust.Record, serializer='page_view_pb'):
    """A single page-view event, serialized with the ``page_view_pb`` codec."""

    id: str = None
    user: str = None
    occurred_at: datetime = None

    @property
    def occurred_at_posix(self):
        """Return ``occurred_at`` as integer seconds since the Unix epoch.

        Naive datetimes are interpreted as UTC; timezone-aware datetimes are
        first converted to UTC (``timetuple()`` would silently drop the
        offset). Sub-second precision is discarded.
        """
        return calendar.timegm(self.occurred_at.utctimetuple())

    @staticmethod
    def id_hash(pv):
        """Return ``pv.id``; usable as a key function, e.g. for ``CacheSet``."""
        return pv.id
def _pbSupport(*args):
    """Define the protobuf-backed codec for PageView and register it as ``page_view_pb``.

    Any positional arguments are accepted and ignored.
    """

    class PBSupport(codecs.Codec):
        """Codec converting between PageView data and ``page_views_pb2.PageView`` bytes."""

        def _dumps(self, obj: Any) -> bytes:
            # ``obj`` is indexed by "id", "user" and "occurred_at" keys
            # (presumably the record's dict representation -- confirm).
            message = page_views_pb2.PageView()
            message.id = obj["id"]
            message.user = obj["user"]
            message.occurred_at.FromDatetime(obj["occurred_at"])
            payload = message.SerializeToString()
            return payload

        def _loads(self, s: bytes) -> Any:
            message = page_views_pb2.PageView()
            message.ParseFromString(s)
            record = PageView()
            record.id = message.id
            record.user = message.user
            record.occurred_at = message.occurred_at.ToDatetime()
            return record

    codec = PBSupport()
    codecs.register('page_view_pb', codec)


# Register the codec at import time so 'page_view_pb' can be resolved by name.
_pbSupport()
class CacheEntry:
    """A cached value paired with an absolute expiry time in seconds."""

    def __init__(self, value, ttl=20, expires_at=None):
        """Store *value*, expiring at *expires_at*, or ``ttl`` seconds from now if not given."""
        self.value = value
        # ``is None`` rather than truthiness, so an explicit expires_at of 0 is honoured.
        self.expires_at = time.time() + ttl if expires_at is None else expires_at
        # Override flag: any value other than False is returned verbatim by expired().
        self._expired = False

    def expired(self, expire_clock=None):
        """Return whether the entry has expired as of *expire_clock* (default: ``time.time()``)."""
        if self._expired is not False:
            return self._expired
        if expire_clock is None:
            expire_clock = time.time()
        return self.expires_at < expire_clock
class CacheList:
    """List of CacheEntry objects, guarded by an RLock, that drops expired entries on read."""

    def __init__(self):
        self.entries = []
        self.lock = RLock()

    def add_entry(self, value, ttl=20, expires_at=None):
        """Append *value* as a new CacheEntry built from *ttl* / *expires_at*."""
        with self.lock:
            self.entries.append(CacheEntry(value, ttl, expires_at))

    def read_entries(self, expire_clock=None):
        """Discard every entry expired as of *expire_clock*; return live values in insertion order."""
        with self.lock:
            # Filter the whole list: expiry times need not increase with
            # insertion order (e.g. out-of-order event timestamps), so trimming
            # only the expired prefix would keep and return stale entries.
            self.entries = [entry for entry in self.entries
                            if not entry.expired(expire_clock)]
            return [entry.value for entry in self.entries]
class CacheSet:
    """Dict of CacheEntry objects keyed by ``hash_function(value)``, guarded by an RLock.

    Adding a value whose key already exists replaces the previous entry.
    Expired entries are dropped on read.
    """

    def __init__(self, hash_function=hash):
        self.entries = dict()
        self.lock = RLock()
        self.hash_function = hash_function

    def add_entry(self, value, ttl=20, expires_at=None):
        """Store *value* under ``hash_function(value)``, replacing any existing entry."""
        with self.lock:
            self.entries[self.hash_function(value)] = CacheEntry(value, ttl, expires_at)

    def read_entries(self, expire_clock=None):
        """Discard every entry expired as of *expire_clock*; return the live values."""
        with self.lock:
            # Filter all items: a replaced key keeps its original dict position,
            # and expiry times need not be monotonic, so trimming only the
            # expired prefix would keep and return stale entries.
            self.entries = {key: entry for key, entry in self.entries.items()
                            if not entry.expired(expire_clock)}
            return [entry.value for entry in self.entries.values()]
# Seconds: used as the cache expiry offset and as the table's tumbling window size.
ttl = 10

# In-memory cache of page views keyed by PageView.id (via PageView.id_hash).
cache = CacheSet(PageView.id_hash)

app = faust.App(
    'page_views',
    broker='kafka://localhost:9092',
    topic_partitions=4,
)

page_view_topic = app.topic('page_views', value_type=PageView)

# Tumbling-window table of size ``ttl`` with expires=30s; windows are taken
# relative to PageView.occurred_at (see Faust's relative_to_field docs).
active_users_table = (
    app.Table('active_users', default=None)
    .tumbling(ttl, expires=timedelta(seconds=30), key_index=True)
    .relative_to_field(PageView.occurred_at)
)
# Disabled live generator: would publish one random page view every 2 seconds.
# @app.timer(interval=2, on_leader=True)
# async def generator():
#     user = ''.join(random.choices(string.ascii_uppercase + string.digits, k=7))
#     page_view = PageView(str(uuid.uuid4()), user, datetime.utcnow())
#     await page_view_topic.send(value=page_view)


@app.agent(page_view_topic)
async def print_windowed_events(stream):
    """Cache each page view until event time + ttl and print the still-live cache contents.

    Expiry is evaluated against the incoming event's own timestamp, not wall-clock time.
    """
    async for view in stream:
        event_time = view.occurred_at_posix
        cache.add_entry(view, expires_at=event_time + ttl)
        live = cache.read_entries(event_time)
        lag = datetime.utcnow() - view.occurred_at
        print(f"seconds in the past: {lag}")
        print(f"{len(live)}, {repr(live)}")
async def custom_sleep(seconds=1):
    """Print a ``SLEEP`` marker with the local time, then asynchronously sleep *seconds*.

    Written as a native coroutine: the ``@asyncio.coroutine`` generator
    decorator was deprecated in Python 3.8 and removed in 3.11. Callers that
    ``await custom_sleep()`` keep the original one-second delay.
    """
    print("SLEEP", datetime.now())
    await asyncio.sleep(seconds)
async def historical_producer():
    """Send 100 synthetic page views whose timestamps start 100 s ago, one second apart.

    Each view gets a UUID4 string id, a random 7-character user from A-Z and
    0-9, and a naive ``datetime.utcnow()``-based timestamp. ``custom_sleep()``
    is awaited after every send.
    """
    alphabet = string.ascii_uppercase + string.digits
    one_second = timedelta(seconds=1)
    stamp = datetime.utcnow() - timedelta(seconds=100)
    for _ in range(100):
        user = ''.join(random.choices(alphabet, k=7))
        view = PageView(str(uuid.uuid4()), user, stamp)
        await page_view_topic.send(value=view)
        stamp = stamp + one_second
        await custom_sleep()
if __name__ == "__main__":
    # Replay the historical events on the default event loop.
    # NOTE(review): stop() on a loop that is no longer running has no visible
    # effect here; flushing/closing the producer is not done -- confirm intent.
    event_loop = asyncio.get_event_loop()
    producer_run = historical_producer()
    event_loop.run_until_complete(producer_run)
    event_loop.stop()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Schema for page-view events; the Python script imports the generated module as page_views_pb2.
syntax = "proto3";

package page_views;

import "google/protobuf/timestamp.proto";

// A single page view.
message PageView {
  string id = 1;                              // event id
  string user = 2;                            // user who viewed the page
  google.protobuf.Timestamp occurred_at = 3;  // time of the view
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment