Skip to content

Instantly share code, notes, and snippets.

@scottopell
Last active August 10, 2023 22:13
Show Gist options
  • Save scottopell/bd1956ce2549efe6bd1b9ffa10fbee20 to your computer and use it in GitHub Desktop.
Save scottopell/bd1956ce2549efe6bd1b9ffa10fbee20 to your computer and use it in GitHub Desktop.
dogstatsd_msgs.txt

DogStatsD Msg Generator

This implementation of a dogstatsd msg generator comes from my mental model of what configuration options I'd like to be able to define in order to flex some specific behavior of a dogstatsd server.

This is not meant to be comprehensive or infinitely customizable, but rather demonstrate the structure of my thought process.

Heavily inspired by (and a prototype for) lading

import argparse
import socket
import asyncio
import random
import string
import threading
from collections import deque
class DogStatsDTemplate:
def __init__(self, metric_name, metric_type, metric_tags):
self.metric_name = metric_name
self.metric_type = metric_type
self.metric_tags = metric_tags
class DogStatsDMessage:
def __init__(self, metric_name, metric_value, metric_type, metric_tags):
self.metric_name = metric_name
self.metric_value = metric_value
self.metric_type = metric_type
self.metric_tags = metric_tags
def __str__(self):
tags = ','.join(self.metric_tags)
return f"{self.metric_name}:{self.metric_value}|{self.metric_type}|{tags}"
def parse_byte_quantity(quantity_str):
quantity_str = quantity_str.lower()
if 'kb' in quantity_str:
return int(quantity_str.replace('kb', '')) * 1024
elif 'mb' in quantity_str:
return int(quantity_str.replace('mb', '')) * 1024 * 1024
elif 'gb' in quantity_str:
return int(quantity_str.replace('gb', '')) * 1024 * 1024 * 1024
else:
return int(quantity_str)
def generate_dogstatsd_templates(num_templates, min_tags, max_tags):
templates = []
metric_types = ["c", "g", "h", "d", "s"]
for _ in range(num_templates):
metric_name = ''.join(random.choices(string.ascii_lowercase, k=50))
metric_type = random.choice(metric_types)
num_tags = random.randint(min_tags, max_tags)
metric_tags = []
for _ in range(num_tags):
key_value = ''.join(random.choices(string.ascii_lowercase, k=20))
key_value = key_value[:10] + ':' + key_value[10:]
metric_tags.append(key_value)
template = DogStatsDTemplate(metric_name, metric_type, metric_tags)
templates.append(template)
return templates
async def generate_traffic(templates, traffic_buffer, buffer_size):
while True:
while len(traffic_buffer) < buffer_size:
print(f"generating\t{len(traffic_buffer)}")
template = random.choice(templates)
metric_value = random.randint(-2**31, 2**31 - 1)
message = DogStatsDMessage(template.metric_name, metric_value, template.metric_type, template.metric_tags)
traffic_buffer.append(message)
await asyncio.sleep(0)
class LeakyBucket:
def __init__(self, rate):
self.rate = rate # bytes per second
self.tokens = 0
self.last_fill_time = asyncio.get_running_loop().time()
async def consume(self, bytes_count):
current_time = asyncio.get_running_loop().time()
elapsed_time = current_time - self.last_fill_time
self.last_fill_time = current_time
self.tokens += self.rate * elapsed_time
if self.tokens > self.rate:
self.tokens = self.rate
if bytes_count <= self.tokens:
self.tokens -= bytes_count
else:
sleep_time = (bytes_count - self.tokens) / self.rate
await asyncio.sleep(sleep_time)
self.tokens = 0
async def send_traffic(sock, traffic_buffer, templates, buffer_size, bytes_per_second):
loop = asyncio.get_running_loop()
leaky_bucket = LeakyBucket(bytes_per_second)
asyncio.ensure_future(generate_traffic(templates, traffic_buffer, buffer_size))
while len(traffic_buffer) < buffer_size:
await asyncio.sleep(0) # wait for buffer to fill up
while True:
print(f"consumingg\t{len(traffic_buffer)}")
message_to_send = str(traffic_buffer.popleft())
await leaky_bucket.consume(len(message_to_send.encode()))
await loop.sock_sendall(sock, message_to_send.encode())
await protocol.send_messages()
def main():
parser = argparse.ArgumentParser(description='DogStatsD Load Generator')
parser.add_argument('--unix_socket_path', default='/path/to/socket.sock', help='Unix domain socket path')
parser.add_argument('--num_templates', type=int, default=100, help='Number of DogStatsD templates')
parser.add_argument('--min_tags', type=int, default=2, help='Minimum number of tags per message')
parser.add_argument('--max_tags', type=int, default=50, help='Maximum number of tags per message')
parser.add_argument('--buffer_size', type=int, default=100, help='Buffer size for DogStatsD traffic')
parser.add_argument('--bytes_per_second', type=parse_byte_quantity, default='1000', help='Bytes per second for leaky bucket rate control (e.g., "50mb" or "1024kb")')
args = parser.parse_args()
templates = generate_dogstatsd_templates(args.num_templates, args.min_tags, args.max_tags)
traffic_buffer = deque(maxlen=args.buffer_size)
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
sock.connect(args.unix_socket_path)
sock.setblocking(False)
# Run the event loop for sending traffic
asyncio.run(send_traffic(sock, traffic_buffer, templates, args.buffer_size, args.bytes_per_second))
if __name__ == '__main__':
main()
import argparse
import random
import time
import string
import sys
import os
import socket
class DogStatsDMessage:
def __init__(self, metric_name, metric_values, metric_type, metric_tags):
self.metric_name = metric_name
self.metric_values = metric_values
self.metric_type = metric_type
self.metric_tags = metric_tags
def __str__(self):
tags = ','.join(self.metric_tags)
values = ":".join([str(v) for v in self.metric_values])
return f"{self.metric_name}:{values}|{self.metric_type}|{tags}"
def send_to_socket(socket_path, msgs):
# Check if the socket exists
if not os.path.exists(socket_path):
print(f"Socket at path {socket_path} does not exist")
exit(1)
# Create a socket object
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
try:
# Connect to the socket
sock.connect(socket_path)
bytes_sent = 0
bytes_to_megabytes = 1 / (1024 * 1024)
start_time = time.time()
# Iterate over strings and send each one
while True:
for m in msgs:
bytes_sent += sock.send(str(m).encode())
# Calculate the elapsed time and print the sending rate
elapsed_time = time.time() - start_time
sending_rate = (bytes_sent * bytes_to_megabytes) / elapsed_time
print(f"\rSending rate: {sending_rate:.2f} MB/s", end='')
sys.stdout.flush()
finally:
# Close the socket connection
sock.close()
def generate_dogstatsd_msgs(args):
random.seed(args.rng_seed)
messages = []
metric_types = ["c", "g", "h", "d", "s"]
for _ in range(random.randint(args.min_num_contexts, args.max_num_contexts)):
# Generate metric name
metric_name = ''.join(random.choices(string.ascii_lowercase, k=50))
# Generate metric value
metric_values = [random.randint(-sys.maxsize - 1, sys.maxsize)]
# Generate metric type
metric_type = random.choice(metric_types)
# Generate metric tags
num_tags = random.randint(args.min_tags_per_msg, args.max_tags_per_msg)
metric_tags = []
for _ in range(num_tags):
key_value = ''.join(random.choices(string.ascii_lowercase, k=20))
key_value = key_value[:10] + ':' + key_value[10:]
metric_tags.append(key_value)
if random.random() < args.multivalue_pack_probability:
num_packed_values = random.randint(args.min_multivalue, args.max_multivalue)
for _ in range(num_packed_values):
metric_values.append(random.randint(-sys.maxsize - 1, sys.maxsize))
message = DogStatsDMessage(metric_name, metric_values, metric_type, metric_tags)
messages.append(message)
return messages
def extend_dataset(messages, args):
extended_messages = messages.copy() # start with existing messages
serialized_length = sum(sys.getsizeof(str(message)) for message in extended_messages)
while serialized_length < args.data_size_in_bytes:
# Select a random message to clone
clone_subject = random.choice(extended_messages)
# Generate a new metric values
new_metric_values = [random.randint(-sys.maxsize - 1, sys.maxsize)]
if random.random() < args.multivalue_pack_probability:
num_packed_values = random.randint(args.min_multivalue, args.max_multivalue)
for _ in range(num_packed_values):
new_metric_values.append(random.randint(-sys.maxsize - 1, sys.maxsize))
# Create a new message with the same metric_name, metric_tags, and metric_type, but a new metric_value
new_message = DogStatsDMessage(clone_subject.metric_name, new_metric_values,
clone_subject.metric_type, clone_subject.metric_tags)
# Check if adding the new message would exceed args.data_size_in_bytes
if serialized_length + sys.getsizeof(str(new_message)) > args.data_size_in_bytes:
break
# Add the new message and update the serialized length
extended_messages.append(new_message)
serialized_length += sys.getsizeof(str(new_message))
return extended_messages
def main():
parser = argparse.ArgumentParser(description='Generate dogstatsd messages.')
parser.add_argument('--min_num_contexts', type=int, default=500,
help='Minimum number of contexts')
parser.add_argument('--max_num_contexts', type=int, default=10000,
help='Maximum number of contexts')
parser.add_argument('--rng_seed', type=int, default=42,
help='RNG seed for pseudo-random generation')
parser.add_argument('--min_tags_per_msg', type=int, default=2,
help='Minimum tags per message')
parser.add_argument('--max_tags_per_msg', type=int, default=50,
help='Maximum tags per message')
parser.add_argument('--multivalue_pack_probability', type=float, default=0.1,
help='Probability of packing multiple values')
parser.add_argument('--min_multivalue', type=int, default=2,
help='Minimum multivalue')
parser.add_argument('--max_multivalue', type=int, default=50,
help='Maximum multivalue')
parser.add_argument('--data_size_in_bytes', type=int, default=50 * 1024 * 1024, # 50MB
help='Data size in bytes')
parser.add_argument('--print', action='store_true',
help='Print the messages instead of writing to a file')
parser.add_argument('--unix', type=str,
help='Print the messages instead of writing to a file')
args = parser.parse_args()
msgs = generate_dogstatsd_msgs(args)
# msgs now contains data with the specified number of contexts
# Lets extend it to fill up the entire desired data size in bytes
msgs = extend_dataset(msgs, args)
if args.print:
for msg in msgs:
print(msg)
elif args.unix:
send_to_socket(args.unix, msgs)
else:
with open('dogstatsd_msgs.txt', 'w') as f:
f.write('\n'.join([str(msg) for msg in msgs]))
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment