Last active
April 14, 2023 05:54
-
-
Save JaySon-Huang/e374da1fafa41a3fa30a24e135c60825 to your computer and use it in GitHub Desktop.
A helper for grouping stacks of pstack output
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from concurrent.futures import thread | |
from random import sample | |
import re | |
import hashlib | |
from pprint import pprint | |
import sys | |
import unittest | |
class Frame:
    """One frame line of a gdb/pstack backtrace.

    A frame line looks like ``#3 0x... in func (args) at file:line``; the
    address and the `` at ...`` location are both optional.  All attributes
    hold the text captured from the line:

    * ``num``    -- frame number, kept as a string.
    * ``addr``   -- address text, or ``None`` when the line has no address.
    * ``func``   -- function name without its parameter list.
    * ``params`` -- text between the parentheses of the parameter list.
    * ``loc``    -- text after `` at ``, or ``None`` when absent.
    """

    # Frame with a source location; the "<addr> in " prefix is optional.
    regx_frame = re.compile(r'^#(?P<num>\d+)\W+((?P<addr>[0-9a-fx]+) in )?(?P<func>.+)? at (?P<loc>.+)$')
    # Frame without a source location; here the address is mandatory.
    regx_frame_no_loc = re.compile(r'^#(?P<num>\d+)\W+(?P<addr>[0-9a-fx]+) in (?P<func>.*)?$')
    # Only used by parse_func_deprecated.
    regx_func_param = re.compile(r'^(?P<func>.+)(\W+?)\((?P<params>.+=.+(,\W+.+=.+)*)*\)')

    @classmethod
    def parse_func_deprecated(cls, func):
        """Regex-based splitter kept for reference; prefer parse_func.

        Returns ``(name, params)`` or raises ValueError when the regex does
        not match.
        """
        m = cls.regx_func_param.match(func)
        if m is None:
            raise ValueError("can not parse func", func)
        return m.group('func'), m.group('params')

    @classmethod
    def parse_func(cls, func):
        """Split ``name (params)`` into ``(name, params)``.

        The parameter list is located by walking backwards from the last
        ``)`` to its matching ``(``, so parentheses inside the function name
        (templates, ``(anonymous namespace)``) and nested ones inside the
        parameters are handled.  Anything after the last ``)`` (for example
        ``from /lib64/libc.so.6``) is ignored.

        Returns ``('', '')`` for ``None``/empty input and ``(func, '')`` when
        there is no parenthesis at all.  Raises ValueError when the last
        ``)`` has no matching ``(``.
        """
        if not func:
            # The frame regexes make the func group optional.
            return '', ''
        close_idx = func.rfind(')')
        if close_idx < 0:
            return func.strip(), ''
        depth = 0
        for idx in range(close_idx, -1, -1):
            ch = func[idx]
            if ch == ')':
                depth += 1
            elif ch == '(':
                depth -= 1
                if depth == 0:
                    return func[:idx].strip(), func[idx + 1:close_idx].strip()
        raise ValueError("unbalanced parentheses in func", func)

    @classmethod
    def parse(cls, line):
        """Build a Frame from one backtrace line.

        The pattern with a location is tried first, then the one without.
        Raises ValueError when neither matches.
        """
        m = cls.regx_frame.match(line)
        if m is not None:
            return cls(m.group('num'), m.group('addr'), m.group('func'), m.group('loc'))
        m = cls.regx_frame_no_loc.match(line)
        if m is not None:
            return cls(m.group('num'), m.group('addr'), m.group('func'), None)
        raise ValueError("can not parse line", line)

    def __init__(self, num, addr, func, loc):
        self.num = num
        self.addr = addr
        # `func` arrives as "name (params)"; store the two halves separately.
        self.func, self.params = self.parse_func(func)
        self.loc = loc

    def __repr__(self):
        return '<Frame #{}, func:{}>'.format(self.num, self.func)

    def parts(self):
        """Return ``[num, addr, func, loc]``."""
        return [self.num, self.addr, self.func, self.loc]
class Stack:
    """The backtrace of one thread: a ``Thread N ...`` header plus its frames.

    ``id`` is the thread number from the header, kept as a string.
    ``frames`` is the list of parsed Frame objects in input order.
    ``full_digest`` caches the md5 digest over all frames.
    """

    regx_thd = re.compile(r'^Thread ([0-9]+)')

    def __init__(self, lines):
        """Build a stack from its header line followed by its frame lines.

        Raises ValueError when ``lines`` is empty or the first line is not a
        ``Thread N`` header; Frame.parse errors propagate unchanged.
        """
        if not lines:
            raise ValueError("can not build a stack from no lines")
        self.hdr = lines[0]
        m = self.regx_thd.match(self.hdr)
        if m is None:
            raise ValueError("can not parse thread header", self.hdr)
        self.id = m.group(1)
        self.frames = [Frame.parse(l) for l in lines[1:]]
        # Must be None while digest() computes the value to cache.
        self.full_digest = None
        self.full_digest = self.digest()

    def __repr__(self):
        return '<Stack {}, depth:{}>'.format(self.id, len(self.frames))

    def __len__(self):
        return len(self.frames)

    def digest(self, last_n_frames=None, raw=False):
        """Return a digest built from the function names of the frames.

        With ``last_n_frames=None`` all frames are used in input order.
        Otherwise the frame list is reversed and its first ``last_n_frames``
        entries are used, i.e. the frames at the end of the input.

        With ``raw=True`` the newline-joined function names are returned
        instead of their md5 hex digest.
        """
        # The cache only holds the hashed form, so it must not answer a
        # raw=True request.
        if self.full_digest is not None and last_n_frames is None and not raw:
            return self.full_digest
        frames_to_digest = self.frames
        if last_n_frames is not None:
            frames_to_digest = list(reversed(self.frames))[:last_n_frames]
        hash_str = '\n'.join([x.func for x in frames_to_digest])
        if raw:
            return hash_str
        return hashlib.md5(hash_str.encode("utf-8")).hexdigest()

    def frames_to_str(self):
        """Render one line per frame as ``#<num> <func> at <loc>``; ``?`` stands in for a missing loc."""
        return '\n'.join(['#{:2s} {} at {}'.format(f.num, f.func, f.loc if f.loc is not None else '?') for f in self.frames])
class TestParseFrames(unittest.TestCase):
    # Unit tests for Frame.parse and Frame.parse_func, driven by frame lines
    # captured from a real backtrace.

    def test_parse_frames(self):
        # Every non-blank sample line must be accepted by Frame.parse; the
        # test passes as long as no exception is raised.
        sample = """
#9 0x000000000635c68f in execute_native_thread_routine ()
#17 0x000000000635c68f in execute_native_thread_routine ()
#0 futex_abstimed_wait_cancelable (private=<optimized out>, abstime=0x7f1c331c6c20, clockid=<optimized out>, expected=0, futex_word=0x7f1c331c6c68) at ../sysdeps/nptl/futex-internal.h:320
#1 __pthread_cond_wait_common (abstime=0x7f1c331c6c20, clockid=<optimized out>, mutex=0x7f1de5bf1a00, cond=0x7f1c331c6c40) at pthread_cond_wait.c:520
#2 __pthread_cond_timedwait (cond=0x7f1c331c6c40, mutex=0x7f1de5bf1a00, abstime=0x7f1c331c6c20) at pthread_cond_wait.c:665
#3 0x00000000053bb59d in __gthread_cond_timedwait (__abs_timeout=0x7f1c331c6c20, __mutex=<optimized out>, __cond=0x7f1c331c6c40) at /opt/rh/devtoolset-7/root/usr/include/c++/7/x86_64-redhat-linux/bits/gthr-default.h:871
#4 std::condition_variable::__wait_until_impl<std::chrono::duration<long, std::ratio<1l, 1000000000l> > > (__atime=..., __lock=<synthetic pointer>..., this=0x7f1c331c6c40) at /opt/rh/devtoolset-7/root/usr/include/c++/7/condition_variable:166
#5 std::condition_variable::wait_until<std::chrono::duration<long, std::ratio<1l, 1000000000l> > > (__atime=..., __lock=<synthetic pointer>..., this=0x7f1c331c6c40) at /opt/rh/devtoolset-7/root/usr/include/c++/7/condition_variable:106
#6 std::condition_variable::wait_for<long, std::ratio<1l, 1000000000l> > (__rtime=..., __lock=<synthetic pointer>..., this=0x7f1c331c6c40) at /opt/rh/devtoolset-7/root/usr/include/c++/7/condition_variable:138
#7 DB::DynamicThreadPool::dynamicWork (this=0x7f1de5bf1940, initial_task=...) at /home/guojiangtao/new_tiflash/dbms/src/Common/DynamicThreadPool.cpp:136
#8 0x00000000053bcb8f in std::__invoke_impl<void, void (DB::DynamicThreadPool::* const&)(std::unique_ptr<DB::IExecutableTask, std::default_delete<DB::IExecutableTask> >), DB::DynamicThreadPool*, std::unique_ptr<DB::IExecutableTask, std::default_delete<DB::IExecutableTask> > > (__t=<optimized out>, __f=@0x7f1cc400ee30: (void (DB::DynamicThreadPool::*)(class DB::DynamicThreadPool * const, class std::unique_ptr<DB::IExecutableTask, std::default_delete<DB::IExecutableTask> >)) 0x53bb430 <DB::DynamicThreadPool::dynamicWork(std::unique_ptr<DB::IExecutableTask, std::default_delete<DB::IExecutableTask> >)>) at /opt/rh/devtoolset-7/root/usr/include/c++/7/bits/invoke.h:73
#9 std::__invoke<void (DB::DynamicThreadPool::* const&)(std::unique_ptr<DB::IExecutableTask, std::default_delete<DB::IExecutableTask> >), DB::DynamicThreadPool*, std::unique_ptr<DB::IExecutableTask, std::default_delete<DB::IExecutableTask> > > (__fn=<optimized out>) at /opt/rh/devtoolset-7/root/usr/include/c++/7/bits/invoke.h:95
#10 std::invoke<void (DB::DynamicThreadPool::* const&)(std::unique_ptr<DB::IExecutableTask, std::default_delete<DB::IExecutableTask> >), DB::DynamicThreadPool*, std::unique_ptr<DB::IExecutableTask, std::default_delete<DB::IExecutableTask> > > (__fn=@0x7f1cc400ee30: (void (DB::DynamicThreadPool::*)(class DB::DynamicThreadPool * const, class std::unique_ptr<DB::IExecutableTask, std::default_delete<DB::IExecutableTask> >)) 0x53bb430 <DB::DynamicThreadPool::dynamicWork(std::unique_ptr<DB::IExecutableTask, std::default_delete<DB::IExecutableTask> >)>) at /opt/rh/devtoolset-7/root/usr/include/c++/7/functional:80
#11 _ZZN2DB13ThreadFactory9newThreadIMNS_17DynamicThreadPoolEFvSt10unique_ptrINS_15IExecutableTaskESt14default_deleteIS4_EEEJPS2_S7_EEESt6threadbSsOT_DpOT0_ENKUlDpOT_E_clIJSA_S7_EEEDaSJ_ (__closure=0x7f1cc400ee18) at /home/guojiangtao/new_tiflash/dbms/src/Common/ThreadFactory.h:47
#12 _ZSt13__invoke_implIvZN2DB13ThreadFactory9newThreadIMNS0_17DynamicThreadPoolEFvSt10unique_ptrINS0_15IExecutableTaskESt14default_deleteIS5_EEEJPS3_S8_EEESt6threadbSsOT_DpOT0_EUlDpOT_E_JSB_S8_EESD_St14__invoke_otherOT0_DpOT1_ (__f=...) at /opt/rh/devtoolset-7/root/usr/include/c++/7/bits/invoke.h:60
#13 _ZSt8__invokeIZN2DB13ThreadFactory9newThreadIMNS0_17DynamicThreadPoolEFvSt10unique_ptrINS0_15IExecutableTaskESt14default_deleteIS5_EEEJPS3_S8_EEESt6threadbSsOT_DpOT0_EUlDpOT_E_JSB_S8_EENSt15__invoke_resultISD_JDpSF_EE4typeESE_SH_ (__fn=...) at /opt/rh/devtoolset-7/root/usr/include/c++/7/bits/invoke.h:95
#14 _ZNSt6thread8_InvokerISt5tupleIJZN2DB13ThreadFactory9newThreadIMNS2_17DynamicThreadPoolEFvSt10unique_ptrINS2_15IExecutableTaskESt14default_deleteIS7_EEEJPS5_SA_EEES_bSsOT_DpOT0_EUlDpOT_E_SD_SA_EEE9_M_invokeIJLm0ELm1ELm2EEEEDTcl8__invokespcl10_S_declvalIXT_EEEEESt12_Index_tupleIJXspT_EEE (this=0x7f1cc400ee08) at /opt/rh/devtoolset-7/root/usr/include/c++/7/thread:234
#15 _ZNSt6thread8_InvokerISt5tupleIJZN2DB13ThreadFactory9newThreadIMNS2_17DynamicThreadPoolEFvSt10unique_ptrINS2_15IExecutableTaskESt14default_deleteIS7_EEEJPS5_SA_EEES_bSsOT_DpOT0_EUlDpOT_E_SD_SA_EEEclEv (this=0x7f1cc400ee08) at /opt/rh/devtoolset-7/root/usr/include/c++/7/thread:243
#16 _ZNSt6thread11_State_implINS_8_InvokerISt5tupleIJZN2DB13ThreadFactory9newThreadIMNS3_17DynamicThreadPoolEFvSt10unique_ptrINS3_15IExecutableTaskESt14default_deleteIS8_EEEJPS6_SB_EEES_bSsOT_DpOT0_EUlDpOT_E_SE_SB_EEEEE6_M_runEv (this=0x7f1cc400ee00) at /opt/rh/devtoolset-7/root/usr/include/c++/7/thread:186
#17 0x000000000635c68f in execute_native_thread_routine ()
#18 0x00007f1df8875609 in start_thread (arg=<optimized out>) at pthread_create.c:477
#19 0x00007f1df8649133 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
"""
        for raw_line in sample.splitlines():
            frame_line = raw_line.strip()
            if frame_line:
                Frame.parse(frame_line)

    def test_parse_func(self):
        # Each case is (expected name, expected params); the input handed to
        # Frame.parse_func is "<name> (<params>)".
        invoke_impl = 'std::__invoke_impl<void, void (DB::DynamicThreadPool::* const&)(std::unique_ptr<DB::IExecutableTask, std::default_delete<DB::IExecutableTask> >), DB::DynamicThreadPool*, std::unique_ptr<DB::IExecutableTask, std::default_delete<DB::IExecutableTask> > >'
        cases = [
            # Template name containing parentheses, simple parameters
            (
                invoke_impl,
                '__t=<optimized out>, __f=<optimized out>',
            ),
            # Same name, parameters that contain nested parentheses
            (
                invoke_impl,
                '__t=<optimized out>, __f=@0x7f1ca0518030: (void (DB::DynamicThreadPool::*)(class DB::DynamicThreadPool * const, class std::unique_ptr<DB::IExecutableTask, std::default_delete<DB::IExecutableTask> >)) 0x53bb430 <DB::DynamicThreadPool::dynamicWork(std::unique_ptr<DB::IExecutableTask, std::default_delete<DB::IExecutableTask> >)>',
            ),
            # "(anonymous namespace)" in the name, parenthesised text in the parameters
            (
                'DB::(anonymous namespace)::handleRpcs',
                'curcq=0x7f1de60a12e0, log=std::shared_ptr<class DB::Logger> (use count 1, weak count 0) = {...}',
            ),
        ]
        for expected_name, expected_params in cases:
            signature = '{} ({})'.format(expected_name, expected_params)
            parsed_name, parsed_params = Frame.parse_func(signature)
            self.assertEqual(parsed_name, expected_name)
            self.assertEqual(parsed_params, expected_params)
def parse_input_file(file):
    '''
    Return a map thread-id => Stack

    The file is read line by line.  Each line is stripped; blank lines are
    skipped and a line equal to ``quit`` ends parsing.  A line matching
    ``Stack.regx_thd`` starts a new stack, and every following line is
    treated as a frame of that stack.  Lines that appear before the first
    thread header are ignored.  When two stacks share a thread id the later
    one replaces the earlier one.
    '''
    threads = {}
    lines_one_stack = []
    with open(file, 'r') as infile:
        for line in infile:
            line = line.strip()
            # End of all stacks
            if line == 'quit':
                break
            if not line:
                continue
            if Stack.regx_thd.match(line) is not None:
                # A new thread begins: finish the stack collected so far
                if lines_one_stack:
                    s = Stack(lines_one_stack)
                    threads[s.id] = s
                lines_one_stack = [line]
            elif lines_one_stack:
                # Append to the current thread's stack frames
                lines_one_stack.append(line)
            # else: text before any thread header belongs to no stack and
            # would make Stack() fail on its header check, so drop it.
    # Finish the last stack, which has no following header to trigger it
    if lines_one_stack:
        s = Stack(lines_one_stack)
        threads[s.id] = s
    return threads
def group_by_full_digest(threads):
    """Print the threads grouped by the digest of their whole stack.

    ``threads`` maps thread id => stack object.  Stacks with the same
    ``digest()`` form one group.  Groups are printed largest first, each
    with its thread ids and the frames of its first member as a sample.
    """
    # digest => ids of the threads having it, in input order
    ids_by_digest = {}
    for thd_id, thd in threads.items():
        ids_by_digest.setdefault(thd.digest(), []).append(thd_id)

    # Largest group first; the sort is stable, so ties keep input order
    ranked = sorted(ids_by_digest.items(), key=lambda item: len(item[1]), reverse=True)

    total_num_group = len(ranked)
    print(f'Total num of digest group: {total_num_group}\n')
    for position, (digest, member_ids) in enumerate(ranked, start=1):
        print(f'Group [{position}/{total_num_group}]')
        print(f'Digest={digest}, num of threads={len(member_ids)}')
        print('Thread IDs: [{}]'.format(",".join(member_ids)))
        # The first member of the group serves as the sample stack
        print(threads[member_ids[0]].frames_to_str())
        print()
class DigestGroup:
    """A node in the tree of threads grouped by the tail of their stacks.

    A group at ``level`` n holds the ids of threads whose
    ``digest(last_n_frames=n)`` equals ``digest``.  ``gen_sub_groups`` splits
    the group one level deeper until every remaining group contains stacks
    with one single full digest.
    """

    def __init__(self, digest, level):
        self.digest = digest
        self.level = level
        self.thread_ids = []
        # digest at level + 1 => DigestGroup; stays empty for a leaf
        self.sub_groups = {}
        # Frame that distinguishes this group from its siblings (taken from
        # the first thread added); None for groups not created by
        # gen_sub_groups.
        self.layer_frame = None
        # Frame next to layer_frame, one position later in the frame list;
        # None when the stack has no such frame.
        self.share_frame = None

    def add(self, thd_id):
        """Record ``thd_id`` as a member of this group."""
        self.thread_ids.append(thd_id)

    @staticmethod
    def _pick_frames(frames, level):
        """Return ``(layer_frame, share_frame)`` of a stack for a sub-group at ``level``.

        The layer frame is ``frames[-level]``, or ``frames[0]`` when the
        stack has no more than ``level`` frames.  The share frame is the
        entry right after it, or ``None`` when the stack is too short.
        """
        if not frames:
            return None, None
        depth = -level if level < len(frames) else 0
        share_idx = depth + 1
        # A negative share_idx is always in range here; only the clamped
        # case (depth == 0) can point past the end of a one-frame stack.
        share_frame = frames[share_idx] if share_idx < len(frames) else None
        return frames[depth], share_frame

    def gen_sub_groups(self, threads):
        """Recursively split this group by the digest one level deeper.

        ``threads`` maps thread id => stack object (needs ``digest()`` and
        ``frames``).  Nothing happens when all members share one full
        digest; the group then stays a leaf.
        """
        members = [(thd_id, threads[thd_id]) for thd_id in self.thread_ids]
        # Homogeneous (or empty) group: no need to build sub-groups at all
        if len({thd.digest() for _thd_id, thd in members}) <= 1:
            return

        sub_level = self.level + 1
        sub_groups = {}
        for thd_id, thd in members:
            digest = thd.digest(sub_level)
            sub_group = sub_groups.get(digest)
            if sub_group is None:
                sub_group = DigestGroup(digest, sub_level)
                sub_group.layer_frame, sub_group.share_frame = self._pick_frames(thd.frames, sub_level)
                sub_groups[digest] = sub_group
            sub_group.add(thd_id)

        self.sub_groups = sub_groups
        for sub_group in sub_groups.values():
            sub_group.gen_sub_groups(threads)

    def collect_groups_on_leaves(self, parent_digest):
        """Return the leaf groups below this group (or ``[self]`` for a leaf).

        Sub-groups are visited largest first (by number of threads), ties in
        creation order.  ``parent_digest`` is accepted for the callers'
        sake and is not used in the result.
        """
        if not self.sub_groups:
            return [self]
        ordered = sorted(self.sub_groups.values(), key=lambda grp: len(grp.thread_ids), reverse=True)
        leaves = []
        for grp in ordered:
            leaves.extend(grp.collect_groups_on_leaves(self.digest))
        return leaves
def group_by_layered(threads):
    """Print the threads grouped layer by layer, starting from the last frame.

    ``threads`` maps thread id => stack object.  Threads are first bucketed
    by ``digest(1)``, then every bucket is refined with
    ``DigestGroup.gen_sub_groups``.  The resulting leaf groups are printed
    with their thread ids and the frames of their first member as a sample.
    """
    # First layer: one root group per distinct digest(1)
    roots = {}
    for thd_id, thd in threads.items():
        key = thd.digest(1)
        root = roots.get(key)
        if root is None:
            root = roots[key] = DigestGroup(key, 1)
        root.add(thd_id)

    # Refine all roots before collecting any leaves
    for root in roots.values():
        root.gen_sub_groups(threads)

    all_leaves = []
    for root in roots.values():
        all_leaves += root.collect_groups_on_leaves('root')

    total_num_group = len(all_leaves)
    print(f'Total num of digest group: {total_num_group}\n')
    for position, leaf in enumerate(all_leaves, start=1):
        print(f'Group [{position}/{total_num_group}]')
        print(f'Digest={leaf.digest}, num of threads={len(leaf.thread_ids)}, share frame with other stacks={leaf.share_frame}')
        print('Thread IDs: [{}]'.format(','.join(leaf.thread_ids)))
        # The first member of the leaf serves as the sample stack
        print(threads[leaf.thread_ids[0]].frames_to_str())
        print()
if __name__ == '__main__':
    # Script entry point: parse the backtrace dump named on the command line
    # and print its threads grouped layer by layer.
    # To run the unit tests instead, enable the next line.
    # unittest.main()
    if len(sys.argv) < 2:
        print(f'Usage {sys.argv[0]} <pstack.log>', file=sys.stderr)
        # sys.exit rather than the bare exit() helper, which only exists when
        # the site module has been loaded.
        sys.exit(1)
    file = sys.argv[1]
    threads = parse_input_file(file)
    # Alternative report: one flat group per full-stack digest.
    # group_by_full_digest(threads)
    group_by_layered(threads)
Author
JaySon-Huang
commented
Apr 14, 2023
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment