Last active
November 28, 2018 12:29
-
-
Save WalBeh/b3834cdfb1507f28a6450d872cb707d0 to your computer and use it in GitHub Desktop.
Run it, e.g., with `./batch_send.py --count 100`; the Azure Event Hubs (AEH) partition to write to is randomized (32 partitions assumed).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| # https://github.com/Azure/azure-event-hubs-python | |
| """ | |
| An example to show batch sending events to an Event Hub. | |
| """ | |
| # pylint: disable=C0111 | |
| import sys | |
| import logging | |
| import datetime | |
| import time | |
| import os | |
| import json | |
| import random | |
| import string | |
| import datetime | |
| import sys | |
| import getopt | |
| import tqdm | |
| from azure.eventhub import EventHubClient, Sender, EventData | |
| import examples | |
# Command-line handling.
#   -h, --help             print a usage line and exit
#   -c N, --count N        number of batches to send (default: 1)
#   -p ID, --partition ID  partition to write to (default: random, 0-31)
try:
    # Both -c and -p take a value, hence the trailing colon on each of them.
    opts, args = getopt.getopt(sys.argv[1:], "hc:p:", ["help", "count=", "partition="])
except getopt.GetoptError as err:
    print(err)  # will print something like "option -a not recognized"
    sys.exit(2)
output = None
verbose = False
count = 1      # default, so the send loop never runs into an undefined name
party = None   # None means "no partition chosen on the command line"
for o, a in opts:
    if o in ("-h", "--help"):
        print(f"usage: {sys.argv[0]} [-h] [-c COUNT] [-p PARTITION]")
        sys.exit(0)
    elif o in ("-c", "--count"):
        try:
            count = int(a)
        except ValueError:
            print(f"option {o} requires an integer, got {a!r}")
            sys.exit(2)
    elif o in ("-p", "--partition"):
        party = str(a)
    else:
        # getopt only hands back declared options, so reaching this branch
        # means an option was declared above without a handler here.
        raise AssertionError(f"unhandled option {o}")
# Pick a random partition only when none was requested (32 partitions assumed).
if party is None:
    party = str(random.randint(0, 31))
# Address can be in either of these formats:
# "amqps://<URL-encoded-SAS-policy>:<URL-encoded-SAS-key>@<mynamespace>.servicebus.windows.net/myeventhub"
# "amqps://<mynamespace>.servicebus.windows.net/myeventhub"
ADDRESS = os.environ.get('EVENT_HUB_ADDRESS')
# SAS policy and key are not required if they are encoded in the URL
USER = os.environ.get('EVENT_HUB_SAS_POLICY')
KEY = os.environ.get('EVENT_HUB_SAS_KEY')

# The first address format carries the SAS policy and key in the URL, so the
# "policy:key@" part is masked before the address is echoed to the terminal.
_shown_address = ADDRESS
if ADDRESS and "://" in ADDRESS:
    _scheme, _rest = ADDRESS.split("://", 1)
    _authority, _slash, _path = _rest.partition("/")
    if "@" in _authority:
        _host = _authority.rsplit("@", 1)[1]
        _shown_address = f"{_scheme}://***@{_host}{_slash}{_path}"
print(f"Connecting to {_shown_address} --- Partition: {party}")
# Word pools for the three parts of a measuring-point name.
mess1 = ["Buegel", "Bogen", "Oese", "Kante", "Rohrbogen", "Eckstueck"]
mess2 = ["vorne", "hinten", "unten", "oben", "mitte", "versetzt"]
mess3 = ["links", "rechts", "vertikal", "horizontal", "diagonal"]


def gen_messstelle():
    """Return a random measuring-point name.

    One word is drawn from each of ``mess1``, ``mess2`` and ``mess3`` (in that
    order) and the three words are joined with single spaces.
    """
    words = []
    for pool in (mess1, mess2, mess3):
        words.extend(random.sample(pool, 1))
    return " ".join(words)
def gen_auftragsnr():
    """Return a random order number made of five space-separated integers.

    The fields are drawn, left to right, from the inclusive ranges
    100000-300000, 1000-9999, 1001-9999, 1-9 and 1-99.
    """
    bounds = ((100000, 300000), (1000, 9999), (1001, 9999), (1, 9), (1, 99))
    fields = [str(random.randint(low, high)) for low, high in bounds]
    return " ".join(fields)
def gen_kurztxt(chars=string.ascii_uppercase + string.digits + " "):
    """Return a random short text of 14 to 28 characters.

    Args:
        chars: Characters to draw from; defaults to upper-case ASCII letters,
            digits and the space character.
    """
    length = random.randint(14, 28)
    picked = [random.choice(chars) for _ in range(length)]
    return "".join(picked)
def gen_ts():
    """Return the current local time as ``YYYY-MM-DD HH:MM:SS.ffffff``."""
    now = datetime.datetime.now()
    return "{:%Y-%m-%d %H:%M:%S.%f}".format(now)
def data_generator():
    """Yield one JSON-encoded synthetic measurement record.

    The record consists of randomly generated header fields followed by a
    ``"measurements"`` list holding 450 to 500 randomly generated points.
    The generator produces exactly one string and is then exhausted.
    """
    rnd = random.randint
    now = datetime.datetime.now

    record = {
        "str_messstelle": gen_messstelle(),
        # Cut the last three digits so the stamp ends in milliseconds.
        "str_zeitstempel": gen_ts()[:-3],
        "by_zeitstempel_mikrosek": rnd(100, 999),
        "ui_messzyklus": rnd(300, 600),
        "udi_anzahl_messpunkte": rnd(300, 600),
        "by_modulnummer": rnd(12, 99),
        "str_auftragsnummer": gen_auftragsnr(),
        "str_auftragsbeginn": now().strftime("%H%M%d"),
        "udi_teilenummer": rnd(1000, 9000),
        "i_umgebungstemp_ks": rnd(10, 900),
        "i_umgebungstemp_bg": rnd(10, 900),
        "i_elektrodentemperatur_oben": rnd(10, 900),
        "i_elektrodentemperatur_unten": rnd(10, 900),
        "uid_letzte_elektrobodenreinigung": rnd(10, 900),
        "udi_letzte_elektrobodenwechsel": rnd(10, 900),
        "udi_letzte_veraenderung": rnd(10, 900),
        "udi_letzter_tausch_komponente": rnd(100000, 900000),
        "i_werkstuecktraeger_nummer": rnd(1, 900),
        "by_betriebszustand_modul": rnd(1, 900),
        "by_ergebnis_modul": rnd(1, 900),
        "udi_leistung": rnd(100000, 900000),
        "udi_energie": rnd(1000, 9999),
        "udi_schmelzintegral": rnd(1000, 9999),
        "udi_lebensdauer_prognose": rnd(100000, 900000),
        "str_version_sps_berechnung": now().strftime("%Y-%m-%d"),
        "str_artikelkurztext": gen_kurztxt(),
        "by_artikeltyp": rnd(1, 900),
        "int_nennlaenge": rnd(1, 900),
        "udi_kontaktierkraft_vorgabe": rnd(100000, 900000),
        "udi_nachsetzkraft_vorgabe": rnd(100000, 900000),
        "int_schweisszeit_vorgabe": rnd(1, 900),
        "udi_schweissstrom_vorgabe": rnd(1, 900),
        "by_kontaktierung_variante": rnd(1, 900),
        "by_nachsetzen_variante": rnd(1, 900),
        "int_abkuehlzeit": rnd(1, 900),
        "str_servoregler_parameter": gen_kurztxt(),
        "measurements": [],
    }

    # The number of points is drawn first, then the points themselves.
    point_count = rnd(450, 500)
    record["measurements"].extend(
        {
            "i_kraft": rnd(1000, 9999),
            "i_elektrodenspannung": rnd(110, 380),
            "di_schweissstrom": rnd(1000, 9999),
            "i_servoposition": rnd(1000, 9999),
        }
        for _ in range(point_count)
    )

    yield json.dumps(record)
# Connect to the Event Hub and send `count` batches to partition `party`.
# Ctrl-C is swallowed by the outer handler so the script ends without a traceback.
try:
    if not ADDRESS:
        raise ValueError("No EventHubs URL supplied.")

    client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
    sender = client.add_sender(partition=party)
    client.run()
    try:
        start_time = time.time()
        for _ in tqdm.tqdm(range(count)):
            # A fresh generator per iteration: one EventData batch per send.
            data = EventData(batch=data_generator())
            sender.send(data)
    finally:
        # Runs on success, on a send error and on Ctrl-C alike, so the
        # client is always stopped once it has been started.
        end_time = time.time()
        client.stop()
    run_time = end_time - start_time
    print(f"Runtime: {run_time:.2f} seconds")
except KeyboardInterrupt:
    pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment