@wware
Created March 16, 2017 03:33
On my job, I need to debug what appears to be a memory leak in a Python program that spins up a bunch of workers using the multiprocessing module, then uses managed queues to send them work and collect the results. But memory usage keeps climbing. This is my attempt to better wrap my head around what Python does with memory.
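For context, the overall shape of the program I'm debugging is roughly this (a minimal sketch; worker, handle_piece, and run are hypothetical stand-ins, not the real code):

import multiprocessing

def handle_piece(piece):
    return piece  # stand-in for the real per-piece computation

def worker(work_q, result_q):
    # Each worker loops, pulling pieces until it sees the None sentinel.
    while True:
        piece = work_q.get()
        if piece is None:
            break
        result_q.put(handle_piece(piece))

def run(pieces, nworkers=4):
    mgr = multiprocessing.Manager()
    work_q, result_q = mgr.Queue(), mgr.Queue()
    procs = [multiprocessing.Process(target=worker, args=(work_q, result_q))
             for _ in range(nworkers)]
    for p in procs:
        p.start()
    for piece in pieces:
        work_q.put(piece)
    for _ in procs:
        work_q.put(None)          # one sentinel per worker
    results = [result_q.get() for _ in pieces]
    for p in procs:
        p.join()
    return results

The workers live for the whole run, which is exactly why any memory they touch tends to stick around.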
# A little library of memory diagnostics
# In no particular order of usefulness:
# https://pymotw.com/2/gc/
# https://code.tutsplus.com/tutorials/understand-how-much-memory-your-python-objects-use--cms-25609
# https://www.huyng.com/posts/python-performance-analysis
# http://stackoverflow.com/questions/23369937
import gc
import os
import sys
import pprint
import resource
import Queue
from collections import namedtuple
from types import InstanceType


def _get_obj_type(obj):
    # Old-style instances all share type InstanceType; use the class instead.
    objtype = type(obj)
    if objtype is InstanceType:
        objtype = obj.__class__
    return objtype


def _short_typename(obj):
    return _get_obj_type(obj).__name__


def _long_typename(obj):
    objtype = _get_obj_type(obj)
    name = objtype.__name__
    module = getattr(objtype, '__module__', None)
    if module:
        return '%s.%s' % (module, name)
    else:
        return name


def info(typename, objects, exclude=None):
    # Count the objects of the given type and total their sizes in MB.
    # A dotted name like 'foo.Bar' is matched as module.name; a bare
    # name like 'list' is matched against the type name alone.
    if exclude is None:
        exclude = []
    count = sz = 0
    if '.' in typename:
        _typename = _long_typename
    else:
        _typename = _short_typename
    for o in objects:
        if o in exclude:
            continue
        if _typename(o) == typename:
            count += 1
            sz += sys.getsizeof(o)
    return (count, sz / (1024. * 1024.))


def all_info(objects):
    # Totals across every type present: (object count, size in MB).
    s = set()
    for o in objects:
        s.add(_long_typename(o))
    n = sz = 0
    for typename in s:
        _n, _sz = info(typename, objects)
        n += _n
        sz += _sz
    return (n, sz)


class CycleFound(Exception):
    pass


def find_cycles(obj):
    # Breadth-first walk of everything reachable from obj, raising
    # CycleFound when an object is reached a second time.  Note that a
    # shared (acyclic) reference reached by two paths also triggers
    # this, so strictly it means "reachable twice", not always "cycle".
    seen = set()
    to_process = Queue.Queue()
    to_process.put(obj)
    while not to_process.empty():
        next_guy = to_process.get()
        seen.add(id(next_guy))
        for r in gc.get_referents(next_guy):
            if isinstance(r, basestring) or isinstance(r, type):
                # Ignore strings and classes
                pass
            elif id(r) in seen:
                raise CycleFound(pprint.pformat(r))
            else:
                to_process.put(r)


Memory = namedtuple('Memory', ['pid', 'used'])


def memory_usage():
    # Peak resident set size of this process.  On Linux ru_maxrss is in
    # kilobytes, so dividing by 1024 reports megabytes.
    return Memory(
        os.getpid(),
        resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.)
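That's the whole library (imported below as mem, so the file is presumably mem.py). Before the test driver, here's a quick interactive check of info's two matching modes and of find_cycles on a self-referencing list (illustrative usage, not part of the gist):

import gc
from mem import info, find_cycles, CycleFound, memory_usage

objects = gc.get_objects()
print info('list', objects)                     # bare name: type name only
print info('collections.OrderedDict', objects)  # dotted name: module.name

x = []
x.append(x)                                     # a genuine reference cycle
try:
    find_cycles(x)
except CycleFound:
    print 'cycle detected'

print memory_usage()                            # Memory(pid=..., used=...)

The test driver below exercises the same helpers in a loop.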
import logging
import objgraph
import pprint
import gc
import re
import sys
from mem import Memory, memory_usage, CycleFound, find_cycles, info, all_info
from memory_profiler import profile

logging.basicConfig(
    format='%(asctime)-15s %(levelname)s %(filename)s:%(lineno)d %(message)s',
    level=logging.DEBUG,
)


def make_big_list():
    # Ten-fold growth, six times over: a list of 10**6 elements (each a
    # reference to the same float 1.0), roughly 8 MB of list storage.
    x = [1.0]
    for i in range(6):
        y = []
        for _ in xrange(10):
            y.extend(x)
        x = y
    return x
# http://stackoverflow.com/questions/23369937
"""
Long running Python jobs that consume a lot of memory while
running may not return that memory to the operating system
until the process actually terminates, even if everything
is garbage collected properly.
The solution I need for my problem is that FOR EACH PIECE,
I'll need to spawn a new process AND TERMINATE THAT PROCESS
when its work is done. I think that's the only way out of this
morass.
"""
def main():
    # Five times over: log memory before building the big list, after
    # building it, and after deleting it, to see what comes back.
    for _ in xrange(5):
        logging.debug(memory_usage())
        logging.debug(all_info(gc.get_objects()))
        x = make_big_list()
        logging.debug(memory_usage())
        logging.debug(all_info(gc.get_objects()))
        del x
        logging.debug(memory_usage())
        logging.debug(all_info(gc.get_objects()))


if __name__ == "__main__":
    main()
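The profile decorator imported from memory_profiler above is never actually applied. To get a line-by-line memory report, decorate the function of interest and run the script through the profiler (illustrative usage; the filename is a placeholder):

@profile
def make_big_list():
    ...

$ python -m memory_profiler your_script.py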