|
import sys |
|
from copy import deepcopy |
|
from pathlib import Path |
|
from typing import Any, List, Optional, Callable, Dict |
|
from types import FrameType |
|
from collections import Counter |
|
|
|
import pandas as pd |
|
|
|
class FunctionInfo:
    """Location of a frame: source file, current line and function name."""

    def __init__(self, frame):
        """Copy the location details from ``frame`` and its code object.

        Args:
            frame: Object exposing ``f_code.co_filename``, ``f_code.co_name``
                and ``f_lineno`` (for example an interpreter frame).
        """
        code = frame.f_code
        self.filepath = code.co_filename
        self.line = frame.f_lineno
        self.function = code.co_name
        # Final path component only, derived from the full file path.
        self.filename = Path(self.filepath).name

    def __repr__(self):
        """Render the location in a traceback-like one-line format."""
        return 'File "{}", line {}, function <{}>'.format(
            self.filepath, self.line, self.function
        )
|
|
|
|
|
class FunctionTracebackInfo:
    """Chain of ``FunctionInfo`` entries gathered by walking up from a frame.

    The entries are stored in ``self.bt``, innermost frame first. The object
    supports ``len()`` and indexing, both delegating to ``self.bt``.
    """

    def __init__(
        self,
        frame: FrameType,
        traceback_info_stops_when_filepath_has_none_of: Optional[List[str]] = None
    ):
        """Walk ``frame`` and its callers until a frame fails the path filter.

        Args:
            frame: Frame to start from; ``None`` produces an empty chain.
            traceback_info_stops_when_filepath_has_none_of: Substrings to look
                for in each frame's file path. The walk stops at the first
                frame whose path contains none of them. When omitted (or
                empty) no path can match, so the chain stays empty.
        """
        required_fragments = traceback_info_stops_when_filepath_has_none_of or []
        self.bt = []
        current = frame
        while current is not None:
            info = FunctionInfo(current)
            hits = [fragment in info.filepath for fragment in required_fragments]
            if True not in hits:
                # This frame's file matches none of the fragments: stop here.
                break
            self.bt.append(info)
            current = current.f_back

    def __repr__(self):
        """Return one entry per line; every line after the first starts with a tab."""
        return '\t'.join(str(entry) + '\n' for entry in self.bt)

    def __getitem__(self, i):
        """Return the ``i``-th collected entry."""
        return self.bt[i]

    def __len__(self):
        """Return the number of collected entries."""
        return len(self.bt)
|
|
|
|
|
class CaptureConfig:
    """Describes one variable that should be captured while tracing."""

    def __init__(
        self,
        variable_name: str,
        variable_rename: Optional[str] = None,
        function_name: Optional[str] = None
    ):
        """Store the capture settings unchanged.

        Args:
            variable_name: Name of the local variable to look for.
            variable_rename: Optional name to record the capture under
                instead of ``variable_name``.
            function_name: Optional function name to restrict the capture to;
                ``None`` places no restriction on the function.
        """
        # Plain attribute storage; no validation is performed here.
        self.variable_name = variable_name
        self.variable_rename = variable_rename
        self.function_name = function_name
|
|
|
|
|
class Capture:
    """One captured value together with where it came from."""

    def __init__(
        self,
        variable_name: str,
        function_name: Optional[str],
        value: Any,
        capture_config: "CaptureConfig",
        variable_name_from: Optional[str] = None,
        bt: Optional["FunctionTracebackInfo"] = None
    ):
        """Store the capture details unchanged.

        Args:
            variable_name: Name the value is recorded under.
            function_name: Name of the function the value was taken from;
                ``None`` is rendered as ``*`` by ``__repr__``.
            value: The captured value.
            capture_config: The configuration that produced this capture.
            variable_name_from: Original variable name when the capture was
                renamed, otherwise ``None``.
            bt: Optional traceback information; only its first entry's
                ``filename`` and ``line`` are used, by ``__repr__``.
        """
        self.variable_name = variable_name
        self.function_name = function_name
        self.value = value
        self.capture_config = capture_config
        self.variable_name_from = variable_name_from
        self.bt = bt

    def __repr__(self):
        """Return ``function::variable [from original] [<file:line>] (type)``.

        This string is also what other code in this module compares against
        an anchor, so its format is significant.
        """
        # Fix: the '*' fallback used to be computed but never inserted, so a
        # missing function name was rendered as the literal text "None".
        function_label = '*' if self.function_name is None else self.function_name
        parts = [f'{function_label}::{self.variable_name}']
        if self.variable_name_from is not None:
            parts.append(f' from {self.variable_name_from}')
        if self.bt is not None and len(self.bt) > 0:
            innermost = self.bt[0]
            parts.append(f' <{innermost.filename}:{innermost.line}>')
        parts.append(f' ({type(self.value).__name__})')
        return ''.join(parts)
|
|
|
|
|
class VariableCapturer(object):
    """Callable wrapper that records local variables of functions as they return.

    Calling the instance runs ``func`` with a profile hook installed. On every
    Python-level ``return`` event the hook checks each capture configuration
    and appends a deep copy of the matching local variable(s) to
    ``self.captures``.
    """

    def __init__(
        self,
        func: Callable,
        capture_configs: List["CaptureConfig"],
        traceback_info_stops_when_filepath_has_none_of: Optional[List[str]] = None
    ):
        """Store the wrapped callable and the capture settings.

        Args:
            func: Callable to run under the profile hook.
            capture_configs: Configurations describing what to capture.
            traceback_info_stops_when_filepath_has_none_of: Forwarded to
                ``FunctionTracebackInfo`` for every capture.
        """
        # NOTE: nothing in this class writes to _locals; it is only exposed
        # through the `locals` property and reset by `clear_locals`.
        self._locals = {}
        self.func = func
        self.capture_configs = capture_configs
        self.captures = []
        self.traceback_info_stops_when_filepath_has_none_of = traceback_info_stops_when_filepath_has_none_of

    def __repr__(self) -> str:
        """Summarise the wrapped callable and count captures by their string form."""
        # Callables such as functools.partial objects have no __name__.
        func_name = getattr(self.func, '__name__', repr(self.func))
        string_repr = (
            f'{type(self).__name__} capturing "{func_name}" [{len(self.capture_configs)} configs]\n\n'
            f'{len(self.captures)} captures:'
        )

        # most_common() orders by count, highest first (stable for ties).
        capture_counts = Counter(str(x) for x in self.captures)
        for key, cnt in capture_counts.most_common():
            string_repr += f'\n {cnt} captures of {key}'

        return string_repr

    def __call__(self, *args, **kwargs) -> Any:
        """Run ``func(*args, **kwargs)`` while capturing, and return its result.

        The profiler that was active before the call is restored afterwards,
        also when ``func`` raises.
        """
        def tracer(frame, event, arg):
            # Only Python-level returns are of interest; other profile events
            # (call, c_call, c_return, c_exception) are ignored.
            if event != 'return':
                return

            function_name = frame.f_code.co_name
            frame_locals = frame.f_locals

            for capture_config in self.capture_configs:
                wanted_function = capture_config.function_name
                if wanted_function is not None and wanted_function != function_name:
                    continue

                wanted_variable = capture_config.variable_name
                if wanted_variable == '*':
                    # Wildcard: capture every local under its own name. The
                    # rename is not applied, since one new name cannot label
                    # several variables. (Previously this path looked up the
                    # literal key '*' and failed with KeyError.)
                    names = list(frame_locals)
                    rename = None
                elif wanted_variable in frame_locals:
                    names = [wanted_variable]
                    rename = capture_config.variable_rename
                else:
                    continue

                if not names:
                    continue

                bt_info = FunctionTracebackInfo(
                    frame,
                    traceback_info_stops_when_filepath_has_none_of=self.traceback_info_stops_when_filepath_has_none_of
                )

                for name in names:
                    self.captures.append(Capture(
                        variable_name=name if rename is None else rename,
                        function_name=function_name,
                        # Deep copy so later mutation does not alter the record.
                        value=deepcopy(frame_locals[name]),
                        capture_config=capture_config,
                        variable_name_from=None if rename is None else name,
                        bt=bt_info
                    ))

        # Remember whatever profiler is active so it can be put back; the
        # original code always reset it to None.
        previous_profiler = sys.getprofile()
        sys.setprofile(tracer)
        try:
            return self.func(*args, **kwargs)
        finally:
            sys.setprofile(previous_profiler)

    def clear_locals(self) -> None:
        """Reset the ``locals`` mapping to an empty dict."""
        self._locals = {}

    @property
    def locals(self):
        """Return the internal ``_locals`` mapping."""
        return self._locals
|
|
|
def get_dict_anchor_to_captures(
    capture: "VariableCapturer",
    anchor: str,
    on_multiple_captures_for_same_anchor: str = 'error',
    on_multiple_captures_for_same_anchor_select: str = 'first',
) -> Dict[str, List["Capture"]]:
    """Group captures under the value of the anchor capture that precedes them.

    A capture whose ``str()`` equals ``anchor`` starts a new group keyed by
    its ``value``; every following capture is added to that group until the
    next anchor. If the first capture is not an anchor, the list is processed
    in reverse order instead.

    Args:
        capture: Object whose ``captures`` attribute holds the recorded
            captures (despite the name, this is the capturer, not one capture).
        anchor: String form of the captures that act as group keys.
        on_multiple_captures_for_same_anchor: What to do when a group already
            holds a capture with the same string form: ``'error'`` raises,
            ``'warn'`` prints a warning, anything else is silent.
        on_multiple_captures_for_same_anchor_select: Which duplicate to keep
            when not raising: ``'first'`` or ``'last'``.

    Returns:
        Mapping from anchor value to the captures grouped under it.

    Raises:
        ValueError: If there are no captures, the anchor is neither the first
            nor the last capture, a duplicate is found in ``'error'`` mode, or
            the select option is invalid when a duplicate is found.
    """
    d_anchor_to_captures = {}

    # Fix: read from the argument. The original referenced an undefined
    # module-level name `cap` here and ignored the parameter.
    list_captures = list(capture.captures)

    if not list_captures:
        raise ValueError('No captures available. Anchor is ill-defined')

    if str(list_captures[0]) != anchor:
        print('Anchor not found for value 0. Assuming reverse order')
        list_captures.reverse()
        if str(list_captures[0]) != anchor:
            raise ValueError('Anchor not found in positions [0] or [-1]. Anchor is ill-defined')

    # The first element is an anchor at this point, so anchor_value is always
    # assigned before any non-anchor capture is handled.
    anchor_value = None
    for position, item in enumerate(list_captures):
        capture_id = str(item)

        if capture_id == anchor:
            anchor_value = item.value
            d_anchor_to_captures[anchor_value] = []
            continue

        group = d_anchor_to_captures[anchor_value]
        group_ids = [str(x) for x in group]

        if capture_id not in group_ids:
            group.append(item)
            continue

        # A capture with the same string form already exists in this group.
        problem = (
            f'Anchor of capture in pos {position} is ill-defined as "{anchor_value}" '
            f'already had a "{capture_id}", and it tried to set another one of the same type'
        )
        if on_multiple_captures_for_same_anchor == 'error':
            raise ValueError(problem)
        if on_multiple_captures_for_same_anchor == 'warn':
            print(f'Warning: {problem}')

        if on_multiple_captures_for_same_anchor_select == 'last':
            # Replace the earlier duplicate with the current capture.
            group.pop(group_ids.index(capture_id))
            group.append(item)
        elif on_multiple_captures_for_same_anchor_select != 'first':
            raise ValueError('on_multiple_captures_for_same_anchor_select must be one of ["first", "last"]')

    return d_anchor_to_captures
|
|
|
|
|
def list_captures_to_dict(captures: List["Capture"]) -> Dict:
    """Map each capture's ``variable_name`` to its ``value``.

    Args:
        captures: Captures to convert; each must expose ``variable_name`` and
            ``value``.

    Returns:
        Dict from variable name to captured value, in input order.

    Raises:
        ValueError: If two captures share the same ``variable_name``.
    """
    values_by_name = {}
    for item in captures:
        name = item.variable_name
        if name not in values_by_name:
            values_by_name[name] = item.value
            continue

        # A second capture with the same name would overwrite the first.
        raise ValueError(
            f'Trying to reset an already existing value for capture with name "{name}"\n\n'
            f'Capture: {item}\n\n'
            f'\tAlready existing value:\n\t\t{values_by_name[name]}\n\n'
            f'\tTrying to set value:\n\t\t{item.value}\n\n'
        )
    return values_by_name
|
|
|
|
|
def get_df_captures_from_d_anchor_to_captures(
    d_anchor_to_captures: Dict[str, List["Capture"]],
) -> pd.DataFrame:
    """Build a DataFrame with one row per anchor value.

    The intermediate frame holds the anchor value in column ``'k'``, the list
    of captures in ``'captures'`` and their name-to-value dict (from
    ``list_captures_to_dict``) in ``'d'``. The ``'d'`` column is then expanded
    into separate columns by ``json_normalize_and_concat``.

    Args:
        d_anchor_to_captures: Mapping from anchor value to its captures.

    Returns:
        The frame returned by ``json_normalize_and_concat`` for column ``'d'``.
    """
    def captures_for(key):
        # Look the captures up by anchor value, row by row.
        return d_anchor_to_captures[key]

    table = pd.DataFrame({'k': d_anchor_to_captures.keys()})
    table['captures'] = table['k'].apply(captures_for)
    table['d'] = table['captures'].apply(list_captures_to_dict)

    return json_normalize_and_concat(table, 'd')
|
|
|
|
|
def json_normalize_and_concat(df: pd.DataFrame, col: str) -> pd.DataFrame:
    """Expand the dict-valued column ``col`` into top-level columns.

    The values of ``df[col]`` are passed to ``pandas.json_normalize`` with
    ``max_level=0`` and the resulting columns are placed next to the remaining
    columns of ``df``. Columns of ``df`` whose names also appear in the
    expanded result are dropped in favour of the expanded ones, and ``col``
    itself is removed.

    Args:
        df: Source frame.
        col: Name of the column to expand.

    Returns:
        A new frame indexed by the first column that ``df.reset_index()``
        produces.
    """
    flat = df.reset_index()
    # First column after reset_index() is the (first level of the) old index.
    index_name = flat.columns[0]

    expanded = pd.json_normalize(df[col], max_level=0).reset_index(drop=True)

    # Names present on both sides: keep only the expanded version.
    clashing = [name for name in expanded.columns if name in df.columns]
    combined = pd.concat([flat.drop(clashing, axis=1), expanded], axis=1)

    return combined.set_index(index_name).drop(col, axis=1)
|
|
|
|
|
|
|
def cap_to_df(
    cap: "VariableCapturer",
    anchor: str,
    on_multiple_captures_for_same_anchor: str = 'error',
    on_multiple_captures_for_same_anchor_select: str = 'first'
) -> pd.DataFrame:
    """Convert the captures of ``cap`` into a DataFrame with one row per anchor.

    Args:
        cap: Object whose ``captures`` attribute holds the recorded captures.
        anchor: String form of the captures that act as row keys.
        on_multiple_captures_for_same_anchor: Forwarded to
            ``get_dict_anchor_to_captures``; defaults to that function's
            default (``'error'``).
        on_multiple_captures_for_same_anchor_select: Forwarded to
            ``get_dict_anchor_to_captures``; defaults to that function's
            default (``'first'``).

    Returns:
        The frame built by ``get_df_captures_from_d_anchor_to_captures``.

    Raises:
        AssertionError: If the same anchor value was captured more than once.
    """
    # All anchor values must be unique. Raised explicitly (instead of a bare
    # `assert`) so the check also runs under `python -O`; the exception type
    # is kept as AssertionError for existing callers.
    anchor_value_counts = Counter(x.value for x in cap.captures if str(x) == anchor)
    repeated = [value for value, count in anchor_value_counts.items() if count != 1]
    if repeated:
        raise AssertionError(
            f'Anchor values must be unique, but these were captured more than once: {repeated}'
        )

    d_anchor_to_captures = get_dict_anchor_to_captures(
        capture=cap,
        anchor=anchor,
        on_multiple_captures_for_same_anchor=on_multiple_captures_for_same_anchor,
        on_multiple_captures_for_same_anchor_select=on_multiple_captures_for_same_anchor_select
    )

    return get_df_captures_from_d_anchor_to_captures(d_anchor_to_captures)