Skip to content

Instantly share code, notes, and snippets.

@WalBeh
Last active November 28, 2018 12:29
Show Gist options
  • Select an option

  • Save WalBeh/b3834cdfb1507f28a6450d872cb707d0 to your computer and use it in GitHub Desktop.

Select an option

Save WalBeh/b3834cdfb1507f28a6450d872cb707d0 to your computer and use it in GitHub Desktop.
run it eg. with `./batch_send.py --count 100`, the AEH partition to write to is randomize (32 partitions assumed)
#!/usr/bin/env python
# https://github.com/Azure/azure-event-hubs-python
"""
An example to show batch sending events to an Event Hub.
"""
# pylint: disable=C0111
import sys
import logging
import datetime
import time
import os
import json
import random
import string
import datetime
import sys
import getopt
import tqdm
from azure.eventhub import EventHubClient, Sender, EventData
import examples
try:
opts, args = getopt.getopt(sys.argv[1:], "hc:p", ["help", "count=", "partition="])
except getopt.GetoptError as err:
print(err) # will print something like "option -a not recognized"
sys.exit(2)
output = None
verbose = False
for o, a in opts:
if o == "-h":
print ("help")
elif o in ("-c", "--count"):
count = int(a)
elif o in ("-p", "--partition"):
party = str(a)
else:
assert False, "unhandled option"
party = str(random.randint(0,31))
# Address can be in either of these formats:
# "amqps://<URL-encoded-SAS-policy>:<URL-encoded-SAS-key>@<mynamespace>.servicebus.windows.net/myeventhub"
# "amqps://<mynamespace>.servicebus.windows.net/myeventhub"
ADDRESS = os.environ.get('EVENT_HUB_ADDRESS')
# SAS policy and key are not required if they are encoded in the URL
USER = os.environ.get('EVENT_HUB_SAS_POLICY')
KEY = os.environ.get('EVENT_HUB_SAS_KEY')
print(f"Connecting to {ADDRESS} --- Partition: {party}")
mess1=["Buegel", "Bogen", "Oese", "Kante", "Rohrbogen", "Eckstueck"]
mess2=["vorne", "hinten", "unten", "oben", "mitte", "versetzt"]
mess3=["links", "rechts", "vertikal", "horizontal", "diagonal"]
def gen_messstelle():
return " ".join(random.sample(mess1,1) + random.sample(mess2,1) + random.sample(mess3,1))
def gen_auftragsnr():
return "".join(str(random.randint(100000,300000)) + " " + str(random.randint(1000,9999)) + " " \
+ str(random.randint(1001,9999)) + " " + str(random.randint(1,9)) + " " + str(random.randint(1,99)) )
def gen_kurztxt(chars=string.ascii_uppercase + string.digits + " "):
return "".join(random.choice(chars) for x in range(random.randint(14,28)))
def gen_ts():
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
def data_generator():
for i in range(1):
sample={
"str_messstelle": gen_messstelle(),
"str_zeitstempel": gen_ts()[:-3],
"by_zeitstempel_mikrosek": random.randint(100,999),
"ui_messzyklus": random.randint(300,600),
"udi_anzahl_messpunkte": random.randint(300,600),
"by_modulnummer": random.randint(12,99),
"str_auftragsnummer": gen_auftragsnr(),
"str_auftragsbeginn": "".join(datetime.datetime.now().strftime("%H%M%d")),
"udi_teilenummer": random.randint(1000,9000),
"i_umgebungstemp_ks": random.randint(10,900),
"i_umgebungstemp_bg": random.randint(10,900),
"i_elektrodentemperatur_oben": random.randint(10,900),
"i_elektrodentemperatur_unten": random.randint(10,900),
"uid_letzte_elektrobodenreinigung": random.randint(10,900),
"udi_letzte_elektrobodenwechsel": random.randint(10,900),
"udi_letzte_veraenderung": random.randint(10,900),
"udi_letzter_tausch_komponente": random.randint(100000,900000),
"i_werkstuecktraeger_nummer": random.randint(1,900),
"by_betriebszustand_modul": random.randint(1,900),
"by_ergebnis_modul": random.randint(1,900),
"udi_leistung": random.randint(100000,900000),
"udi_energie": random.randint(1000,9999),
"udi_schmelzintegral": random.randint(1000,9999),
"udi_lebensdauer_prognose": random.randint(100000,900000),
"str_version_sps_berechnung": "".join(datetime.datetime.now().strftime("%Y-%m-%d")),
"str_artikelkurztext": gen_kurztxt(),
"by_artikeltyp": random.randint(1,900),
"int_nennlaenge": random.randint(1,900),
"udi_kontaktierkraft_vorgabe": random.randint(100000,900000),
"udi_nachsetzkraft_vorgabe": random.randint(100000,900000),
"int_schweisszeit_vorgabe": random.randint(1,900),
"udi_schweissstrom_vorgabe": random.randint(1,900),
"by_kontaktierung_variante": random.randint(1,900),
"by_nachsetzen_variante": random.randint(1,900),
"int_abkuehlzeit": random.randint(1,900),
"str_servoregler_parameter": gen_kurztxt(),
"measurements": []
}
for x in range(random.randint(450,500)):
item={ "i_kraft": random.randint(1000,9999),
"i_elektrodenspannung": random.randint(110,380),
"di_schweissstrom": random.randint(1000,9999),
"i_servoposition": random.randint(1000,9999) }
sample["measurements"].append(item)
# logger.info("run ----- {}".format(i))
# logger.info("event -- {}".format(json.dumps(sample)))
yield json.dumps(sample)
try:
if not ADDRESS:
raise ValueError("No EventHubs URL supplied.")
client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
sender = client.add_sender(partition=party)
client.run()
try:
start_time = time.time()
for loop in tqdm.tqdm(range(count)):
start=time.time()
data = EventData(batch=data_generator())
sender.send(data)
except:
raise
finally:
end_time = time.time()
client.stop()
run_time = end_time - start_time
# logger.info("Runtime: {} seconds".format(run_time))
except KeyboardInterrupt:
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment