Skip to content

Instantly share code, notes, and snippets.

@kkirsanov
Last active January 14, 2017 11:55
Show Gist options
  • Save kkirsanov/16009e49f807472c57962324fd538904 to your computer and use it in GitHub Desktop.
Save kkirsanov/16009e49f807472c57962324fd538904 to your computer and use it in GitHub Desktop.
Кафка
{"namespace": "sms.smsgate.rfi",
"type": "record",
"name": "SMS",
"fields": [
{"name": "message", "type": "string"},
{"name": "operator", "type": "string"},
{"name": "date", "type": "string"}
]
}
from kafka import KafkaConsumer
import avro.schema
import avro.io
import io
consumer = KafkaConsumer('sms',
group_id='sms',
bootstrap_servers=['localhost:9092'])
schema_path = "sms.avsc"
schema = avro.schema.Parse(open(schema_path).read())
# блокирующий цикл чтения
# Нужно понять как его правильно стопать с сохранением оффсета
for msg in consumer:
print(msg.value) # b'\x0etest123\x0ebeeline42017-01-14 14:45:29.860567'
bytes_reader = io.BytesIO(msg.value)
decoder = avro.io.BinaryDecoder(bytes_reader)
reader = avro.io.DatumReader(schema)
sms = reader.read(decoder)
print (sms)
import io
import datetime
import avro.schema
from kafka import SimpleProducer, SimpleClient
from avro.io import DatumWriter
kafka = SimpleClient('localhost:9092')
producer = SimpleProducer(kafka)
topic = "sms"
schema = avro.schema.Parse(open("sms.avsc").read())
writer = DatumWriter(schema)
for i in range(100):
bytes_writer = io.BytesIO()
encoder = avro.io.BinaryEncoder(bytes_writer)
writer.write(dict(message='test123', operator="beeline", date=str(datetime.datetime.now())), encoder)
raw_bytes = bytes_writer.getvalue()
a = producer.send_messages(topic, raw_bytes)
print(a) # [ProduceResponsePayload(topic='sms', partition=0, error=0, offset=1423)]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment