Loading Chrome devtools heap snapshots in Python
""" | |
2 helper classes that can load Google Chrome Heap Snapshots files. | |
Both implementations are experimentals, only tested on one example. | |
""" | |
import collections
import collections.abc
import json
from typing import Dict, Any

import numpy as np
import pandas as pd
from tqdm import tqdm

# Alternative JSON parsers tried on one snapshot:
# import orjson      # not faster
# import rapidjson   # not faster
# import ujson       # slightly faster
# import hyperjson   # not faster
# import simplejson  # not faster
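
# For reference (a simplified sketch, not part of the original gist): a
# ".heapsnapshot" file is a single JSON document that stores nodes and edges
# as flat integer arrays. The field layout of each record is described in
# snapshot.meta, and all strings are deduplicated into a shared table:
#
#   {"snapshot": {"meta": {"node_fields": ["type", "name", "id", "self_size", ...],
#                          "edge_fields": ["type", "name_or_index", "to_node"],
#                          ...}},
#    "nodes":   [...],           # len(node_fields) integers per node, row-major
#    "edges":   [...],           # "to_node" is an offset into the "nodes" array
#    "strings": ["foo", ...]}    # string-typed fields index into this table
#
# Both loaders below decode these arrays using the field lists in snapshot.meta.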
####################################
# Implementation #1 based on pandas
####################################

# Usage:
#
# ```python
# snapshot = PandasHeapSnapshotLoader("session.heapsnapshot")
#
# snapshot.nodes_df
# snapshot.edges_df
#
# node_idx = int(snapshot.id_node_lookup.loc[12345678])
#
# child_edges, child_nodes = snapshot.get_child_nodes(node_idx)
#
# children_of_children = snapshot.get_children_for_all_nodes(child_nodes)
# ```
class PandasHeapSnapshotLoader():
    """Basic heap snapshot reader based on pandas dataframes.

    Useful for data analysis on the edges and nodes tables, but pandas is
    slow at graph operations (e.g. retrieving second-order children).
    """
    def _load_heap_snapshot_array(self, data: Dict[str, Any], elem_type: str) -> pd.DataFrame:
        fields = data['snapshot']['meta'][f'{elem_type}_fields']
        # Reshape the flat integer array into one row per element.
        table = pd.DataFrame(np.array(data[f'{elem_type}s']).reshape(-1, len(fields)), columns=fields)
        types = data['snapshot']['meta'][f'{elem_type}_types']
        strings = pd.Series(data['strings'])
        # Resolve string-typed fields through the shared string table.
        for col in table.columns:
            if types[fields.index(col)] == 'string':
                table[col] = table[col].map(strings)
        if 'type' in table.columns:
            table['type'] = pd.Categorical.from_codes(table['type'], categories=types[fields.index('type')])
        if elem_type == 'edge':
            # 'name_or_index' is a string table index, except for hidden and
            # element edges where it is a plain numeric index.
            table['edge_name'] = table['name_or_index'].map(strings)
            no_name_mask = (table['type'] == 'hidden') | (table['type'] == 'element')
            table.loc[no_name_mask, 'edge_name'] = table.loc[no_name_mask, 'name_or_index'].astype(str)
        return table
    def __init__(self, filepath):
        print(f"Reading dump '{filepath}'...")
        with open(filepath) as fp:
            self.data = json.load(fp)
        print("Parsing nodes...")
        self.nodes_df = self._load_heap_snapshot_array(self.data, 'node')
        print("Parsing edges...")
        self.edges_df = self._load_heap_snapshot_array(self.data, 'edge')
        # 'to_node' is an offset into the flat nodes array; convert it to a node index.
        self.edges_df['to_node'] = self.edges_df['to_node'] // len(self.data['snapshot']['meta']['node_fields'])
        print("Creating node edge lookup...")
        # Each node's outgoing edges are stored contiguously: node i owns the
        # edge rows [edge_idx_start, edge_idx_end).
        self.nodes_df['edge_idx_end'] = self.nodes_df['edge_count'].cumsum().astype(int)
        self.nodes_df['edge_idx_start'] = self.nodes_df['edge_idx_end'] - self.nodes_df['edge_count'].astype(int)
        print("Creating node id lookup...")
        self.id_node_lookup = self.nodes_df['id'].reset_index().set_index('id').sort_index()
    def get_child_nodes(self, node_index):
        parent = self.nodes_df.loc[node_index]
        child_edges = self.edges_df.loc[parent['edge_idx_start']:parent['edge_idx_end'] - 1, ['edge_name', 'type', 'to_node']]
        child_nodes = self.nodes_df.iloc[child_edges['to_node'].values]
        return child_edges, child_nodes
    def get_children_for_all_nodes(self, nodes):
        edges_ranges = nodes[['edge_idx_start', 'edge_idx_end']].apply(lambda x: range(x.iloc[0], x.iloc[1]), axis=1)
        nodes_exploded = nodes.assign(edge_idx=edges_ranges).explode('edge_idx').drop(columns=['edge_idx_start', 'edge_idx_end'])
        nodes_with_edges = pd.merge(nodes_exploded, self.edges_df[['edge_name', 'type', 'to_node']],
                                    how='left', left_on='edge_idx', suffixes=('_node', ''), right_index=True)
        return pd.merge(nodes_with_edges[['edge_name', 'type', 'to_node', 'id']].rename(columns={'id': 'parent_id'}),
                        self.nodes_df, how='left', left_on='to_node', right_index=True, suffixes=('_edge', '')).drop(columns=['to_node'])
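
# Example query (a sketch, not part of the original gist): once loaded, the
# dataframes can be explored with regular pandas operations. 'self_size' is a
# standard node field in Chrome heap snapshots.
def top_nodes_by_self_size(snapshot: PandasHeapSnapshotLoader, node_type: str = 'object', n: int = 10) -> pd.DataFrame:
    """Return the `n` largest nodes of `node_type`, sorted by their own size."""
    mask = snapshot.nodes_df['type'] == node_type
    return snapshot.nodes_df.loc[mask].nlargest(n, 'self_size')[['name', 'id', 'self_size']]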
###########################################
# Implementation #2 based on vanilla Python
###########################################

# Usage:
#
# ```python
# snapshot = PythonHeapSnapshotLoader("session.heapsnapshot")
#
# snapshot.nodes[1234]
# snapshot.edges[1234]
# snapshot.locations[1234]
#
# node_idx = snapshot.id_lookup[12345678]
#
# node_edges = snapshot.get_edges_for_node(node_idx)
#
# children = snapshot.get_child_nodes(node_idx)
# ```
class PythonHeapSnapshotLoader():
    """Pure-Python Chrome devtools heap snapshot reader.

    This loader is slower than the pandas loader for the initial load, but it
    is simpler to 'hack' on and to implement custom queries with.

    Attributes
    ----------
    data : Dict[str, Any]
        Raw JSON data structure
    nodes : PythonHeapSnapshotLoader.HeapSnapshotArray
    edges : PythonHeapSnapshotLoader.HeapSnapshotArray
    locations : PythonHeapSnapshotLoader.HeapSnapshotArray
    start_indices : List[int]
        List of first edge indices for each node
    parent_lookup : Dict[int, Set[int]]
        Dict mapping each node to the set of its parent node indices
    id_lookup : Dict[int, int]
        Dictionary mapping node ids to node indices (in the nodes array)
    """
    def __init__(self, filepath=None, data=None):
        if not data:
            print(f"\nReading dump '{filepath}'...\n")
            with open(filepath) as fp:
                self.data = json.load(fp)
        else:
            self.data = data
        print("\nCreating nodes, edges and locations arrays...\n")
        self.nodes = self.HeapSnapshotArray(self.data, "node")
        self.edges = self.HeapSnapshotArray(self.data, "edge")
        self.locations = self.HeapSnapshotArray(self.data, "location")
        print("\nCreating node edge start indices lookup...\n")
        self.start_indices = self._get_edge_start_indices()
        print("\nCreating parent node lookup...\n")
        self.parent_lookup = self._make_parent_node_lookup()
        print("\nCreating node id lookup...\n")
        self.id_lookup = self._make_node_id_lookup()
    def _get_edge_start_indices(self):
        node_fields = self.data['snapshot']['meta']['node_fields']
        edge_fields = self.data['snapshot']['meta']['edge_fields']
        nodes_data = self.data['nodes']
        nb_nodes = len(nodes_data) // len(node_fields)
        edge_count_offset = node_fields.index('edge_count')
        # Edges are stored contiguously in node order, so a running sum of
        # each node's 'edge_count' gives the first edge index of every node.
        start_indices = np.empty(nb_nodes + 1, dtype=np.int32)
        next_index = 0
        for i in tqdm(range(nb_nodes)):
            start_indices[i] = next_index
            next_index += nodes_data[i * len(node_fields) + edge_count_offset]
        start_indices[nb_nodes] = len(self.data['edges']) // len(edge_fields)
        return start_indices
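
    # Vectorized alternative (a sketch, not in the original gist): the same
    # start indices can be computed in one shot with a numpy cumsum.
    def _get_edge_start_indices_vectorized(self):
        node_fields = self.data['snapshot']['meta']['node_fields']
        counts = (np.asarray(self.data['nodes'], dtype=np.int64)
                  .reshape(-1, len(node_fields))[:, node_fields.index('edge_count')])
        start_indices = np.zeros(len(counts) + 1, dtype=np.int64)
        np.cumsum(counts, out=start_indices[1:])  # start_indices[0] stays 0
        return start_indices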
    def _make_node_id_lookup(self):
        node_fields = self.data['snapshot']['meta']['node_fields']
        nodes_data = self.data['nodes']
        nb_nodes = len(nodes_data) // len(node_fields)
        id_offset = node_fields.index('id')
        return {nodes_data[i * len(node_fields) + id_offset]: i for i in tqdm(range(nb_nodes))}
    def _make_parent_node_lookup(self):
        node_fields = self.data['snapshot']['meta']['node_fields']
        edge_fields = self.data['snapshot']['meta']['edge_fields']
        nb_node_fields = len(node_fields)
        nb_edge_fields = len(edge_fields)
        edge_data = self.data['edges']
        nb_nodes = len(self.data['nodes']) // nb_node_fields
        edge_to_node_offset = edge_fields.index('to_node')
        parent_lookup = collections.defaultdict(set)
        for node_idx in tqdm(range(nb_nodes)):
            for e_idx in range(self.start_indices[node_idx], self.start_indices[node_idx + 1]):
                child_node = edge_data[e_idx * nb_edge_fields + edge_to_node_offset] // nb_node_fields
                parent_lookup[child_node].add(node_idx)
        return parent_lookup
    def get_edges_for_node(self, node_index):
        edge_start = self.start_indices[node_index]
        edge_end = edge_start + self.nodes[node_index]['edge_count']
        return [self.edges[i] for i in range(edge_start, edge_end)]

    def get_child_nodes(self, node_index):
        node_edges = self.get_edges_for_node(node_index)
        nb_node_fields = len(self.nodes._fields)
        return {e['edge_name']: self.nodes[e['to_node'] // nb_node_fields] for e in node_edges}
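
    # Example query on `parent_lookup` (a sketch, not in the original gist):
    # walk upward from a node, picking an arbitrary parent at each step and
    # avoiding cycles, to recover one possible retainer path.
    def get_retainer_path(self, node_index, max_depth=10):
        path = [node_index]
        for _ in range(max_depth):
            parents = self.parent_lookup.get(path[-1], set()) - set(path)
            if not parents:
                break
            path.append(next(iter(parents)))
        return path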
    class HeapSnapshotArray(collections.abc.Sequence):
        """Read-only, dict-per-element view over one of the flat snapshot arrays."""

        def __init__(self, data: Dict[str, Any], elem_type: str):
            self._fields = data['snapshot']['meta'][f'{elem_type}_fields']
            if f'{elem_type}_types' in data['snapshot']['meta']:
                self._types = data['snapshot']['meta'][f'{elem_type}_types']
            else:
                self._types = None
            self._elems = data[f'{elem_type}s']
            self.elem_type = elem_type
            self.strings = data['strings']

        def __len__(self):
            return len(self._elems) // len(self._fields)

        def __getitem__(self, index):
            if not 0 <= index < len(self):
                raise IndexError(index)
            values = self._elems[index * len(self._fields):(index + 1) * len(self._fields)]
            elem = dict(zip(self._fields, values))
            if 'type' in elem:
                # Decode the type code through the per-field type table.
                elem['type'] = self._types[self._fields.index('type')][elem['type']]
            if self._types:
                for field in self._fields:
                    if self._types[self._fields.index(field)] == 'string':
                        elem[field] = self.strings[elem[field]]
            if self.elem_type == 'edge':
                # Hidden and element edges carry a numeric index instead of a name.
                if (elem['type'] != 'hidden') and (elem['type'] != 'element'):
                    elem['edge_name'] = self.strings[elem['name_or_index']]
                else:
                    elem['edge_name'] = str(elem['name_or_index'])
            elem['index_'] = index
            return elem
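
# Minimal smoke test (not in the original gist; assumes a snapshot path is
# passed on the command line, e.g. "session.heapsnapshot"):
if __name__ == "__main__":
    import sys

    snap = PythonHeapSnapshotLoader(sys.argv[1])
    print(f"{len(snap.nodes)} nodes, {len(snap.edges)} edges")
    root = snap.nodes[0]
    print(f"Root node: type={root['type']!r}, name={root['name']!r}")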