Analyze NSG flow logs
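A script that walks a folder of Azure NSG flow logs (version 2), extracts the flow tuples of accepted outbound TCP connections, geolocates the destination IPs (with an on-disk cache and two fallback services), and estimates egress cost per continent.

Each version 2 flow tuple is a comma-separated string, and the parser indexes into it by position. An illustrative tuple (values made up):

    1713800000,10.0.0.4,93.184.216.34,44321,443,T,O,A,E,10,1200,8,5000
    epoch time, source IP, dest IP, source port, dest port, protocol (T=TCP/U=UDP),
    direction (I=in/O=out), decision (A=allow/D=deny), state (B=begin/C=continuing/E=end),
    packets sent, bytes sent, packets received, bytes received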
import abc
import json
import os
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import pandas as pd
import requests
def process_file(path: str) -> List[dict]:
    if not path.endswith(".json"):
        raise ValueError("File must be in JSON format")
    with open(path) as f:
        data_dict = json.load(f)
    # Flatten the JSON structure and extract flow tuples into a list of dicts
    flow_data = []
    for record in data_dict['records']:
        version = record['properties']['Version']
        if not version or int(version) != 2:
            # only version 2 flow logs carry packet and byte counts
            continue
        for flow in record['properties']['flows']:
            rule = flow['rule']
            if rule != 'DefaultRule_AllowInternetOutBound':
                continue
            for subflow in flow['flows']:
                for tuple_str in subflow['flowTuples']:
                    tuple_data = tuple_str.split(',')
                    protocol = tuple_data[5]
                    direction = tuple_data[6]
                    decision = tuple_data[7]
                    state = tuple_data[8]
                    # we only care about the end ('E') of accepted outbound TCP
                    # connections, because that is when the byte counts are reported
                    if protocol == 'T' and direction == 'O' and decision == 'A' and state == 'E':
                        flow_data.append({
                            'time': tuple_data[0],
                            'source_ip': tuple_data[1],
                            'dest_ip': tuple_data[2],
                            'source_port': int(tuple_data[3]),
                            'dest_port': int(tuple_data[4]),
                            'protocol': tuple_data[5],
                            'traffic_flow': tuple_data[6],
                            'traffic_decision': tuple_data[7],
                            'flow_state': tuple_data[8],
                            'packets_sent': int(tuple_data[9]) if tuple_data[9].isdigit() else 0,
                            'bytes_sent': int(tuple_data[10]) if tuple_data[10].isdigit() else 0,
                            'packets_received': int(tuple_data[11]) if tuple_data[11].isdigit() else 0,
                            'bytes_received': int(tuple_data[12]) if tuple_data[12].isdigit() else 0
                        })
    return flow_data
def format_bytes(value: float) -> str:
    """
    Converts a byte value into a human-readable string with the appropriate unit,
    e.g. format_bytes(1536) -> "1.5000 KB".

    :param value: The size in bytes to be converted.
    :return: A human-readable string representing the size.
    """
    units = ["B", "KB", "MB", "GB", "TB", "PB"]
    size = float(value)
    for unit in units:
        if size < 1024:
            return f"{size:.4f} {unit}"
        size /= 1024.0
    return f"{size:.4f} {units[-1]}"
@dataclass
class IpGeo:
    status: str
    continent: Optional[str]
    country: Optional[str]
    region: Optional[str]
# Approximate egress rates in USD per GB, keyed by source continent code.
# These are rough, hard-coded estimates, not an official price sheet.
INTER_CONTINENTAL_COSTS_PER_GB = {
    "NA": 0.05,
    "EU": 0.05,
    "AS": 0.08,
    "OC": 0.08,
    "AF": 0.08,
    "SA": 0.16,
}
INTRA_CONTINENTAL_COSTS_PER_GB = {
    "NA": 0.02,
    "EU": 0.02,
    "AS": 0.08,
    "OC": 0.08,
    "ME": 0.08,
    "AF": 0.08,
    "SA": 0.16,
}
BYTES_IN_GB = 1024 * 1024 * 1024
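# Worked example with the assumed rates above: sending 100 GB from an EU
# deployment to a non-EU destination costs 100 * 0.05 = 5.00 USD, while the
# same 100 GB staying within the EU costs 100 * 0.02 = 2.00 USD.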
def estimate_cost(home: IpGeo, traffic: pd.DataFrame, hours: float) -> Dict[str, Tuple[float, float]]:
    """
    Given the home location and per-destination traffic (which must already have
    a 'continent' column), estimate the cost of the traffic observed over `hours`.
    Returns a dict of {'inter-continental': (observed_cost, projected_daily_cost),
                       'intra-continental': (observed_cost, projected_daily_cost)}.
    """
    inter_cost = 0.0
    intra_cost = 0.0
    inter_continental_cost_per_gb = INTER_CONTINENTAL_COSTS_PER_GB[home.continent]
    intra_continental_cost_per_gb = INTRA_CONTINENTAL_COSTS_PER_GB[home.continent]
    for _, row in traffic.iterrows():
        if row['continent'] != home.continent:
            inter_cost += row['bytes_sent'] / BYTES_IN_GB * inter_continental_cost_per_gb
        else:
            intra_cost += row['bytes_sent'] / BYTES_IN_GB * intra_continental_cost_per_gb
    # extrapolate the observed window to a daily figure
    daily_inter_cost = inter_cost / hours * 24
    daily_intra_cost = intra_cost / hours * 24
    return {'inter-continental': (inter_cost, daily_inter_cost), 'intra-continental': (intra_cost, daily_intra_cost)}
class IpGeoCache:
    data: Dict[str, IpGeo]

    def __init__(self):
        # a plain dict: defaultdict(IpGeo) would fail, since IpGeo has required fields
        self.data = {}

    def __getitem__(self, ip: str) -> IpGeo:
        return self.data[ip]

    def load(self) -> bool:
        if os.path.exists("ip_cache.json"):
            try:
                with open("ip_cache.json") as f:
                    data = json.load(f)
                self.data = {ip: IpGeo(**data[ip]) for ip in data}
                return True
            except Exception:
                pass
        return False

    def save(self) -> bool:
        try:
            with open("ip_cache.json", "w") as f:
                json.dump({ip: geo.__dict__ for ip, geo in self.data.items()}, f)
            return True
        except Exception:
            return False

    def add_known(self, ip: str, geo: IpGeo) -> None:
        self.data[ip] = geo

    def add_unknown(self, ip: str) -> None:
        self.data[ip] = IpGeo("error", None, None, None)

    def is_known(self, ip: str) -> bool:
        return ip in self.data and self.data[ip].status == 'success'

    def is_unknown(self, ip: str) -> bool:
        return ip in self.data and self.data[ip].status != 'success'

    def unknown(self) -> List[str]:
        return [ip for ip in self.data if self.data[ip].status != 'success']

    def all_known(self) -> bool:
        return all(self.is_known(ip) for ip in self.data)
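# The cache file written by save() is a flat JSON map of IP -> IpGeo fields,
# for example (illustrative values):
#   {"93.184.216.34": {"status": "success", "continent": "NA", "country": "US", "region": "MA"}}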
class GeoService(abc.ABC):
    @abc.abstractmethod
    def get_geo_data_batch(self, ips: List[str]) -> Tuple[Dict[str, IpGeo], bool]:
        pass

    @abc.abstractmethod
    def get_geo_data_single(self, ip: str) -> Tuple[IpGeo, bool]:
        pass
class IpApiGeoService(GeoService):
    cache: IpGeoCache
    batch_size: int = 100

    def __init__(self, cache: IpGeoCache) -> None:
        super().__init__()
        self.cache = cache

    def _flush(self, batch: List[dict], results: Dict[str, IpGeo]) -> bool:
        try:
            # note: the free ip-api.com tier is served over plain HTTP only
            response = requests.post("http://ip-api.com/batch", json=batch)
            response.raise_for_status()
            for resp in response.json():
                geo = IpGeo(
                    status=resp['status'],
                    continent=resp.get('continentCode'),
                    country=resp.get('countryCode'),
                    region=resp.get('region')
                )
                results[resp['query']] = geo
                if geo.status == 'success':
                    self.cache.add_known(resp['query'], geo)
                else:
                    self.cache.add_unknown(resp['query'])
            return True
        except Exception as e:
            print(f"Error fetching geo data: {e}")
            return False

    def get_geo_data_batch(self, ips: List[str]) -> Tuple[Dict[str, IpGeo], bool]:
        results: Dict[str, IpGeo] = {}
        ok = True
        # accumulate uncached IPs and flush in chunks of batch_size
        batch = []
        for ip in ips:
            if self.cache.is_known(ip):
                results[ip] = self.cache[ip]
                continue
            batch.append({"query": ip, "fields": "status,continentCode,countryCode,region,query"})
            if len(batch) == self.batch_size:
                ok = self._flush(batch, results) and ok
                batch = []
        if batch:
            # flush the final partial batch
            ok = self._flush(batch, results) and ok
        return results, ok

    def get_geo_data_single(self, ip: str) -> Tuple[IpGeo, bool]:
        results, ok = self.get_geo_data_batch([ip])
        return results.get(ip, IpGeo("error", None, None, None)), ok
class IpApi2GeoService(GeoService):
    cache: IpGeoCache

    def __init__(self, cache: IpGeoCache) -> None:
        super().__init__()
        self.cache = cache

    def get_geo_data_batch(self, ips: List[str]) -> Tuple[Dict[str, IpGeo], bool]:
        results = {}
        ok = True
        for ip in ips:
            geo, success = self.get_geo_data_single(ip)
            results[ip] = geo
            ok = ok and success
        return results, ok

    def get_geo_data_single(self, ip: str) -> Tuple[IpGeo, bool]:
        try:
            if self.cache.is_known(ip):
                return self.cache[ip], True
            response = requests.get(f"https://ipapi.co/{ip}/json/")
            response.raise_for_status()
            resp_json = response.json()
            ip_geo = IpGeo(
                "success",
                resp_json['continent_code'],
                resp_json['country_code'],
                resp_json['region_code']
            )
            self.cache.add_known(ip, ip_geo)
            return ip_geo, True
        except Exception:
            self.cache.add_unknown(ip)
            return IpGeo("error", None, None, None), False
def get_geo_data(ips: List[str]) -> Dict[str, IpGeo]:
    # load the on-disk cache first so we only hit the services for new IPs
    cached_ips = IpGeoCache()
    cached_ips.load()
    services = [IpApi2GeoService(cached_ips), IpApiGeoService(cached_ips)]
    results: Dict[str, IpGeo] = {}
    ok = False
    for service in services:
        results, ok = service.get_geo_data_batch(ips)
        if ok and all(cached_ips.is_known(ip) for ip in ips):
            break
    # persist whatever was learned, even if some lookups failed
    cached_ips.save()
    if not ok:
        raise Exception(f"Some services failed. There are {len(cached_ips.unknown())} unknown IPs. Please review them manually.")
    return results
def main():
    folder = "data" if os.getcwd().endswith("nsg") else "nsg/data"
    # Traverse the directory and process each file.
    # Note that the file structure can be nested (h=19\m=32\macAddress=00-0D-3A-13-1E-8E\PT1H.json);
    # we want to process all "leaf" files.
    df = pd.DataFrame(columns=['time', 'source_ip', 'dest_ip', 'source_port', 'dest_port', 'protocol', 'traffic_flow', 'traffic_decision', 'flow_state', 'packets_sent', 'bytes_sent', 'packets_received', 'bytes_received'])
    files_processed = 0
    print(f"{os.getcwd()}")
    for root, _, files in os.walk(folder):
        for file in files:
            path = os.path.join(root, file)
            if path.endswith(".json"):
                data_tuples = process_file(path)
                print(f"Processed {len(data_tuples)} flow tuples from {path}")
                df = pd.concat([df, pd.DataFrame(data_tuples)], ignore_index=True)
                files_processed += 1
    print(f"Processed {files_processed} files and {len(df)} flow tuples in total")
    print(f"Total bytes sent: {format_bytes(df['bytes_sent'].sum())}")
    # summarize all traffic by destination IP
    dest_ip_summary = df.groupby('dest_ip', as_index=False).agg({
        'bytes_sent': 'sum',
        'bytes_received': 'sum',
    })
    # extend the summary with the geo data (continent, country, region)
    geo_data = get_geo_data(list(dest_ip_summary['dest_ip'].unique()))
    dest_ip_summary['continent'] = dest_ip_summary['dest_ip'].map(lambda x: geo_data[x].continent if geo_data[x].status == 'success' else None)
    dest_ip_summary['country'] = dest_ip_summary['dest_ip'].map(lambda x: geo_data[x].country if geo_data[x].status == 'success' else None)
    dest_ip_summary['region'] = dest_ip_summary['dest_ip'].map(lambda x: geo_data[x].region if geo_data[x].status == 'success' else None)
    # Estimate cost relative to the home location (here: a deployment in the EU / Netherlands).
    # Tuple timestamps are Unix epoch seconds; guard against a zero-length window.
    times = pd.to_datetime(pd.to_numeric(df['time']), unit='s')
    hours = max((times.max() - times.min()).total_seconds() / 3600, 1.0)
    costs = estimate_cost(IpGeo("success", "EU", "NL", None), dest_ip_summary, hours)
    for kind, (observed, daily) in costs.items():
        print(f"{kind} cost: {observed:.2f} USD observed, {daily:.2f} USD projected per day")
    # summarize sent data by each of the above (by itself) and print each summary
    for col in ['continent', 'country', 'region']:
        print(f"Summary by {col}")
        print(dest_ip_summary.groupby(col, as_index=False).agg({
            'bytes_sent': 'sum',
            'bytes_received': 'sum',
        }))
if __name__ == "__main__":
    main()
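To try it (a sketch; the gist doesn't prescribe a layout or filename): download the NSG flow log blobs into a data/ folder next to the script, preserving the nested blob paths, then run the script, e.g.

    data/.../macAddress=00-0D-3A-13-1E-8E/PT1H.json
    python analyze_nsg_flow_logs.py

It prints per-file tuple counts, total bytes sent, the observed and projected daily inter/intra-continental costs, and per-continent/country/region traffic summaries. Geo lookups are cached in ip_cache.json, so re-runs don't re-query the services.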