es -> kafka: replay archived tracker documents from an Elasticsearch index into a Kafka topic
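"""
Re-inject archived tracker events from an Elasticsearch index into a Kafka topic.

Rough flow (a sketch of what the script below does): pull a batch of hits from
ES_INDEX, massage each _source with doc_adapter_vtnc() so the field shapes match
what the downstream consumer expects, then publish the JSON documents to
KAFKA_TOPIC via a snappy-compressed KafkaProducer.
"""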
import xmltodict
import json
import sys
import getopt
import time
from datetime import datetime
import collections
from elasticsearch import Elasticsearch
import logging
from kafka import KafkaProducer

# pip install kafka-python elasticsearch python-snappy
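# Console logging at DEBUG so the connection objects, indexed documents and
# produced records are all traced while the replay runs.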
logger = logging.getLogger("kafka_reinject")
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
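# Source and target configuration: the internal ES archive host and daily index
# to read from, and the Kafka broker list / topic to replay the documents into.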
ES_SERVER = "internal-elasticsearch-prd-archive-2002760023"
ES_INDEX = "tracker_archive-2017.03.01"
KAFKA_BROKERS = ["event-hub-1", "event-hub-2", "event-hub-3"]
KAFKA_TOPIC = "tracker.replaydata.v1"

ES = None
KAFKA_PRODUCER = None
# Fallback id for documents that carry no item id (see index_on_es below).
surrogate_counter = 0
try:
    ES = Elasticsearch(hosts=[ES_SERVER])
    logger.info(ES)
except Exception as e:
    logger.error("Exception ES: %s" % e)
    sys.exit(-1)

try:
    KAFKA_PRODUCER = KafkaProducer(bootstrap_servers=KAFKA_BROKERS,
                                   client_id="kafka_reinject",
                                   compression_type="snappy",
                                   retries=3,
                                   value_serializer=lambda m: json.dumps(m).encode('utf-8'))
    logger.info(KAFKA_PRODUCER)
except Exception as e:
    logger.error("Exception Kafka: %s", e)
    sys.exit(-1)
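# Not called from main() below, which ships the documents to Kafka instead;
# kept as-is, it indexes a document straight into ES (index "produtos"),
# keyed by its item id or by a surrogate counter when no id is present.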
def index_on_es(obj):
    global surrogate_counter
    try:
        pid = None
        if "item:item" in obj:
            pid = obj["item:item"]["item:itemId"]
        elif "item1:item" in obj:
            pid = obj["item1:item"]["item:itemId"]
        elif "item:itemId" in obj:
            pid = obj["item:itemId"]
        else:
            pid = surrogate_counter
            surrogate_counter += 1
            logger.error("no item id, surrogate key: %d" % surrogate_counter)
        r = ES.index(index="produtos", doc_type="produto", id=pid, body=json.dumps(obj))
        logger.info(r)
    except Exception as e:
        logger.error("Exception index on es: %s" % e)
def doc_adapter_vtnc(doc):
    # from js
    # const toArray = (v => typeof(v) === 'string' ? [v] : v);
    # const massagers = {
    #   device_id: d=>d.device?d.device.id:undefined,
    #   referrer: d=>d.referer,
    #   creative_id: d=>toArray(d.creative_id),
    #   placement_id: d=>toArray(d.placement_id),
    #   site_id: d=>toArray(d.site_id)
    # };
    doc["device_id"] = doc.get("device_id", "undefined")
    if doc.get("referer", None) is not None:
        doc["referrer"] = doc["referer"]
    doc["creative_id"] = [doc.get("creative_id", "")]
    doc["placement_id"] = [doc.get("placement_id", "")]
    doc["site_id"] = [doc.get("site_id", "")]
    return doc
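# Pull a small batch (10 hits, match_all) from the archive index and replay each
# adapted document onto KAFKA_TOPIC, blocking on the send future so the assigned
# offset can be printed.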
def main():
    if ES is None or KAFKA_PRODUCER is None:
        print("No ES or KAFKA ready")
        sys.exit(-1)
    res = ES.search(index=ES_INDEX, body={"query": {"match_all": {}}}, size=10, from_=0)
    for hit in res["hits"]["hits"]:
        hs = hit["_source"]
        hs["_type"] = hit["_type"]
        print("Injecting from %s - campaign_id: %s - visitor_id: %s - type: %s" % (hs["timestamp"], hs["campaign_id"], hs["visitor_id"], hs["_type"]))
        try:
            doc = doc_adapter_vtnc(hs)
            fs = KAFKA_PRODUCER.send(KAFKA_TOPIC, doc)
            record = fs.get(timeout=60)  # block on the future returned by send()
            print(record.offset)
        except Exception as ke:
            logger.error("Kafka error: %s" % ke)
        # print(json.dumps(hit["_source"]))
    KAFKA_PRODUCER.flush()
    time.sleep(60)  # test to push to kafka


if __name__ == "__main__":
    main()
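# Usage sketch (assuming the ES and Kafka hosts above are reachable and the
# dependencies are installed):
#   pip install kafka-python elasticsearch python-snappy xmltodict
#   python this_script.py   # hypothetical name; the gist does not show a filename
# Bump `size` / `from_` in ES.search() above to replay more than the first 10 hits.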