Generate a visual of network activity using Matplotlib
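Because the script carries an inline script metadata block (PEP 723), uv can fetch matplotlib and numpy automatically before running it. A minimal invocation, assuming the gist is saved locally as network_timeline.py (the filename is illustrative):

$ uv run network_timeline.py events.csv
Timeline saved to network_timeline.png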
#!/usr/bin/env python3
# /// script
# dependencies = [
#     "matplotlib",
#     "numpy",
# ]
# ///
| """ | |
| =============================================== | |
| Network Activity Timeline from CSV Data | |
| =============================================== | |
| Creates a timeline visualization of network activity based on CSV data | |
| with datetime and IP address columns. Activity scaling is based on | |
| the time deltas between events. | |
| Here is a sample CSV file to show the input data format: | |
| $ cat events.csv | |
| "2025-03-19 16:46:11",172.16.42.103->W1GA | |
| "2025-03-19 14:46:22",172.16.42.103->LOLC | |
| "2025-03-19 16:48:39",172.16.42.105->W1GA | |
| "2025-03-19 15:16:13",172.16.42.105->LOLC | |
| "2025-03-19 16:50:23",172.16.42.107->W1GA | |
| "2025-03-19 16:55:19",172.16.42.108->W1GA | |
| "2025-03-19 16:57:04",172.16.42.109->W1GA | |
| "2025-03-19 16:10:48",172.16.42.2->W1GA | |
| "2025-03-19 16:38:15",172.16.42.3->W1GA | |
| """ | |
import matplotlib.pyplot as plt
import numpy as np
import csv
from datetime import datetime, timedelta
import argparse
import sys


def parse_arguments():
    parser = argparse.ArgumentParser(description='Generate network activity timeline from CSV')
    parser.add_argument('csv_file', help='CSV file with datetime,IP columns')
    return parser.parse_args()


def calculate_activity_scale(timestamps):
    """Calculate activity scaling based on time deltas between events"""
    if len(timestamps) < 2:
        return [1.0] * len(timestamps)

    # Calculate time deltas in seconds
    deltas = []
    for i in range(len(timestamps)):
        if i == 0:
            # First event - use delta to next event (the early return above
            # guarantees at least two timestamps, so i+1 is always valid)
            delta = (timestamps[i+1] - timestamps[i]).total_seconds()
        elif i == len(timestamps) - 1:
            # Last event - use delta from previous event
            delta = (timestamps[i] - timestamps[i-1]).total_seconds()
        else:
            # Middle events - use average of deltas to previous and next
            delta_prev = (timestamps[i] - timestamps[i-1]).total_seconds()
            delta_next = (timestamps[i+1] - timestamps[i]).total_seconds()
            delta = (delta_prev + delta_next) / 2
        deltas.append(max(delta, 1.0))  # Minimum delta of 1 second

    # Normalize deltas to create scaling factors (inverse relationship)
    # Shorter deltas = higher activity = larger markers
    max_delta = max(deltas)
    scales = [max_delta / delta for delta in deltas]

    # Normalize to reasonable marker sizes (50-300)
    min_scale, max_scale = min(scales), max(scales)
    if max_scale > min_scale:
        normalized_scales = [50 + 250 * (s - min_scale) / (max_scale - min_scale) for s in scales]
    else:
        normalized_scales = [150] * len(scales)  # All same size if no variation

    return normalized_scales
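
# Worked example for calculate_activity_scale (illustrative values, not drawn
# from the sample CSV): deltas of [10, 100, 1000] seconds give raw scales of
# [100.0, 10.0, 1.0] (max_delta / delta), which normalize onto the 50-300
# marker-size range as [300.0, ~72.7, 50.0] - the most rapid-fire event draws
# the largest marker.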


def position_labels_to_avoid_overlap(dates, levels_count=6):
    """Position IP labels at alternating heights around the timeline to reduce overlaps"""
    n_items = len(dates)
    levels = np.tile(np.array([-5, 5, -3, 3, -1, 1]), n_items // levels_count + 1)[:n_items]

    # Sort by date to handle events chronologically
    sorted_indices = sorted(range(len(dates)), key=lambda i: dates[i])

    # Cycle through the levels in chronological order so neighboring labels
    # land at alternating heights above and below the timeline
    final_levels = [0] * n_items
    for i, idx in enumerate(sorted_indices):
        final_levels[idx] = levels[i]
    return final_levels
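
# Illustrative: with seven chronologically ordered events, the assigned levels
# are [-5, 5, -3, 3, -1, 1, -5] - the pattern repeats once all six slots are
# used, so label overlaps are reduced rather than eliminated.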


def main():
    args = parse_arguments()

    try:
        # Read CSV file
        dates = []
        ips = []
        with open(args.csv_file, 'r') as csvfile:
            reader = csv.reader(csvfile)
            for row in reader:
                if len(row) >= 2:
                    datetime_str = row[0].strip().strip('"')
                    ip_str = row[1].strip().strip('"')

                    # Parse datetime
                    dt = datetime.strptime(datetime_str, '%Y-%m-%d %H:%M:%S')
                    dates.append(dt)
                    ips.append(ip_str)

        # Sort by datetime
        combined = list(zip(dates, ips))
        combined.sort(key=lambda x: x[0])
        dates, ips = zip(*combined) if combined else ([], [])
        if not dates:
            print("No data found in CSV file", file=sys.stderr)
            sys.exit(1)

        # Calculate activity scaling
        scales = calculate_activity_scale(dates)

        # Position labels to avoid overlaps
        levels = position_labels_to_avoid_overlap(dates)

        # Create the plot
        fig, ax = plt.subplots(figsize=(16, 10))

        # Draw the base timeline
        start = min(dates)
        stop = max(dates)
        ax.plot((start, stop), (0, 0), 'k', alpha=0.5, linewidth=2)

        # Plot each activity point
        for ip, date, scale, level in zip(ips, dates, scales, levels):
            vert = 'top' if level < 0 else 'bottom'

            # Plot activity marker (size based on activity scale)
            ax.scatter(date, 0, s=scale, facecolor='lightblue', edgecolor='darkblue',
                       zorder=9999, alpha=0.7)

            # Plot stem line from the timeline up or down to the label
            ax.plot((date, date), (0, level), c='red', alpha=0.7, linewidth=1)

            # Add IP label with a translucent white rounded background
            ax.text(date, level, ip,
                    horizontalalignment='center', verticalalignment=vert,
                    fontsize=10,
                    bbox=dict(boxstyle="round,pad=0.3", facecolor='white', alpha=0.8))

        # Set title and labels
        ax.set_title(f"Network Activity Timeline - {len(dates)} Events", fontsize=16, pad=20)
        ax.set_ylabel("Activity Events", fontsize=12)

        # Set manual ticks with exact timestamps
        ax.set_xticks(dates)
        ax.set_xticklabels([date.strftime("%H:%M:%S") for date in dates], rotation=45, ha='right', fontsize=8)

        # Add margins (5% of total time range on each side)
        time_range = (max(dates) - min(dates)).total_seconds()
        margin_seconds = max(time_range * 0.05, 60)  # At least 1 minute margin
        margin_delta = timedelta(seconds=margin_seconds)
        ax.set_xlim(min(dates) - margin_delta, max(dates) + margin_delta)

        # Clean up the plot: hide the y axis and every spine except the bottom
        plt.setp((ax.get_yticklabels() + ax.get_yticklines() +
                  [ax.spines['left'], ax.spines['right'], ax.spines['top']]), visible=False)

        # Add grid for better readability
        ax.grid(True, alpha=0.3)
        plt.tight_layout()

        # Save as PNG
        output_file = 'network_timeline.png'
        plt.savefig(output_file, dpi=150, bbox_inches='tight')
        print(f"Timeline saved to {output_file}")

        # Also show the plot
        plt.show()
    except FileNotFoundError:
        print(f"Error: File '{args.csv_file}' not found", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"Error processing file: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()