Skip to content

Instantly share code, notes, and snippets.

@rmb938
Created February 11, 2019 22:59
Show Gist options
  • Select an option

  • Save rmb938/1223f5fc9f6814cf95e9c067ee83c80b to your computer and use it in GitHub Desktop.

Select an option

Save rmb938/1223f5fc9f6814cf95e9c067ee83c80b to your computer and use it in GitHub Desktop.
A Python implementation of the EventBroadcaster from client-go. It isn't an exact replica, but it is good enough for my use case.
import enum
import socket
import typing
import arrow
import pylru
from kubernetes import client
# Maximum number of entries kept in the LRU cache that EventRecorder creates.
MAX_LRU_CACHE_SIZE = 4096
@enum.unique
class EventType(enum.Enum):
    """Kinds of event that can be recorded.

    Each member's value is the string form of the event type.
    """

    NORMAL = "Normal"
    WARNING = "Warning"
class InvolvedObject:
    """Reference to the object that an event is about.

    A plain attribute holder: every constructor argument is stored unchanged
    under the attribute of the same name.

    :param api_version: API version of the referenced object.
    :param kind: Kind of the referenced object.
    :param name: Name of the referenced object.
    :param namespace: Namespace of the referenced object.
    :param resource_version: Resource version of the referenced object.
    :param uid: UID of the referenced object.
    :param field_path: Optional path to a part of the object; ``None`` when not given.
    """

    def __init__(self, api_version: str, kind: str, name: str, namespace: str, resource_version: str, uid: str,
                 field_path: typing.Optional[str] = None):
        # Assigned pairwise, left to right, so attribute order matches the signature.
        self.api_version, self.kind = api_version, kind
        self.name, self.namespace = name, namespace
        self.resource_version, self.uid = resource_version, uid
        self.field_path = field_path
class Event:
    """A single event occurrence about an involved object.

    :param involved_object: The object the event refers to.
    :param message: Description of what happened.
    :param reason: Short reason for the event.
    :param type: The event's type.
    """

    def __init__(self, involved_object: InvolvedObject, message: str, reason: str, type: EventType):
        # Caller-supplied description of the event.
        self.involved_object = involved_object
        self.message = message
        self.reason = reason
        self.type = type

        # Bookkeeping that starts out empty: a fresh event has been seen once,
        # and has no name or timestamps yet.
        self.count = 1
        self.name = self.first_timestamp = self.last_timestamp = None
class EventRecorder(object):
    """Record Kubernetes events, merging repeats of the same event.

    The most recently recorded event for each event key is remembered in an
    LRU cache. Recording an identical event again within ten minutes of the
    previous occurrence updates the already-created Event object (incrementing
    its count and refreshing its last timestamp) instead of creating a new one.
    """

    def __init__(self, reporting_component):
        """Create a recorder.

        :param reporting_component: Name of the component emitting events. It is sent as
            ``reportingComponent`` and ``source.component`` and is part of the cache key.
        """
        self.cache = pylru.lrucache(MAX_LRU_CACHE_SIZE)
        self.reporting_component = reporting_component

    def _event_key(self, event: "Event"):
        """Return the cache key shared by events that may be aggregated together.

        The key is the plain concatenation of the reporting component, the local
        hostname, the involved object's identifying fields, and the event's type,
        reason and message.

        :param event: The event to compute the key for.
        :return: The key as a string.
        """
        involved = event.involved_object
        parts = [
            self.reporting_component,
            socket.gethostname(),
            involved.kind,
            involved.namespace,
            involved.name,
            involved.field_path,
            involved.uid,
            involved.api_version,
            event.type.value,
            event.reason,
            event.message,
        ]
        # field_path is optional (defaults to None) and str.join() rejects
        # non-string items, so a missing part contributes an empty string.
        return ''.join('' if part is None else str(part) for part in parts)

    def record_event(self, event: "Event"):
        """Send ``event`` to the Kubernetes API, aggregating it with a recent identical event.

        If an event with the same key was recorded less than ten minutes ago, the
        existing Event object is replaced with an incremented count; otherwise a
        new Event object is created with a server-generated name. On success the
        event is stored in the cache as the latest occurrence for its key.

        ``event`` is updated in place: ``name``, ``count``, ``first_timestamp`` and
        ``last_timestamp`` reflect what was sent to the API.

        Errors raised by the Kubernetes client calls are not caught here; when one
        propagates, the cache is left unchanged.

        :param event: The event to record.
        """
        core_api = client.CoreV1Api()
        event_key = self._event_key(event)
        # Take the time and hostname once so every field in this request agrees.
        now = arrow.now()
        hostname = socket.gethostname()

        past_event: typing.Optional["Event"] = self.cache.get(event_key, None)
        # A past event exists
        if past_event is not None:
            if now < past_event.last_timestamp.shift(minutes=+10):
                # The past event is less than 10 minutes old
                # so lets combine the events
                event.name = past_event.name
                event.first_timestamp = past_event.first_timestamp
                event.count = past_event.count + 1
            else:
                # The past event is too old to combine with. Start a new series,
                # clearing any state left over when the caller reuses the same
                # Event instance that is stored in the cache.
                event.name = None
                event.first_timestamp = None
                event.count = 1

        if event.first_timestamp is None:
            event.first_timestamp = now
        # The last timestamp is always the time of this recording.
        event.last_timestamp = now

        involved = event.involved_object
        event_dict = {
            'apiVersion': 'v1',
            'kind': 'Event',
            'metadata': {},
            'message': event.message,
            'reason': event.reason,
            'firstTimestamp': event.first_timestamp.isoformat(),
            'lastTimestamp': event.last_timestamp.isoformat(),
            'type': event.type.value,
            'count': event.count,
            'reportingComponent': self.reporting_component,
            'reportingInstance': hostname,
            'involvedObject': {
                'apiVersion': involved.api_version,
                'kind': involved.kind,
                'name': involved.name,
                'namespace': involved.namespace,
                'resourceVersion': involved.resource_version,
                'uid': involved.uid,
            },
            'source': {
                'component': self.reporting_component,
                'host': hostname,
            },
        }
        if involved.field_path is not None:
            event_dict['involvedObject']['fieldPath'] = involved.field_path

        if event.name is None:
            # New event: let the API server generate a unique name from the prefix.
            event_dict['metadata']['generateName'] = involved.name + "."
            output = core_api.create_namespaced_event(involved.namespace, event_dict)
            event.name = output.metadata.name
        else:
            # Aggregated event: overwrite the Event object created earlier.
            event_dict['metadata']['name'] = event.name
            core_api.replace_namespaced_event(event.name, involved.namespace, event_dict)

        # Only reached when the API call succeeded.
        self.cache[event_key] = event
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment