Skip to content

Instantly share code, notes, and snippets.

@joswr1ght
Created October 17, 2025 17:34
Show Gist options
  • Select an option

  • Save joswr1ght/616c3de93c69c251679cca7ca9a1dac4 to your computer and use it in GitHub Desktop.

Select an option

Save joswr1ght/616c3de93c69c251679cca7ca9a1dac4 to your computer and use it in GitHub Desktop.
Generate a visual of network activity using Matplotlib
#!/usr/bin/env python3
# /// script
# dependencies = [
# "matplotlib",
# "numpy",
# ]
# ///
"""
===============================================
Network Activity Timeline from CSV Data
===============================================
Creates a timeline visualization of network activity based on CSV data
with datetime and IP address columns. Activity scaling is based on
the time deltas between events.
Here is a sample CSV file to show the input data format:
$ cat events.csv
"2025-03-19 16:46:11",172.16.42.103->W1GA
"2025-03-19 14:46:22",172.16.42.103->LOLC
"2025-03-19 16:48:39",172.16.42.105->W1GA
"2025-03-19 15:16:13",172.16.42.105->LOLC
"2025-03-19 16:50:23",172.16.42.107->W1GA
"2025-03-19 16:55:19",172.16.42.108->W1GA
"2025-03-19 16:57:04",172.16.42.109->W1GA
"2025-03-19 16:10:48",172.16.42.2->W1GA
"2025-03-19 16:38:15",172.16.42.3->W1GA
"""
import matplotlib.pyplot as plt
import numpy as np
import csv
from datetime import datetime
import argparse
import sys
def parse_arguments():
parser = argparse.ArgumentParser(description='Generate network activity timeline from CSV')
parser.add_argument('csv_file', help='CSV file with datetime,IP columns')
return parser.parse_args()
def calculate_activity_scale(timestamps):
"""Calculate activity scaling based on time deltas between events"""
if len(timestamps) < 2:
return [1.0] * len(timestamps)
# Calculate time deltas in seconds
deltas = []
for i in range(len(timestamps)):
if i == 0:
# First event - use delta to next event
if len(timestamps) > 1:
delta = (timestamps[i+1] - timestamps[i]).total_seconds()
else:
delta = 1.0
elif i == len(timestamps) - 1:
# Last event - use delta from previous event
delta = (timestamps[i] - timestamps[i-1]).total_seconds()
else:
# Middle events - use average of deltas to previous and next
delta_prev = (timestamps[i] - timestamps[i-1]).total_seconds()
delta_next = (timestamps[i+1] - timestamps[i]).total_seconds()
delta = (delta_prev + delta_next) / 2
deltas.append(max(delta, 1.0)) # Minimum delta of 1 second
# Normalize deltas to create scaling factors (inverse relationship)
# Shorter deltas = higher activity = larger markers
max_delta = max(deltas)
scales = [max_delta / delta for delta in deltas]
# Normalize to reasonable marker sizes (50-300)
min_scale, max_scale = min(scales), max(scales)
if max_scale > min_scale:
normalized_scales = [50 + 250 * (s - min_scale) / (max_scale - min_scale) for s in scales]
else:
normalized_scales = [150] * len(scales) # All same size if no variation
return normalized_scales
def position_labels_to_avoid_overlap(dates, ips, levels_count=6):
"""Position IP labels to minimize overlaps"""
n_items = len(dates)
levels = np.tile(np.array([-5, 5, -3, 3, -1, 1]), (n_items // levels_count + 1))[:n_items]
# Sort by date to handle chronologically
sorted_indices = sorted(range(len(dates)), key=lambda i: dates[i])
# Adjust levels to minimize overlaps
final_levels = [0] * n_items
for i, idx in enumerate(sorted_indices):
final_levels[idx] = levels[i % levels_count]
return final_levels
def main():
args = parse_arguments()
try:
# Read CSV file
dates = []
ips = []
with open(args.csv_file, 'r') as csvfile:
reader = csv.reader(csvfile)
for row in reader:
if len(row) >= 2:
datetime_str = row[0].strip().strip('"')
ip_str = row[1].strip().strip('"')
# Parse datetime
dt = datetime.strptime(datetime_str, '%Y-%m-%d %H:%M:%S')
dates.append(dt)
ips.append(ip_str)
# Sort by datetime
combined = list(zip(dates, ips))
combined.sort(key=lambda x: x[0])
dates, ips = zip(*combined) if combined else ([], [])
if not dates:
print("No data found in CSV file")
sys.exit(1)
# Calculate activity scaling
scales = calculate_activity_scale(dates)
# Position labels to avoid overlaps
levels = position_labels_to_avoid_overlap(dates, ips)
# Create the plot
fig, ax = plt.subplots(figsize=(16, 10))
# Recreate the base timeline
start = min(dates)
stop = max(dates)
ax.plot((start, stop), (0, 0), 'k', alpha=0.5, linewidth=2)
# Plot each activity point again
for i, (ip, date, scale, level) in enumerate(zip(ips, dates, scales, levels)):
vert = 'top' if level < 0 else 'bottom'
# Plot activity marker (size based on activity scale)
ax.scatter(date, 0, s=scale, facecolor='lightblue', edgecolor='darkblue',
zorder=9999, alpha=0.7)
# Plot line to label
ax.plot((date, date), (0, level), c='red', alpha=0.7, linewidth=1)
# Add IP label with background
ax.text(date, level, ip,
horizontalalignment='center', verticalalignment=vert,
fontsize=10, backgroundcolor=(1., 1., 1., 0.8),
bbox=dict(boxstyle="round,pad=0.3", facecolor='white', alpha=0.8))
# Set title and labels
ax.set_title(f"Network Activity Timeline - {len(dates)} Events", fontsize=16, pad=20)
ax.set_ylabel("Activity Events", fontsize=12)
# Set manual ticks with exact timestamps
ax.set_xticks(dates)
ax.set_xticklabels([date.strftime("%H:%M:%S") for date in dates], rotation=45, ha='right', fontsize=8)
# Add margins (5% of total time range on each side)
time_range = (max(dates) - min(dates)).total_seconds()
margin_seconds = max(time_range * 0.05, 60) # At least 1 minute margin
from datetime import timedelta
margin_delta = timedelta(seconds=margin_seconds)
ax.set_xlim(min(dates) - margin_delta, max(dates) + margin_delta)
# Clean up the plot
plt.setp((ax.get_yticklabels() + ax.get_yticklines() +
[ax.spines['left'], ax.spines['right'], ax.spines['top']]), visible=False)
# Add grid for better readability
ax.grid(True, alpha=0.3)
plt.tight_layout()
# Save as PNG for debugging
output_file = 'network_timeline.png'
plt.savefig(output_file, dpi=150, bbox_inches='tight')
print(f"Timeline saved to {output_file}")
# Also show the plot
plt.show()
except FileNotFoundError:
print(f"Error: File '{args.csv_file}' not found")
sys.exit(1)
except Exception as e:
print(f"Error processing file: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment