Skip to content

Instantly share code, notes, and snippets.

@JaySon-Huang
Last active April 14, 2023 05:54
Show Gist options
  • Save JaySon-Huang/e374da1fafa41a3fa30a24e135c60825 to your computer and use it in GitHub Desktop.
Save JaySon-Huang/e374da1fafa41a3fa30a24e135c60825 to your computer and use it in GitHub Desktop.
A helper for grouping stacks of pstack output
#!/usr/bin/env python3
from concurrent.futures import thread
from random import sample
import re
import hashlib
from pprint import pprint
import sys
import unittest
class Frame:
    """One frame line of a gdb backtrace, e.g.

    ``#3 0x00000000053bb59d in foo (x=1) at gthr-default.h:871``

    Attributes:
        num: frame number as a string (the N of ``#N``).
        addr: address printed before `` in ``, or None when absent.
        func: function name without its trailing parameter list.
        params: text inside the trailing parentheses ('' if there are none).
        loc: text after `` at `` (source location) or after `` from `` (the
            shared object), or None when the line has neither.
    """

    # "#N [ADDR in ]FUNC (PARAMS) at LOC"
    regx_frame = re.compile(r'^#(?P<num>\d+)\W+((?P<addr>[0-9a-fx]+) in )?(?P<func>.+)? at (?P<loc>.+)$')
    # "#N ADDR in FUNC (PARAMS) from LIB" -- gdb prints the shared object
    # instead of "at FILE:LINE" when it has no source location for the frame.
    regx_frame_from_lib = re.compile(r'^#(?P<num>\d+)\W+(?P<addr>[0-9a-fx]+) in (?P<func>.*\)) from (?P<loc>.+)$')
    # "#N ADDR in FUNC (PARAMS)"
    regx_frame_no_loc = re.compile(r'^#(?P<num>\d+)\W+(?P<addr>[0-9a-fx]+) in (?P<func>.*)?$')
    # "#N TEXT" -- last resort for frames like "#2  <signal handler called>"
    regx_frame_bare = re.compile(r'^#(?P<num>\d+)\s+(?P<func>.+)$')
    regx_func_param = re.compile(r'^(?P<func>.+)(\W+?)\((?P<params>.+=.+(,\W+.+=.+)*)*\)')

    @classmethod
    def parse_func_deprecated(cls, func):
        """Regex-based predecessor of `parse_func`; kept for compatibility.

        Raises Exception when `regx_func_param` does not match `func`.
        """
        m = cls.regx_func_param.match(func)
        if m is None:
            raise Exception("can not parse func", func)
        return m.group('func'), m.group('params')

    @classmethod
    def parse_func(cls, func):
        """Split ``"NAME (PARAMS)"`` into ``(NAME, PARAMS)``, both stripped.

        The parameter list is the parenthesised group that ends the text. It is
        found by scanning backwards while balancing parentheses, so parentheses
        inside NAME (``(anonymous namespace)``, function-pointer template
        arguments) or inside PARAMS are kept intact. Text that does not end
        with ``)`` has no parameter list and is returned whole with PARAMS
        ``''``; ``None`` yields ``('', '')``.

        Raises Exception if the trailing parentheses are unbalanced.
        """
        if func is None:
            return '', ''
        text = func.rstrip()
        if not text.endswith(')'):
            # Previously the last character was chopped off in this case.
            return text.strip(), ''
        depth = 0
        for pos in range(len(text) - 1, -1, -1):
            ch = text[pos]
            if ch == ')':
                depth += 1
            elif ch == '(':
                depth -= 1
                if depth == 0:
                    return text[:pos].strip(), text[pos + 1:-1].strip()
        raise Exception("unbalanced parentheses in func", func)

    @classmethod
    def parse(cls, line):
        """Parse one stripped frame line into a Frame.

        The patterns are tried from most to least specific. Raises Exception
        if the line does not start with ``#N``.
        """
        m = cls.regx_frame.match(line)
        if m is not None:
            return cls(m.group('num'), m.group('addr'), m.group('func'), m.group('loc'))
        m = cls.regx_frame_from_lib.match(line)
        if m is not None:
            return cls(m.group('num'), m.group('addr'), m.group('func'), m.group('loc'))
        m = cls.regx_frame_no_loc.match(line)
        if m is not None:
            return cls(m.group('num'), m.group('addr'), m.group('func'), None)
        m = cls.regx_frame_bare.match(line)
        if m is not None:
            return cls(m.group('num'), None, m.group('func'), None)
        raise Exception("can not parse line", line)

    def __init__(self, num, addr, func, loc):
        self.num = num
        self.addr = addr
        self.func, self.params = self.parse_func(func)
        self.loc = loc

    def __repr__(self):
        return '<Frame #{}, func:{}>'.format(self.num, self.func)

    def parts(self):
        """Return ``[num, addr, func, loc]``."""
        return [self.num, self.addr, self.func, self.loc]
class Stack:
    """Backtrace of one thread: a "Thread N ..." header line plus its frames."""

    regx_thd = re.compile(r'^Thread ([0-9]+)')

    def __init__(self, lines):
        """Build a stack from `lines`: lines[0] is the header, the rest are frame lines.

        Raises Exception if the header does not match `regx_thd`, or if a frame
        line is rejected by `Frame.parse`.
        """
        self.hdr = lines[0]
        m = self.regx_thd.match(self.hdr)
        if m is None:
            raise Exception("can not parse thread header", self.hdr)
        self.id = m.group(1)
        self.frames = [Frame.parse(l) for l in lines[1:]]
        # Computed once; digest() returns it for the default (full, hashed) form.
        self.full_digest = None
        self.full_digest = self.digest()

    def __repr__(self):
        return '<Stack {}, depth:{}>'.format(self.id, len(self.frames))

    def __len__(self):
        return len(self.frames)

    def digest(self, last_n_frames=None, raw=False):
        """Return the md5 hex digest of the frames' function names, one per line.

        With `last_n_frames`, only the last N entries of `frames` are used, in
        reverse order. With `raw=True` the newline-joined names are returned
        instead of their hash.
        """
        # The cache only holds the hashed full digest, so it must not answer raw requests.
        if last_n_frames is None and not raw and self.full_digest is not None:
            return self.full_digest
        if last_n_frames is None:
            frames_to_digest = self.frames
        else:
            frames_to_digest = list(reversed(self.frames))[:last_n_frames]
        hash_str = '\n'.join(x.func for x in frames_to_digest)
        if raw:
            return hash_str
        return hashlib.md5(hash_str.encode("utf-8")).hexdigest()

    def frames_to_str(self):
        """Render the frames as "#N func at loc" lines, using '?' for a missing loc."""
        return '\n'.join(['#{:2s} {} at {}'.format(f.num, f.func, f.loc if f.loc is not None else '?') for f in self.frames])
class TestParseFrames(unittest.TestCase):
    """Regression tests for Frame.parse and Frame.parse_func on real gdb output."""

    def test_parse_frames(self):
        """Every non-blank line of a captured backtrace must be accepted by Frame.parse."""
        sample = """
#9 0x000000000635c68f in execute_native_thread_routine ()
#17 0x000000000635c68f in execute_native_thread_routine ()
#0 futex_abstimed_wait_cancelable (private=<optimized out>, abstime=0x7f1c331c6c20, clockid=<optimized out>, expected=0, futex_word=0x7f1c331c6c68) at ../sysdeps/nptl/futex-internal.h:320
#1 __pthread_cond_wait_common (abstime=0x7f1c331c6c20, clockid=<optimized out>, mutex=0x7f1de5bf1a00, cond=0x7f1c331c6c40) at pthread_cond_wait.c:520
#2 __pthread_cond_timedwait (cond=0x7f1c331c6c40, mutex=0x7f1de5bf1a00, abstime=0x7f1c331c6c20) at pthread_cond_wait.c:665
#3 0x00000000053bb59d in __gthread_cond_timedwait (__abs_timeout=0x7f1c331c6c20, __mutex=<optimized out>, __cond=0x7f1c331c6c40) at /opt/rh/devtoolset-7/root/usr/include/c++/7/x86_64-redhat-linux/bits/gthr-default.h:871
#4 std::condition_variable::__wait_until_impl<std::chrono::duration<long, std::ratio<1l, 1000000000l> > > (__atime=..., __lock=<synthetic pointer>..., this=0x7f1c331c6c40) at /opt/rh/devtoolset-7/root/usr/include/c++/7/condition_variable:166
#5 std::condition_variable::wait_until<std::chrono::duration<long, std::ratio<1l, 1000000000l> > > (__atime=..., __lock=<synthetic pointer>..., this=0x7f1c331c6c40) at /opt/rh/devtoolset-7/root/usr/include/c++/7/condition_variable:106
#6 std::condition_variable::wait_for<long, std::ratio<1l, 1000000000l> > (__rtime=..., __lock=<synthetic pointer>..., this=0x7f1c331c6c40) at /opt/rh/devtoolset-7/root/usr/include/c++/7/condition_variable:138
#7 DB::DynamicThreadPool::dynamicWork (this=0x7f1de5bf1940, initial_task=...) at /home/guojiangtao/new_tiflash/dbms/src/Common/DynamicThreadPool.cpp:136
#8 0x00000000053bcb8f in std::__invoke_impl<void, void (DB::DynamicThreadPool::* const&)(std::unique_ptr<DB::IExecutableTask, std::default_delete<DB::IExecutableTask> >), DB::DynamicThreadPool*, std::unique_ptr<DB::IExecutableTask, std::default_delete<DB::IExecutableTask> > > (__t=<optimized out>, __f=@0x7f1cc400ee30: (void (DB::DynamicThreadPool::*)(class DB::DynamicThreadPool * const, class std::unique_ptr<DB::IExecutableTask, std::default_delete<DB::IExecutableTask> >)) 0x53bb430 <DB::DynamicThreadPool::dynamicWork(std::unique_ptr<DB::IExecutableTask, std::default_delete<DB::IExecutableTask> >)>) at /opt/rh/devtoolset-7/root/usr/include/c++/7/bits/invoke.h:73
#9 std::__invoke<void (DB::DynamicThreadPool::* const&)(std::unique_ptr<DB::IExecutableTask, std::default_delete<DB::IExecutableTask> >), DB::DynamicThreadPool*, std::unique_ptr<DB::IExecutableTask, std::default_delete<DB::IExecutableTask> > > (__fn=<optimized out>) at /opt/rh/devtoolset-7/root/usr/include/c++/7/bits/invoke.h:95
#10 std::invoke<void (DB::DynamicThreadPool::* const&)(std::unique_ptr<DB::IExecutableTask, std::default_delete<DB::IExecutableTask> >), DB::DynamicThreadPool*, std::unique_ptr<DB::IExecutableTask, std::default_delete<DB::IExecutableTask> > > (__fn=@0x7f1cc400ee30: (void (DB::DynamicThreadPool::*)(class DB::DynamicThreadPool * const, class std::unique_ptr<DB::IExecutableTask, std::default_delete<DB::IExecutableTask> >)) 0x53bb430 <DB::DynamicThreadPool::dynamicWork(std::unique_ptr<DB::IExecutableTask, std::default_delete<DB::IExecutableTask> >)>) at /opt/rh/devtoolset-7/root/usr/include/c++/7/functional:80
#11 _ZZN2DB13ThreadFactory9newThreadIMNS_17DynamicThreadPoolEFvSt10unique_ptrINS_15IExecutableTaskESt14default_deleteIS4_EEEJPS2_S7_EEESt6threadbSsOT_DpOT0_ENKUlDpOT_E_clIJSA_S7_EEEDaSJ_ (__closure=0x7f1cc400ee18) at /home/guojiangtao/new_tiflash/dbms/src/Common/ThreadFactory.h:47
#12 _ZSt13__invoke_implIvZN2DB13ThreadFactory9newThreadIMNS0_17DynamicThreadPoolEFvSt10unique_ptrINS0_15IExecutableTaskESt14default_deleteIS5_EEEJPS3_S8_EEESt6threadbSsOT_DpOT0_EUlDpOT_E_JSB_S8_EESD_St14__invoke_otherOT0_DpOT1_ (__f=...) at /opt/rh/devtoolset-7/root/usr/include/c++/7/bits/invoke.h:60
#13 _ZSt8__invokeIZN2DB13ThreadFactory9newThreadIMNS0_17DynamicThreadPoolEFvSt10unique_ptrINS0_15IExecutableTaskESt14default_deleteIS5_EEEJPS3_S8_EEESt6threadbSsOT_DpOT0_EUlDpOT_E_JSB_S8_EENSt15__invoke_resultISD_JDpSF_EE4typeESE_SH_ (__fn=...) at /opt/rh/devtoolset-7/root/usr/include/c++/7/bits/invoke.h:95
#14 _ZNSt6thread8_InvokerISt5tupleIJZN2DB13ThreadFactory9newThreadIMNS2_17DynamicThreadPoolEFvSt10unique_ptrINS2_15IExecutableTaskESt14default_deleteIS7_EEEJPS5_SA_EEES_bSsOT_DpOT0_EUlDpOT_E_SD_SA_EEE9_M_invokeIJLm0ELm1ELm2EEEEDTcl8__invokespcl10_S_declvalIXT_EEEEESt12_Index_tupleIJXspT_EEE (this=0x7f1cc400ee08) at /opt/rh/devtoolset-7/root/usr/include/c++/7/thread:234
#15 _ZNSt6thread8_InvokerISt5tupleIJZN2DB13ThreadFactory9newThreadIMNS2_17DynamicThreadPoolEFvSt10unique_ptrINS2_15IExecutableTaskESt14default_deleteIS7_EEEJPS5_SA_EEES_bSsOT_DpOT0_EUlDpOT_E_SD_SA_EEEclEv (this=0x7f1cc400ee08) at /opt/rh/devtoolset-7/root/usr/include/c++/7/thread:243
#16 _ZNSt6thread11_State_implINS_8_InvokerISt5tupleIJZN2DB13ThreadFactory9newThreadIMNS3_17DynamicThreadPoolEFvSt10unique_ptrINS3_15IExecutableTaskESt14default_deleteIS8_EEEJPS6_SB_EEES_bSsOT_DpOT0_EUlDpOT_E_SE_SB_EEEEE6_M_runEv (this=0x7f1cc400ee00) at /opt/rh/devtoolset-7/root/usr/include/c++/7/thread:186
#17 0x000000000635c68f in execute_native_thread_routine ()
#18 0x00007f1df8875609 in start_thread (arg=<optimized out>) at pthread_create.c:477
#19 0x00007f1df8649133 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
"""
        stripped_lines = [text.strip() for text in sample.splitlines()]
        for frame_line in filter(None, stripped_lines):
            # Frame.parse raises on a line it cannot parse, which fails the test.
            Frame.parse(frame_line)

    def test_parse_func(self):
        """parse_func splits "NAME (PARAMS)" at the parenthesis pair that ends the text."""
        invoke_impl = 'std::__invoke_impl<void, void (DB::DynamicThreadPool::* const&)(std::unique_ptr<DB::IExecutableTask, std::default_delete<DB::IExecutableTask> >), DB::DynamicThreadPool*, std::unique_ptr<DB::IExecutableTask, std::default_delete<DB::IExecutableTask> > >'
        # Each case is (expected name, expected params); the input is rebuilt
        # as "<name> (<params>)", exactly as gdb prints it.
        cases = [
            (invoke_impl, '__t=<optimized out>, __f=<optimized out>'),
            (invoke_impl, '__t=<optimized out>, __f=@0x7f1ca0518030: (void (DB::DynamicThreadPool::*)(class DB::DynamicThreadPool * const, class std::unique_ptr<DB::IExecutableTask, std::default_delete<DB::IExecutableTask> >)) 0x53bb430 <DB::DynamicThreadPool::dynamicWork(std::unique_ptr<DB::IExecutableTask, std::default_delete<DB::IExecutableTask> >)>'),
            ('DB::(anonymous namespace)::handleRpcs', 'curcq=0x7f1de60a12e0, log=std::shared_ptr<class DB::Logger> (use count 1, weak count 0) = {...}'),
        ]
        for expected_name, expected_params in cases:
            text = '{} ({})'.format(expected_name, expected_params)
            got_name, got_params = Frame.parse_func(text)
            self.assertEqual(got_name, expected_name)
            self.assertEqual(got_params, expected_params)
def parse_input_file(file):
    '''
    Return a map thread-id => Stack parsed from the backtrace dump in `file`.

    A stack starts at a line matching `Stack.regx_thd` ("Thread N ...") and
    collects the following frame lines, i.e. those starting with '#'. Other
    non-blank lines are skipped instead of being handed to `Stack`/`Frame.parse`,
    which would reject them. This covers text printed before the first
    "Thread" header and messages between or after stacks. Reading stops at a
    line that is exactly "quit".
    '''
    threads = {}
    lines_one_stack = []
    # errors='replace': a stray undecodable byte (e.g. inside a printed string
    # argument) should not abort the whole parse.
    with open(file, 'r', encoding='utf-8', errors='replace') as infile:
        for line in infile:
            line = line.strip()
            # End of all stacks
            if line == 'quit':
                break
            if not line:
                continue
            if Stack.regx_thd.match(line) is not None:
                # A new thread header closes the previous stack, if any.
                if lines_one_stack:
                    s = Stack(lines_one_stack)
                    threads[s.id] = s
                lines_one_stack = [line]
            elif lines_one_stack and line.startswith('#'):
                # Frame line of the current thread stack.
                lines_one_stack.append(line)
            # Anything else is not part of a stack: ignore it.
    if lines_one_stack:
        s = Stack(lines_one_stack)
        threads[s.id] = s
    return threads
def group_by_full_digest(threads):
    """Print the threads grouped by their full stack digest, largest group first.

    `threads` maps thread id -> Stack. For every group, the digest, the
    thread ids and the frames of the group's first thread are printed.
    """
    buckets = {}
    for thread_id, stack in threads.items():
        buckets.setdefault(stack.digest(), []).append(thread_id)

    # sorted() is stable (also with reverse=True), so equally sized groups
    # keep the order in which their digests were first seen.
    ranked = sorted(buckets.items(), key=lambda item: len(item[1]), reverse=True)

    group_count = len(ranked)
    print(f'Total num of digest group: {group_count}\n')
    for position, (digest, members) in enumerate(ranked, start=1):
        print(f'Group [{position}/{group_count}]')
        print(f'Digest={digest}, num of threads={len(members)}')
        print('Thread IDs: [{}]'.format(",".join(members)))
        print(threads[members[0]].frames_to_str())
        print()
class DigestGroup:
    """A node of the layered grouping of thread stacks.

    A group at ``level`` k holds the threads whose last k entries of
    ``Stack.frames`` (the highest-numbered gdb frames) give the same
    ``Stack.digest(k)``. ``gen_sub_groups`` splits a group by the digest of one
    more frame, level by level, until every group only contains threads with
    the same full digest.
    """

    def __init__(self, digest, level):
        self.digest = digest        # Stack.digest(level) shared by all threads in the group
        self.level = level          # number of tail frames covered by `digest`
        self.thread_ids = []        # ids of the threads in this group, in insertion order
        self.sub_groups = {}        # digest at level + 1 -> DigestGroup; empty for a leaf
        self.layer_frame = None     # frame at this group's level, taken from its first thread
        self.share_frame = None     # frame one level up, taken from its first thread

    def add(self, thd_id):
        """Append a thread id to the group."""
        self.thread_ids.append(thd_id)

    @staticmethod
    def _boundary_frames(frames, level):
        """Return ``(layer_frame, share_frame)`` of `frames` for a group at `level`.

        Stacks shorter than `level` use ``frames[0]`` as the layer frame. A
        one-frame stack reports that same frame as its shared frame (the
        original indexing raised IndexError here), and an empty stack yields
        ``(None, None)``.
        """
        depth = min(level, len(frames))
        if depth == 0:
            return None, None
        # For depth == 1, frames[-depth + 1] is frames[0].
        return frames[-depth], frames[-depth + 1]

    def _split(self, threads):
        """Bucket this group's threads by ``digest(level + 1)``.

        Returns the new sub groups, or None when all threads have the same
        full digest and there is nothing left to tell them apart.
        """
        full_digests = set()
        sub_groups = {}
        next_level = self.level + 1
        for thd_id in self.thread_ids:
            thd = threads[thd_id]
            full_digests.add(thd.digest())
            digest = thd.digest(next_level)
            sub_group = sub_groups.get(digest)
            if sub_group is None:
                sub_group = DigestGroup(digest, next_level)
                sub_group.layer_frame, sub_group.share_frame = self._boundary_frames(thd.frames, next_level)
                sub_groups[digest] = sub_group
            sub_group.add(thd_id)
        if len(full_digests) == 1:
            return None
        return sub_groups

    def gen_sub_groups(self, threads):
        """Split this group, and then its sub groups, until each leaf holds identical stacks.

        `threads` maps thread id -> Stack. A work list is used instead of
        recursion. The tree gains one level per shared tail frame, so threads
        sharing a very deep tail would otherwise exceed Python's recursion limit.
        """
        pending = [self]
        while pending:
            group = pending.pop()
            sub_groups = group._split(threads)
            if sub_groups is None:
                continue
            group.sub_groups = sub_groups
            pending.extend(sub_groups.values())

    def collect_groups_on_leaves(self, parent_digest):
        """Return the leaf groups below (or equal to) this group, in depth-first order.

        Sibling groups are visited in decreasing order of thread count; ties
        keep their insertion order. `parent_digest` is accepted for
        compatibility and is not used.
        """
        leaves = []
        pending = [self]
        while pending:
            group = pending.pop()
            if not group.sub_groups:
                leaves.append(group)
                continue
            children = sorted(group.sub_groups.values(), key=lambda g: len(g.thread_ids), reverse=True)
            # Push in reverse so the largest child is popped (visited) first.
            pending.extend(reversed(children))
        return leaves
def group_by_layered(threads):
    """Print the leaf groups of the layered (DigestGroup) grouping of `threads`.

    `threads` maps thread id -> Stack. Threads are first bucketed by the
    digest of their last frame (level 1). Each bucket is refined with
    DigestGroup.gen_sub_groups, and every resulting leaf group is printed
    with the stack of its first thread as a sample.
    """
    roots = {}
    for thread_id, stack in threads.items():
        key = stack.digest(1)
        root = roots.get(key)
        if root is None:
            root = DigestGroup(key, 1)
            roots[key] = root
        root.add(thread_id)

    for root in roots.values():
        root.gen_sub_groups(threads)

    leaves = []
    for root in roots.values():
        leaves += root.collect_groups_on_leaves('root')

    total = len(leaves)
    print(f'Total num of digest group: {total}\n')
    for position, leaf in enumerate(leaves, start=1):
        print(f'Group [{position}/{total}]')
        print(f'Digest={leaf.digest}, num of threads={len(leaf.thread_ids)}, share frame with other stacks={leaf.share_frame}')
        print('Thread IDs: [{}]'.format(','.join(leaf.thread_ids)))
        print(threads[leaf.thread_ids[0]].frames_to_str())
        print()
if __name__ == '__main__':
    # Usage: <script> <pstack.log>, where the log holds "thread apply all bt"
    # output, e.g. from:
    #   gdb -p $PID -ex "thread apply all bt" -ex "detach" -ex "quit" > tiflash.$PID.log
    # unittest.main()  # uncomment to run TestParseFrames instead
    if len(sys.argv) < 2:
        print(f'Usage {sys.argv[0]} <pstack.log>', file=sys.stderr)
        # sys.exit, not the site-module exit(), which is meant for the
        # interactive interpreter and is absent under `python -S`.
        sys.exit(1)
    file = sys.argv[1]
    threads = parse_input_file(file)
    # group_by_full_digest(threads)  # alternative: one group per identical full stack
    group_by_layered(threads)
@JaySon-Huang
Copy link
Author

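# Find the PID of the process listening on port 9000, then dump the backtraces of all its threads:
lsof -i:9000
export PID=28280
gdb -p $PID -ex "thread apply all bt" -ex "detach" -ex "quit" > tiflash.$PID.log
# Then group the stacks by running this script with tiflash.$PID.log as its argument.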

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment