Skip to content

Instantly share code, notes, and snippets.

@epsi95
Created November 6, 2020 18:38
Show Gist options
  • Select an option

  • Save epsi95/590c3c7fd560693cbf96d5f5ac00dbe2 to your computer and use it in GitHub Desktop.

Select an option

Save epsi95/590c3c7fd560693cbf96d5f5ac00dbe2 to your computer and use it in GitHub Desktop.
from confluent_kafka import Consumer, KafkaError
import logging
import sys
import json
import datetime
logging.basicConfig(format='%(process)d >> %(asctime)s - %(message)s', level=logging.INFO)
settings = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'over-speed-detect',
'client.id': sys.argv[1],
'enable.auto.commit': True,
'session.timeout.ms': 6000,
'default.topic.config': {'auto.offset.reset': 'smallest'}
}
c = Consumer(settings)
c.subscribe(['car_data'])
try:
while True:
msg = c.poll(0.1)
if msg is None:
continue
elif not msg.error():
# logging.info('Received message: {0}'.format(msg.value()))
data = json.loads(msg.value().decode("utf-8"))
time = datetime.datetime.fromtimestamp(data["timestamp"])
if data["speed"] >= 50:
logging.info(f"{data['id']} has done overs-peeding @ {data['speed']} @ {time.day}/{time.month}/{time.year} {time.time().hour}:{time.time().minute}:{time.time().second}")
elif msg.error().code() == KafkaError._PARTITION_EOF:
logging.info('End of partition reached {0}/{1}'
.format(msg.topic(), msg.partition()))
else:
logging.info('Error occured: {0}'.format(msg.error().str()))
except KeyboardInterrupt:
pass
finally:
c.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment