Skip to content

Instantly share code, notes, and snippets.

@sincejune
Last active May 24, 2018 13:46
Show Gist options
  • Save sincejune/d1e48379e48fe657ae547981886d6762 to your computer and use it in GitHub Desktop.
Save sincejune/d1e48379e48fe657ae547981886d6762 to your computer and use it in GitHub Desktop.
#! /usr/bin/env python
import json
import datetime
import pytz
from kafka import KafkaConsumer
def is_json(o):
    """Return True if *o* can be parsed as a JSON document, else False.

    *o* is handed to ``json.loads`` unchanged; the parsed result is
    discarded, only success or failure is reported.

    Returns False both for malformed JSON text and for values that
    ``json.loads`` cannot accept at all (for example ``None`` or a number),
    so the function never raises for ordinary non-JSON input.
    """
    try:
        json.loads(o)
    except (ValueError, TypeError):
        # ValueError: malformed JSON (json.JSONDecodeError subclasses it).
        # TypeError: the input is not str/bytes/bytearray, e.g. None.
        return False
    return True
# Resolve the display time zone once; it does not change between messages.
shanghai_tz = pytz.timezone('Asia/Shanghai')

# Consume the "test" topic from the local broker. Each payload is decoded as
# JSON when it parses; otherwise the raw value is passed through untouched.
# auto_offset_reset='earliest' asks the client to start from the oldest
# available offset when it has no valid offset to resume from (per the
# kafka-python documentation).
consumer = KafkaConsumer(
    "test",
    value_deserializer=lambda m: json.loads(m) if is_json(m) else m,
    auto_offset_reset='earliest',
    bootstrap_servers=['localhost:9092'],
)

try:
    for index, msg in enumerate(consumer):
        # Stop once messages with index 0..100 have been printed.
        if index > 100:
            break
        # msg.timestamp is divided by 1000 because fromtimestamp() expects
        # seconds; the code treats the record timestamp as milliseconds.
        print("{}: {}".format(
            datetime.datetime.fromtimestamp(msg.timestamp / 1000, shanghai_tz),
            msg.value))
finally:
    # Always release the consumer's network resources, including when the
    # loop is interrupted by an exception or Ctrl-C.
    consumer.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment