Skip to content

Instantly share code, notes, and snippets.

@martinsotir
Last active August 10, 2020 01:03
Show Gist options
  • Save martinsotir/ff2d8733e0baaa8b70ee67facb9205cd to your computer and use it in GitHub Desktop.
Save martinsotir/ff2d8733e0baaa8b70ee67facb9205cd to your computer and use it in GitHub Desktop.
Loading Chrome devtools heap snapshots in Python
"""
Two helper classes that can load Google Chrome heap snapshot files.
Both implementations are experimental and only tested on one example.
"""
import collections
import collections.abc
import json
from typing import Any, Dict

import numpy as np
import pandas as pd
from tqdm import tqdm

# import orjson # not faster
# import rapidjson # not faster
# import ujson # slightly faster
# import hyperjson # not faster
# import simplejson # not faster
####################################
# Implementation #1 based on pandas
####################################
# Usage:
#
# ```python
#
# snapshot = PandasHeapSnapshotLoader("session.heapsnapshot")
#
# snapshot.nodes_df
# snapshot.edges_df
#
# node_idx = int(snapshot.id_node_lookup.loc[12345678])
#
# child_edges, child_nodes = snapshot.get_child_nodes(node_idx)
#
# children_of_children = snapshot.get_children_for_all_nodes(child_nodes)
# ```
class PandasHeapSnapshotLoader():
    """Basic heap snapshot reader based on pandas DataFrames.

    Useful for data analysis on the edge and node tables, but pandas is slow
    at graph operations (e.g. retrieving second-order children).

    Attributes
    ----------
    data : Dict[str, Any]
        Raw parsed JSON snapshot.
    nodes_df : pd.DataFrame
        One row per heap node; string fields resolved, plus
        'edge_idx_start'/'edge_idx_end' giving each node's edge row range.
    edges_df : pd.DataFrame
        One row per edge; 'to_node' holds the *row index* of the target node.
    id_node_lookup : pd.DataFrame
        Maps node ids (index) to row indices in `nodes_df`.
    """

    def _load_heap_snapshot_array(self, data: Dict[str, Any], elem_type: str) -> pd.DataFrame:
        """Decode the flat `{elem_type}s` integer array into a DataFrame.

        The snapshot stores nodes/edges as one flat int array; field names and
        per-field types come from `snapshot.meta.{elem_type}_fields` and
        `snapshot.meta.{elem_type}_types`.
        """
        fields = data['snapshot']['meta'][f'{elem_type}_fields']
        table = pd.DataFrame(np.array(data[f'{elem_type}s']).reshape(-1, len(fields)),
                             columns=fields)
        types = data['snapshot']['meta'][f'{elem_type}_types']
        strings = pd.Series(data['strings'])
        # String-typed fields store an index into the 'strings' table.
        for col in table.columns:
            if types[fields.index(col)] == 'string':
                table[col] = table[col].map(strings)
        # 'type' is an enum code; its categories list sits in the types metadata.
        if 'type' in table.columns:
            table['type'] = pd.Categorical.from_codes(table['type'],
                                                      categories=types[fields.index('type')])
        if elem_type == 'edge':
            # 'name_or_index' is a string index for named edges, but a plain
            # numeric index for 'hidden' and 'element' edges.
            table['edge_name'] = table['name_or_index'].map(strings)
            no_name_mask = (table['type'] == 'hidden') | (table['type'] == 'element')
            table.loc[no_name_mask, 'edge_name'] = table.loc[no_name_mask, 'name_or_index'].astype(str)
        return table

    def __init__(self, filepath):
        print(f"Reading dump '{filepath}'...")
        with open(filepath) as fp:
            self.data = json.load(fp)
        print("Parsing nodes...")
        self.nodes_df = self._load_heap_snapshot_array(self.data, 'node')
        print("Parsing edges...")
        self.edges_df = self._load_heap_snapshot_array(self.data, 'edge')
        # 'to_node' is an offset into the flat node int array; convert it to a
        # row index into nodes_df.
        self.edges_df['to_node'] = self.edges_df['to_node'] // len(self.data['snapshot']['meta']['node_fields'])
        print("Creating node edge lookup...")
        # Edges of node i occupy rows [edge_idx_start, edge_idx_end) of edges_df.
        # BUGFIX: the original used chained indexing
        # (`.loc[:, col].iloc[1:] = ...`) whose assignment can hit a temporary
        # copy and silently fail to propagate back into the DataFrame.
        self.nodes_df['edge_idx_end'] = self.nodes_df['edge_count'].cumsum().astype(int)
        self.nodes_df['edge_idx_start'] = self.nodes_df['edge_idx_end'] - self.nodes_df['edge_count']
        print("Creating node id lookup")
        self.id_node_lookup = self.nodes_df['id'].reset_index().set_index('id').sort_index()

    def get_child_nodes(self, node_index):
        """Return (child_edges, child_nodes) DataFrames for one node row index."""
        parent = self.nodes_df.loc[node_index]
        child_edges = self.edges_df.loc[parent['edge_idx_start']:parent['edge_idx_end'] - 1,
                                        ['edge_name', 'type', 'to_node']]
        child_nodes = self.nodes_df.iloc[child_edges['to_node'].values]
        return child_edges, child_nodes

    def get_children_for_all_nodes(self, nodes):
        """Return one DataFrame of all children for every node row in `nodes`.

        Each output row pairs a parent id ('parent_id'), the connecting edge
        ('edge_name', 'type_edge'), and the child node's fields.
        """
        edges_ranges = nodes[['edge_idx_start', 'edge_idx_end']].apply(lambda x: range(x[0], x[1]), axis=1)
        nodes_exploded = nodes.assign(edge_idx=edges_ranges).explode('edge_idx').drop(columns=['edge_idx_start', 'edge_idx_end'])
        nodes_with_edges = pd.merge(nodes_exploded, self.edges_df[['edge_name', 'type', 'to_node']],
                                    how='left', left_on='edge_idx', suffixes=('_node', ''), right_index=True)
        return pd.merge(nodes_with_edges[['edge_name', 'type', 'to_node', 'id']].rename(columns={'id': 'parent_id'}),
                        self.nodes_df, how='left', left_on='to_node', right_index=True, suffixes=('_edge', '')).drop(columns=['to_node'])
###########################################
# Implementation #2 based on vanilla python
###########################################
# Usage:
#
# ```python
#
# snapshot = PythonHeapSnapshotLoader("session.heapsnapshot")
#
# snapshot.nodes[1234]
# snapshot.edges[1234]
# snapshot.locations[1234]
#
# node_idx = snapshot.id_lookup[12345678]
#
# node_edges = snapshot.get_edges_for_node(node_idx)
#
# children = snapshot.get_child_nodes(node_idx)
# ```
class PythonHeapSnapshotLoader():
    """Pure-Python Chrome devtools heap snapshot reader.

    Slower than the pandas loader for the initial load, but simpler to
    'hack' on and to implement custom queries with.

    Attributes
    ----------
    data : Dict[str, Any]
        Raw parsed JSON snapshot.
    nodes : PythonHeapSnapshotLoader.HeapSnapshotArray
    edges : PythonHeapSnapshotLoader.HeapSnapshotArray
    locations : PythonHeapSnapshotLoader.HeapSnapshotArray
    start_indices : np.ndarray
        First edge index for each node, plus one trailing sentinel equal to
        the total number of edges.
    parent_lookup : Dict[int, set]
        Maps each node index to the set of its parent node indices.
    id_lookup : Dict[int, int]
        Maps node ids to node indices (into the `nodes` array).
    """

    @staticmethod
    def _progress(iterable):
        """Wrap an iterable with a tqdm progress bar when available.

        Falls back to the plain iterable so the loader also works in
        environments without tqdm installed.
        """
        try:
            from tqdm import tqdm
            return tqdm(iterable)
        except ImportError:
            return iterable

    def __init__(self, filepath=None, data=None):
        # Either read the snapshot from disk or accept an already-parsed dict.
        if not data:
            print(f"\nReading dump '{filepath}'...\n")
            with open(filepath) as fp:
                self.data = json.load(fp)
        else:
            self.data = data
        print("\nCreating nodes, edges and locations arrays...\n")
        self.nodes = self.HeapSnapshotArray(self.data, "node")
        self.edges = self.HeapSnapshotArray(self.data, "edge")
        self.locations = self.HeapSnapshotArray(self.data, "location")
        print("\nCreating node's edges start indices lookup....\n")
        self.start_indices = self._get_edge_start_indices()
        print("\nCreating parent node lookup...\n")
        self.parent_lookup = self._make_parent_node_lookup()
        print("\nCreating node id lookup...\n")
        self.id_lookup = self._make_node_id_lookup()

    def _get_edge_start_indices(self):
        """Compute, for each node, the index of its first edge.

        Edges are stored grouped by owning node, so node i's first edge index
        is the running sum of the edge counts of nodes 0..i-1. A sentinel
        entry at the end holds the total edge count.
        """
        node_fields = self.data['snapshot']['meta']['node_fields']
        edge_fields = self.data['snapshot']['meta']['edge_fields']
        nodes_data = self.data['nodes']
        nb_nodes = len(nodes_data) // len(node_fields)
        edge_count_offset = node_fields.index('edge_count')
        start_indices = np.empty(nb_nodes + 1, dtype=np.int32)
        next_index = 0
        for i in self._progress(range(nb_nodes)):
            start_indices[i] = next_index
            next_index += nodes_data[i * len(node_fields) + edge_count_offset]
        start_indices[nb_nodes] = len(self.data['edges']) // len(edge_fields)
        return start_indices

    def _make_node_id_lookup(self):
        """Build a dict mapping node ids to node indices."""
        node_fields = self.data['snapshot']['meta']['node_fields']
        nodes_data = self.data['nodes']
        nb_nodes = len(nodes_data) // len(node_fields)
        id_offset = node_fields.index('id')
        return {nodes_data[i * len(node_fields) + id_offset]: i
                for i in self._progress(range(nb_nodes))}

    def _make_parent_node_lookup(self):
        """Build a dict mapping each node index to the set of its parent node indices."""
        node_fields = self.data['snapshot']['meta']['node_fields']
        edge_fields = self.data['snapshot']['meta']['edge_fields']
        nb_node_fields = len(node_fields)
        nb_edge_fields = len(edge_fields)
        edge_data = self.data['edges']
        nb_nodes = len(self.data['nodes']) // nb_node_fields
        edge_to_node_offset = edge_fields.index('to_node')
        parent_lookup = collections.defaultdict(set)
        for node_idx in self._progress(range(nb_nodes)):
            for e_idx in range(self.start_indices[node_idx], self.start_indices[node_idx + 1]):
                # 'to_node' is an offset into the flat node int array.
                child_node = edge_data[e_idx * nb_edge_fields + edge_to_node_offset] // nb_node_fields
                parent_lookup[child_node].add(node_idx)
        return parent_lookup

    def get_edges_for_node(self, node_index):
        """Return the list of decoded edges owned by the node at `node_index`."""
        edge_start = self.start_indices[node_index]
        edge_end = edge_start + self.nodes[node_index]['edge_count']
        return [self.edges[i] for i in range(edge_start, edge_end)]

    def get_child_nodes(self, node_index):
        """Return a dict mapping edge names to decoded child nodes."""
        node_edges = self.get_edges_for_node(node_index)
        nb_node_fields = len(self.nodes._fields)
        return {e['edge_name']: self.nodes[e['to_node'] // nb_node_fields]
                for e in node_edges}

    # BUGFIX: collections.Sequence was removed in Python 3.10; the ABC lives
    # in collections.abc.
    class HeapSnapshotArray(collections.abc.Sequence):
        """Lazy view over one of the flat snapshot arrays (nodes/edges/locations).

        Decodes one element at a time into a dict keyed by field name; string
        and enum-coded fields are resolved on access.
        """

        def __init__(self, data: Dict[str, Any], elem_type: str):
            self._fields = data['snapshot']['meta'][f'{elem_type}_fields']
            # Some arrays (e.g. locations) carry no per-field type metadata.
            self._types = data['snapshot']['meta'].get(f'{elem_type}_types')
            self._elems = data[f'{elem_type}s']
            self.elem_type = elem_type
            self.strings = data['strings']

        def __len__(self):
            return len(self._elems) // len(self._fields)

        def __getitem__(self, index):
            # BUGFIX: support negative indices (the original sliced garbage
            # for index < 0); keep raising IndexError past either end.
            if index < 0:
                index += len(self)
            if not 0 <= index < len(self):
                raise IndexError()
            n_fields = len(self._fields)
            values = self._elems[index * n_fields:(index + 1) * n_fields]
            elem = dict(zip(self._fields, values))
            if 'type' in elem:
                # 'type' is an enum code; the categories list sits in the
                # types metadata at the field's position.
                elem['type'] = self._types[self._fields.index('type')][elem['type']]
            if self._types:
                for field in self._fields:
                    dtype = self._types[self._fields.index(field)]
                    if dtype == 'string':
                        elem[field] = self.strings[elem[field]]
            if self.elem_type == 'edge':
                # 'name_or_index' is a string index for named edges but a
                # plain numeric index for 'hidden' and 'element' edges.
                if (elem['type'] != 'hidden') and (elem['type'] != 'element'):
                    elem['edge_name'] = self.strings[elem['name_or_index']]
                else:
                    elem['edge_name'] = str(elem['name_or_index'])
            elem['index_'] = index
            return elem
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment