# Wi-Fi Self-Similarity & LRD Analysis
import os
import sys
import math
from datetime import datetime
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import pywt
from hurst import compute_Hc
import nolds
from scipy import stats
from scipy.signal import periodogram
from statsmodels.regression.linear_model import OLS
from statsmodels.tools import add_constant
sns.set(style='whitegrid')
FILENAME = '/content/capture.csv' # path in Colab
TIME_COL = 'frame.time_epoch'
LEN_COL = 'frame.len'
MIN_BIN = 0.001 # smallest bin (1 ms)
MAX_BIN = 100.0 # largest bin (100 s)
BIN_MULT = 2 # geometric progression factor (powers of BIN_MULT)
df = pd.read_csv(FILENAME, on_bad_lines='skip')
if TIME_COL not in df.columns or LEN_COL not in df.columns:
    raise ValueError(f"Expected columns '{TIME_COL}' and '{LEN_COL}' in CSV. Found: {df.columns.tolist()}")
# Drop null rows
df = df.dropna(subset=[TIME_COL, LEN_COL])
# Convert types
df[TIME_COL] = df[TIME_COL].astype(float)
df[LEN_COL] = df[LEN_COL].astype(int)
# Sort by timestamp (capture exports are usually ordered, but don't rely on it) and normalize time to start at 0
df = df.sort_values(TIME_COL).reset_index(drop=True)
t0 = df[TIME_COL].iloc[0]
df['t_rel'] = df[TIME_COL] - t0
TOTAL_TIME = df['t_rel'].iloc[-1]
print(f"Total capture time: {TOTAL_TIME:.2f} seconds")
print(f"Total packets: {len(df):,}")
# Create geometric bin sizes: MIN_BIN * BIN_MULT**k, up to MAX_BIN
bins = []
b = MIN_BIN
while b <= MAX_BIN:
    bins.append(b)
    b *= BIN_MULT
print('\nBin sizes (s):', bins)
# Count packets per bin for a given bin size
def counts_per_bin(times, bin_size):
    nbins = int(np.ceil(times[-1] / bin_size))
    edges = np.linspace(0, nbins*bin_size, nbins+1)
    counts, _ = np.histogram(times, bins=edges)
    return counts
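# Optional sanity check (illustrative, not from the capture): per-bin counts of a homogeneous
# Poisson arrival process are independent, so a Hurst estimator run on them should come out
# close to 0.5; useful for verifying the pipeline before trusting results on real traffic.
_poisson_times = np.cumsum(np.random.exponential(scale=0.01, size=50_000))
_poisson_counts = counts_per_bin(_poisson_times, 0.1)
print('Sanity check, DFA on Poisson counts (expect ~0.5):', round(nolds.dfa(_poisson_counts), 3))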
counts_dict = {}
bytes_dict = {}
for b in bins:
    counts = counts_per_bin(df['t_rel'].values, b)
    counts_dict[b] = counts
    # Per-packet bin index; packets falling outside the edges are skipped by the bounds check below
    idx = np.digitize(df['t_rel'].values, np.arange(0, (len(counts)+1)*b, b)) - 1
    agg_bytes = np.zeros(len(counts), dtype=np.int64)
    for i, L in zip(idx, df[LEN_COL].values):
        if 0 <= i < len(agg_bytes):
            agg_bytes[i] += L
    bytes_dict[b] = agg_bytes
print('\nBuilt time-series for counts and bytes at multiple scales.')
results = []
for b in bins:
    counts = counts_dict[b]
    # Keep zero-count bins: idle intervals are part of the process and matter for LRD estimation
    counts_kept = counts
    # R/S (rescaled range) Hurst estimate
    try:
        H_rs, c_rs, data_rs = compute_Hc(counts_kept, kind='random_walk', simplified=True)
    except Exception:
        H_rs = np.nan
    # DFA (detrended fluctuation analysis)
    try:
        H_dfa = nolds.dfa(counts_kept)
    except Exception:
        H_dfa = np.nan
    # Wavelet: variance of detail coefficients across decomposition levels
    try:
        coeffs = pywt.wavedec(counts_kept, 'haar', level=None)
        variances = [np.var(c) for c in coeffs[1:]]
        scales = np.arange(1, len(variances)+1)
        slope = np.polyfit(np.log(scales), np.log(variances), 1)[0]
        H_wave = (slope + 2) / 2
    except Exception:
        H_wave = np.nan
    results.append({'bin_s': b, 'H_rs': H_rs, 'H_dfa': H_dfa, 'H_wave': H_wave, 'mean_counts': np.mean(counts_kept)})
res_df = pd.DataFrame(results)
print('\nH estimates across bin scales:')
print(res_df)
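# Interpretation guide: H near 0.5 indicates short-range dependence; estimates that sit
# consistently between 0.5 and 1 across bin scales are the usual signature of long-range
# dependence and self-similarity in packet-count series.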
base_bin = bins[0]
base_counts = counts_dict[base_bin]
# Aggregation factors (powers of 2), capped so the most aggregated series still has >= 16 points
max_exp = int(np.floor(np.log2(len(base_counts) / 16)))
agg_factors = 2**np.arange(0, max_exp + 1)
vs = []
for m in agg_factors:
    L = len(base_counts) - (len(base_counts) % m)
    agg = base_counts[:L].reshape(-1, m).sum(axis=1)
    vs.append(np.var(agg))
plt.figure(figsize=(6,4))
plt.loglog(agg_factors * base_bin, vs, 'o-')
plt.xlabel('aggregation window (s)')
plt.ylabel('variance')
plt.title(f'Variance-time plot (base bin={base_bin}s)')
plt.grid(True, which='both')
plt.show()
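# Rough H estimate from the variance-time slope (a sketch, assuming the asymptotic scaling
# Var(sum of m bins) ~ m^(2H) for block sums; for block means the exponent would be 2H-2).
vt_slope = np.polyfit(np.log(agg_factors), np.log(vs), 1)[0]
H_vt = vt_slope / 2
print(f"Variance-time slope={vt_slope:.3f}, approximate H_vt={H_vt:.3f}")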
def RS_series(ts):
    # Classic rescaled-range statistic, averaged over non-overlapping segments of size k
    n = len(ts)
    rs = []
    ns = []
    for k in [16, 32, 64, 128, 256, 512, 1024]:
        if k >= n:
            break
        arr = ts[:(n//k)*k]
        arr = arr.reshape(-1, k)
        R_over_S = []
        for row in arr:
            Y = np.cumsum(row - np.mean(row))
            R = np.max(Y) - np.min(Y)
            S = np.std(row, ddof=1)
            if S > 0:
                R_over_S.append(R / S)
        if len(R_over_S) > 0:
            rs.append(np.mean(R_over_S))
            ns.append(k)
    return np.array(ns), np.array(rs)
ns, rsvals = RS_series(base_counts)
plt.figure(figsize=(6,4))
plt.loglog(ns * base_bin, rsvals, 'o-')
plt.xlabel('segment size (s)')
plt.ylabel('R/S')
plt.title('R/S plot (base counts)')
plt.grid(True)
plt.show()
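# Rough pooled H estimate from the R/S curve (a sketch, assuming E[R/S] ~ c * n^H so the
# log-log slope approximates H).
if len(ns) >= 2:
    H_rs_pooled = np.polyfit(np.log(ns), np.log(rsvals), 1)[0]
    print(f"R/S log-log slope (approximate H): {H_rs_pooled:.3f}")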
coeffs = pywt.wavedec(base_counts, 'haar', level=None)
variances = [np.var(c) for c in coeffs[1:]]
scales = np.arange(1, len(variances)+1)
plt.figure(figsize=(6,4))
plt.plot(np.log2(scales * base_bin), np.log2(variances), 'o-')
plt.xlabel('log2(scale s)')
plt.ylabel('log2(variance)')
plt.title('Wavelet logscale diagram')
plt.grid(True)
plt.show()
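# Note: pywt.wavedec orders detail coefficients from the coarsest level to the finest, so the
# scale index used here runs from the largest physical scale down to the smallest.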
cols = []
if 'ip.src' in df.columns and 'ip.dst' in df.columns:
    df['ip_src_s'] = df['ip.src'].astype(str)
    df['ip_dst_s'] = df['ip.dst'].astype(str)
    cols += ['ip_src_s', 'ip_dst_s']
if 'tcp.srcport' in df.columns and 'tcp.dstport' in df.columns:
    df['tcp.srcport'] = df['tcp.srcport'].fillna(0).astype(int)
    df['tcp.dstport'] = df['tcp.dstport'].fillna(0).astype(int)
    cols += ['tcp.srcport', 'tcp.dstport']
if len(cols) >= 2:
    df['flow_id'] = df[cols].astype(str).agg('-'.join, axis=1)
else:
    df['flow_id'] = 'singlehost'
flows = df.groupby('flow_id').agg(bytes=('frame.len','sum'), start=('t_rel','min'), end=('t_rel','max'))
flows['dur'] = flows['end'] - flows['start']
sizes = flows['bytes'].values
sizes_sorted = np.sort(sizes)
ccdf = 1.0 - np.arange(len(sizes_sorted))/len(sizes_sorted)
plt.figure(figsize=(6,4))
plt.loglog(sizes_sorted[sizes_sorted>0], ccdf[sizes_sorted>0], '.')
plt.xlabel('flow size (bytes)')
plt.ylabel('CCDF')
plt.title('Flow size CCDF')
plt.grid(True)
plt.show()
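# Crude heavy-tail check (a sketch, not a formal tail estimator such as Hill's): heavy-tailed
# flow sizes are one standard explanation for self-similar aggregate traffic, and for a
# Pareto-like tail with index alpha in (1, 2) the Taqqu et al. ON/OFF result predicts
# H = (3 - alpha) / 2. The fit below uses the log-log CCDF slope over the top 10% of flows.
positive = sizes_sorted > 0
if positive.sum() >= 10:
    tail = sizes_sorted > np.quantile(sizes_sorted[positive], 0.9)
    if tail.sum() >= 10:
        alpha_hat = -np.polyfit(np.log(sizes_sorted[tail]), np.log(ccdf[tail]), 1)[0]
        print(f"Crude tail index alpha≈{alpha_hat:.2f}, implied H≈{(3 - alpha_hat)/2:.2f}")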
choose_bin = None
for b in bins:
    if 0.05 <= b <= 0.2:
        choose_bin = b
        break
if choose_bin is None:
    choose_bin = bins[len(bins)//2]
series = bytes_dict[choose_bin]
# detrend
series_d = series - np.mean(series)
# periodogram
f, Pxx = periodogram(series_d)
valid = (f > 0) & (f < 0.1)
slope, intercept, r_val, p_val, std_err = stats.linregress(np.log(f[valid]), np.log(Pxx[valid]))
# power spectral density ~ f^{-beta}, H = (beta+1)/2
beta = -slope
H_spec = (beta + 1) / 2
print(f"Spectral slope beta={beta:.3f}, H_spec={H_spec:.3f}")
plt.figure(figsize=(6,4))
plt.loglog(f[1:], Pxx[1:])
plt.xlabel('frequency (cycles per bin)')
plt.ylabel('PSD')
plt.title('Periodogram (log-log)')
plt.grid(True)
plt.show()
out = {
    'capture_file': FILENAME,
    'total_packets': int(len(df)),
    'capture_seconds': float(TOTAL_TIME),
    'hurst_table': res_df.to_dict(orient='records'),
    'spectral_H': float(H_spec)
}
import json
with open('analysis_summary.json', 'w') as fh:
    json.dump(out, fh, indent=2)