|
import argparse |
|
import random |
|
import time |
|
import string |
|
import sys |
|
import os |
|
import socket |
|
|
|
class DogStatsDMessage:
    """A single DogStatsD metric datagram that renders itself via ``str()``.

    The constructor stores its arguments unchanged:

    * ``metric_name``   -- name placed before the first ``:``.
    * ``metric_values`` -- iterable of values; rendered joined with ``:``.
    * ``metric_type``   -- type code placed after the first ``|``.
    * ``metric_tags``   -- iterable of tag strings; rendered joined with ``,``.
    """

    def __init__(self, metric_name, metric_values, metric_type, metric_tags):
        self.metric_name = metric_name
        self.metric_values = metric_values
        self.metric_type = metric_type
        self.metric_tags = metric_tags

    def __repr__(self):
        return (
            f"{type(self).__name__}({self.metric_name!r}, {self.metric_values!r}, "
            f"{self.metric_type!r}, {self.metric_tags!r})"
        )

    def __str__(self):
        """Return the datagram text ``name:v1:v2|type|#tag1,tag2``.

        The tag section is introduced by ``#`` as the DogStatsD datagram
        format requires, and is omitted entirely when there are no tags so
        the datagram never ends in a dangling ``|``.
        """
        values = ":".join(str(v) for v in self.metric_values)
        datagram = f"{self.metric_name}:{values}|{self.metric_type}"
        if self.metric_tags:
            datagram += "|#" + ",".join(self.metric_tags)
        return datagram
|
|
|
def send_to_socket(socket_path, msgs):
    """Replay ``msgs`` over a Unix datagram socket in an endless loop.

    Each message is rendered with ``str()``, encoded, and sent as one
    datagram. The whole sequence is repeated until the process is interrupted
    or a socket error is raised. The average throughput since the start is
    shown on a single console line that is refreshed periodically.

    Args:
        socket_path: Filesystem path of the Unix socket to connect to.
        msgs: Iterable of objects whose ``str()`` form is the datagram payload.

    Raises:
        SystemExit: With status 1 when ``socket_path`` does not exist.
        OSError: When connecting to or sending on the socket fails.
    """
    # Check if the socket exists
    if not os.path.exists(socket_path):
        print(f"Socket at path {socket_path} does not exist")
        # sys.exit rather than the site-provided exit(), which is not
        # guaranteed to be defined (e.g. when running with ``python -S``).
        sys.exit(1)

    # Render and encode once: the same payloads are replayed on every pass,
    # so doing this inside the send loop would only throttle the sender.
    payloads = [str(m).encode() for m in msgs]
    if not payloads:
        # Without this guard the loop below would spin forever sending nothing.
        print("No messages to send")
        return

    bytes_to_megabytes = 1 / (1024 * 1024)
    report_interval = 0.5  # seconds between refreshes of the rate line

    # The context manager closes the socket on any exit path.
    with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
        sock.connect(socket_path)

        bytes_sent = 0
        start_time = time.time()
        next_report = start_time + report_interval

        while True:
            for payload in payloads:
                bytes_sent += sock.send(payload)

                now = time.time()
                if now < next_report:
                    continue

                # now >= start_time + report_interval here, so the elapsed
                # time is strictly positive and the division cannot fail.
                elapsed_time = now - start_time
                sending_rate = (bytes_sent * bytes_to_megabytes) / elapsed_time
                print(f"\rSending rate: {sending_rate:.2f} MB/s", end='')
                sys.stdout.flush()
                next_report = now + report_interval
|
|
|
def generate_dogstatsd_msgs(args):
    """Build a pseudo-random list of ``DogStatsDMessage`` objects.

    The module-level ``random`` generator is seeded with ``args.rng_seed``
    first, so identical arguments produce an identical list. One message is
    created per context; the number of contexts is drawn between
    ``args.min_num_contexts`` and ``args.max_num_contexts`` (inclusive).

    Each message gets:
      * a name of 50 random lowercase letters,
      * one random integer value in ``[-sys.maxsize - 1, sys.maxsize]``,
      * a type code chosen from ``c``, ``g``, ``h``, ``d``, ``s``,
      * between ``args.min_tags_per_msg`` and ``args.max_tags_per_msg`` tags,
        each 20 random lowercase letters split as ``<10 letters>:<10 letters>``,
      * with probability ``args.multivalue_pack_probability``, an additional
        ``args.min_multivalue``..``args.max_multivalue`` random values.

    Returns:
        The list of generated messages.
    """
    random.seed(args.rng_seed)

    type_codes = ["c", "g", "h", "d", "s"]
    letters = string.ascii_lowercase
    lowest, highest = -sys.maxsize - 1, sys.maxsize

    # NOTE: the order of the random draws below determines the output for a
    # given seed; keep it stable.
    context_count = random.randint(args.min_num_contexts, args.max_num_contexts)
    generated = []

    for _ in range(context_count):
        name = ''.join(random.choices(letters, k=50))
        values = [random.randint(lowest, highest)]
        type_code = random.choice(type_codes)

        tag_count = random.randint(args.min_tags_per_msg, args.max_tags_per_msg)
        raw_tags = (''.join(random.choices(letters, k=20)) for _ in range(tag_count))
        tags = [f"{raw[:10]}:{raw[10:]}" for raw in raw_tags]

        # Occasionally pack several extra values into the same message.
        if random.random() < args.multivalue_pack_probability:
            extra_count = random.randint(args.min_multivalue, args.max_multivalue)
            values.extend(random.randint(lowest, highest) for _ in range(extra_count))

        generated.append(DogStatsDMessage(name, values, type_code, tags))

    return generated
|
|
|
def extend_dataset(messages, args):
    """Pad ``messages`` with clones until the serialized size budget is reached.

    A clone reuses the name, type and tags of a randomly chosen existing
    message but carries freshly drawn values, so the number of distinct
    contexts stays the same while the amount of data grows.

    Size is measured as the encoded byte length of ``str(message)`` -- the
    bytes that actually get sent or written -- not as the in-memory size of
    the Python string object.

    Args:
        messages: Seed messages; the input is not modified.
        args: Object providing ``data_size_in_bytes``,
            ``multivalue_pack_probability``, ``min_multivalue`` and
            ``max_multivalue``.

    Returns:
        A new list starting with the seed messages, followed by as many
        clones as fit without exceeding ``args.data_size_in_bytes``. An empty
        input yields an empty list.
    """
    extended_messages = list(messages)  # start with existing messages
    if not extended_messages:
        # Nothing to clone from; random.choice would raise on an empty list.
        return extended_messages

    size_budget = args.data_size_in_bytes
    lowest, highest = -sys.maxsize - 1, sys.maxsize
    serialized_length = sum(len(str(message).encode()) for message in extended_messages)

    while serialized_length < size_budget:
        # Select a random message to clone
        clone_subject = random.choice(extended_messages)

        # Draw new values, occasionally packing several into one message
        new_metric_values = [random.randint(lowest, highest)]
        if random.random() < args.multivalue_pack_probability:
            num_packed_values = random.randint(args.min_multivalue, args.max_multivalue)
            new_metric_values.extend(
                random.randint(lowest, highest) for _ in range(num_packed_values)
            )

        # Same metric_name, metric_type and metric_tags, but new values
        new_message = DogStatsDMessage(clone_subject.metric_name, new_metric_values,
                                       clone_subject.metric_type, clone_subject.metric_tags)

        # Render once and reuse the length for both the check and the update
        new_length = len(str(new_message).encode())

        # Stop as soon as the next message would overshoot the budget
        if serialized_length + new_length > size_budget:
            break

        extended_messages.append(new_message)
        serialized_length += new_length

    return extended_messages
|
|
|
def main(argv=None):
    """Parse the command line, generate the dataset and emit it.

    The dataset is first generated with the requested number of contexts and
    then extended with clones up to ``--data_size_in_bytes``. It is then
    either printed (``--print``), sent in a loop to a Unix socket
    (``--unix PATH``), or written to ``dogstatsd_msgs.txt`` in the current
    directory (default).

    Args:
        argv: Optional list of argument strings. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``.

    Raises:
        SystemExit: With status 2 on invalid arguments, including a minimum
            option that is greater than its matching maximum.
    """
    parser = argparse.ArgumentParser(description='Generate dogstatsd messages.')
    parser.add_argument('--min_num_contexts', type=int, default=500,
                        help='Minimum number of contexts')
    parser.add_argument('--max_num_contexts', type=int, default=10000,
                        help='Maximum number of contexts')
    parser.add_argument('--rng_seed', type=int, default=42,
                        help='RNG seed for pseudo-random generation')
    parser.add_argument('--min_tags_per_msg', type=int, default=2,
                        help='Minimum tags per message')
    parser.add_argument('--max_tags_per_msg', type=int, default=50,
                        help='Maximum tags per message')
    parser.add_argument('--multivalue_pack_probability', type=float, default=0.1,
                        help='Probability of packing multiple values')
    parser.add_argument('--min_multivalue', type=int, default=2,
                        help='Minimum multivalue')
    parser.add_argument('--max_multivalue', type=int, default=50,
                        help='Maximum multivalue')
    parser.add_argument('--data_size_in_bytes', type=int, default=50 * 1024 * 1024,  # 50MB
                        help='Data size in bytes')
    parser.add_argument('--print', action='store_true',
                        help='Print the messages instead of writing to a file')
    parser.add_argument('--unix', type=str,
                        help='Send the messages in a loop to the Unix datagram '
                             'socket at this path instead of writing to a file')

    args = parser.parse_args(argv)

    # random.randint raises a bare ValueError on an inverted range; report it
    # as a normal usage error instead of a traceback.
    range_options = (
        ('min_num_contexts', 'max_num_contexts'),
        ('min_tags_per_msg', 'max_tags_per_msg'),
        ('min_multivalue', 'max_multivalue'),
    )
    for low_name, high_name in range_options:
        if getattr(args, low_name) > getattr(args, high_name):
            parser.error(f"--{low_name} must not be greater than --{high_name}")

    msgs = generate_dogstatsd_msgs(args)
    # msgs now contains data with the specified number of contexts.
    # Extend it to fill up the entire desired data size in bytes.
    msgs = extend_dataset(msgs, args)

    if args.print:
        for msg in msgs:
            print(msg)
    elif args.unix:
        send_to_socket(args.unix, msgs)
    else:
        with open('dogstatsd_msgs.txt', 'w') as f:
            f.write('\n'.join(str(msg) for msg in msgs))
|
|
|
|
|
# Run the generator only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
|
|