Last active
April 4, 2017 20:53
-
-
Save noamraph/c933b32deb8304ac7ccd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from __future__ import division, print_function | |
import sys | |
import os | |
from os.path import join, abspath, exists, expanduser | |
from contextlib import contextmanager | |
import hashlib | |
import cStringIO | |
import json | |
from collections import OrderedDict | |
from pylint.lint import PyLinter, Run | |
import pylint.utils | |
from pylint.checkers.imports import ImportsChecker | |
from pylint.interfaces import UNDEFINED | |
from astroid.bases import NodeNG, YES | |
from astroid.node_classes import EmptyNode | |
from astroid import MANAGER | |
# Where cache entries are stored unless CachedPyLinter is given an explicit
# cache_dir keyword argument.
DEFAULT_CACHE_DIR = os.path.expanduser('~/.cache/pylint')
def hexdigest(s):
    """
    Return the hexadecimal SHA-1 digest of s.

    s may be a byte string (or any other buffer hashlib accepts) or a
    unicode text string. Text is encoded as UTF-8 before hashing, because
    hashlib only digests bytes: without the explicit encode, Python 2 falls
    back to an implicit ASCII encode (which fails on non-ASCII text) and
    Python 3 raises TypeError.
    """
    # type(u'') is `unicode` on Python 2 and `str` on Python 3, so only
    # genuine text is re-encoded; byte strings are hashed untouched.
    if isinstance(s, type(u'')):
        s = s.encode('utf-8')
    return hashlib.sha1(s).hexdigest()
def depyc(fn):
    """
    Map the filename of a compiled module back to its source filename.

    A name ending in '.pyc' or '.pyo' (the latter is what __file__ points
    to when Python 2 runs with -O) has its last character dropped, giving
    the '.py' name. Any other name is returned unchanged.
    """
    return fn[:-1] if fn.endswith(('.pyc', '.pyo')) else fn
def touch(fn, times=None):
    """
    Create fn if it does not exist and set its access/modification times.

    times is passed straight to os.utime(): None means "now", otherwise an
    (atime, mtime) pair. Existing content is preserved because the file is
    opened in append mode.
    """
    handle = open(fn, 'a')
    try:
        os.utime(fn, times)
    finally:
        handle.close()
# The cache records every module that passed cleanly (produced no messages).
# For each such module, we create a file in the cache directory
# (DEFAULT_CACHE_DIR unless another directory was given). Its name is the
# sha1 of '\0'.join([cache_stamp, modname, filename, sha1]).
# cache_stamp is a function of the configuration (see get_cache_stamp()).
# modname is the name of the module which passed.
# filename is the module filename, and sha1 is the hash of its content.
# The content of each such file is a JSON object which includes cache_stamp,
# modname, filename and sha1, and in addition, 'dependencies'. This is a list
# of [modname, filename, sha1] entries, one for each dependency.
# The files are touched whenever a cache entry is hit, so older entries can
# be removed.
def get_cache_stamp(linter):
    """
    Return a string that changes whenever cached results must be discarded.

    The stamp is built from sys.executable, pylint.__version__, the sha1 of
    this script's own source and the sha1 of the configuration text written
    by linter.generate_config().
    """
    config_buf = cStringIO.StringIO()
    linter.generate_config(config_buf)
    config_hash = hexdigest(config_buf.getvalue())
    # __file__ may name the compiled file; depyc() points us at the source.
    # Read it in binary so the exact bytes are hashed, and close the handle
    # deterministically instead of leaving it to the garbage collector.
    with open(depyc(__file__), 'rb') as src_file:
        src_hash = hexdigest(src_file.read())
    return 'executable:{} pylint:{} cached_pylint:{} config:{}'.format(
        sys.executable, pylint.__version__, src_hash, config_hash)
def get_cache_entry_fn(cache_dir, cache_stamp, modname, fn, sha1):
    """
    Return the path of the cache entry for one (stamp, module, file, hash).

    The entry's file name is the sha1 of the four key fields joined with
    NUL characters; it lives directly inside cache_dir.
    """
    key_fields = (cache_stamp, modname, fn, sha1)
    entry_name = hexdigest('\0'.join(key_fields))
    return join(cache_dir, entry_name)
def get_file_sha1(fn, sha1_cache={}):  # pylint: disable=dangerous-default-value
    """
    Return the sha1 of the content of file fn, memoized per absolute path.

    The mutable default for sha1_cache is deliberate: it is the in-memory
    cache shared by all calls, so each file is read and hashed at most once
    per process. Pass your own dict to use a separate cache.

    Raises whatever open() raises if the file cannot be read.
    """
    fn = abspath(fn)
    try:
        return sha1_cache[fn]
    except KeyError:
        pass
    # Binary mode hashes the exact bytes on disk, and the with-block closes
    # the handle instead of leaking it until garbage collection.
    with open(fn, 'rb') as f:
        digest = hexdigest(f.read())
    sha1_cache[fn] = digest
    return digest
@contextmanager
def record_infer_depends(depends):
    """
    Temporarily replace NodeNG.infer with a version that records dependencies.

    depends must be a dict. While the context is active, every node yielded
    by inference adds the name of its root module to depends[<name of the
    root module of the node being inferred>], which is a set created on
    demand. EmptyNode instances and the YES marker are not recorded.
    The original NodeNG.infer is restored when the context exits.
    """
    original = NodeNG.infer
    # Sanity check that we are wrapping astroid's own implementation.
    assert original.im_func.func_globals['__name__'] == 'astroid.bases'  # pylint: disable=no-member

    def infer(self, context=None, **kwargs):
        if isinstance(self, EmptyNode):
            owner = None
        else:
            owner = self.root().name
        for inferred in original(self, context, **kwargs):
            if owner is not None and inferred is not YES:
                provider = inferred.root().name
                depends.setdefault(owner, set()).add(provider)
            yield inferred

    NodeNG.infer = infer
    try:
        yield
    finally:
        NodeNG.infer = original
def remove_init(modname):
    """
    Return modname without a trailing '.__init__', if it has one.

    Names that do not end in '.__init__' (including a bare '__init__') are
    returned unchanged.
    """
    marker = '.__init__'
    if not modname.endswith(marker):
        return modname
    return modname[:-len(marker)]
class CachedPyLinter(PyLinter):
    """
    A PyLinter that skips modules which already passed cleanly.

    After each check() run, a cache entry is written for every analyzed
    module that produced no enabled message. On later runs,
    should_analyze_file() skips a module when an entry exists for its
    current content and all recorded dependencies still have the same sha1.
    """

    def __init__(self, *args, **kwargs):
        """
        Accept the PyLinter arguments plus an optional cache_dir keyword
        (default DEFAULT_CACHE_DIR). The directory is created if missing.
        """
        self.cache_dir = kwargs.pop('cache_dir', DEFAULT_CACHE_DIR)
        try:
            os.makedirs(self.cache_dir)
        except OSError:
            # The directory already existing (possibly created by a
            # concurrent run between a check and the mkdir) is fine;
            # anything else is a real error.
            if not os.path.isdir(self.cache_dir):
                raise
        PyLinter.__init__(self, *args, **kwargs)
        self._cache_stamp = None

        # Used for tracking check operations; reset by every check() call.
        self.analyzed_modnames = None
        self.nopass_modnames = None
        self.dependencies = None

    def check(self, files_or_modules):
        """main checking entry: check a list of files or modules from their
        name, then record every module that passed in the cache.
        """
        self.analyzed_modnames = set()
        self.nopass_modnames = set()
        self.dependencies = {}
        with record_infer_depends(self.dependencies):
            PyLinter.check(self, files_or_modules)
        self.update_pass_cache()

    @property
    def cache_stamp(self):
        """The value of get_cache_stamp(self), computed on first access."""
        if self._cache_stamp is None:
            self._cache_stamp = get_cache_stamp(self)
        return self._cache_stamp

    def set_current_module(self, modname, filepath=None):
        """
        Record modname (without '.__init__') as analyzed, echo it to
        stdout, and delegate to PyLinter.set_current_module().
        """
        realmodname = remove_init(modname)
        self.analyzed_modnames.add(realmodname)
        print(realmodname)
        return PyLinter.set_current_module(self, modname, filepath)

    def add_message(self, msg_descr, line=None, node=None, args=None,
                    confidence=UNDEFINED):
        """
        Wrap PyLinter.add_message() in order to update nopass_modnames:
        a module that gets an enabled message did not pass.
        """
        msgid = self.msgs_store.check_message_id(msg_descr).msgid
        if line is None and node is not None:
            line = node.fromlineno
        if self.is_message_enabled(msgid, line, confidence):
            # The same test is done in PyLinter.add_message - we want to know
            # if a message is displayed
            modname = (self.current_name if node is None
                       else pylint.utils.get_module_and_frameid(node)[0])
            # Normalize exactly like set_current_module() does, so that the
            # set difference in update_pass_cache() compares like with like
            # and a failing package __init__ is never cached as passing.
            self.nopass_modnames.add(remove_init(modname))
        PyLinter.add_message(self, msg_descr, line, node, args, confidence)

    def should_analyze_file(self, modname, path):
        """
        Return False if the module at path has a valid cache entry (which is
        then touched); otherwise defer to PyLinter.should_analyze_file().
        """
        realmodname = remove_init(modname)
        path = abspath(path)
        cache_entry_fn = get_cache_entry_fn(
            self.cache_dir, self.cache_stamp, realmodname, path,
            get_file_sha1(path))
        if (exists(cache_entry_fn)
                and self._dependencies_unchanged(cache_entry_fn)):
            touch(cache_entry_fn)
            return False
        return PyLinter.should_analyze_file(self, modname, path)

    def _dependencies_unchanged(self, cache_entry_fn):
        """
        Return True if every dependency recorded in the cache entry still
        has the recorded sha1. An unreadable or malformed entry, or a
        dependency file that can no longer be read, counts as a cache miss
        rather than aborting the run.
        """
        try:
            with open(cache_entry_fn) as f:
                entry = json.load(f)
            return all(
                get_file_sha1(dep_fn) == dep_sha1
                for _depname, dep_fn, dep_sha1 in entry['dependencies'])
        except (IOError, OSError, ValueError, KeyError, TypeError):
            # IOError/OSError: entry or dependency file unreadable;
            # ValueError: invalid JSON or wrong number of fields;
            # KeyError/TypeError: entry does not have the expected shape.
            return False

    def get_imports_checker(self):
        """Return the registered ImportsChecker instance."""
        return next(checker for checker in self.get_checkers()
                    if isinstance(checker, ImportsChecker))

    def prepare_checkers(self):
        """
        Wrap PyLinter.prepare_checkers() to make sure ImportsChecker is
        enabled, since its statistics are needed to record dependencies.
        """
        neededcheckers = PyLinter.prepare_checkers(self)
        impchecker = self.get_imports_checker()
        if impchecker not in neededcheckers:
            neededcheckers.append(impchecker)
            neededcheckers.sort(key=lambda chk: chk.priority, reverse=True)
        return neededcheckers

    def update_dependencies_from_imports(self):
        """
        Update dependencies with imports. This is needed because if A imports
        something from B that B imported from C, record_infer_depends() will
        only record that A depends on C.
        """
        impchecker = self.get_imports_checker()
        import_stats = impchecker.stats['dependencies']
        for provider_mod, user_mods in import_stats.items():
            if provider_mod not in MANAGER.astroid_cache:
                # It seems that ImportsChecker also includes functions, eg.
                # "from tools import offset_slice_tuple"
                # So we just check if a module is in astroid_cache.
                continue
            for user_mod in user_mods:
                self.dependencies.setdefault(user_mod, set()).add(provider_mod)

    def update_pass_cache(self):
        """
        Write a cache entry for every module that was analyzed in the last
        check() and got no enabled message.
        """
        self.update_dependencies_from_imports()
        pass_modnames = self.analyzed_modnames - self.nopass_modnames
        for modname in pass_modnames:
            module = MANAGER.astroid_cache.get(modname)
            if module is None or module.file is None:
                # Without a known source file there is nothing to key the
                # entry on; skip it, just as dependencies are skipped below.
                continue
            d = OrderedDict()
            d['cache_stamp'] = self.cache_stamp
            d['modname'] = modname
            d['filename'] = filename = abspath(module.file)
            d['sha1'] = sha1 = get_file_sha1(filename)
            dependencies = []
            for depname in self.dependencies.get(modname, ()):
                if (depname == modname or depname == ''
                        or depname not in MANAGER.astroid_cache):
                    # I have seen the non-existent 'str' module. Whatever.
                    continue
                fn = MANAGER.astroid_cache[depname].file
                if fn is None:
                    # for built-in modules
                    continue
                fn = abspath(fn)
                dependencies.append((depname, fn, get_file_sha1(fn)))
            d['dependencies'] = dependencies
            cache_entry_fn = get_cache_entry_fn(
                self.cache_dir, self.cache_stamp, modname, filename, sha1)
            # Write to a temporary name and rename, so a partially written
            # entry is never visible under the final name.
            write_fn = cache_entry_fn + '.write'
            with open(write_fn, 'w') as f:
                json.dump(d, f, indent=2)
            try:
                os.rename(write_fn, cache_entry_fn)
            except OSError:
                # Windows refuses to rename over an existing (stale) entry;
                # drop the old one and retry. Any other failure re-raises.
                os.remove(cache_entry_fn)
                os.rename(write_fn, cache_entry_fn)
def main():
    """
    Run pylint on the command-line arguments using CachedPyLinter.

    Returns the linter's msg_status, suitable as a process exit code.
    """
    class CachedRun(Run):
        LinterClass = CachedPyLinter

    # exit=False keeps Run from terminating the process itself, so the
    # status can be returned to the caller.
    runner = CachedRun(sys.argv[1:], exit=False)
    return runner.linter.msg_status


if __name__ == '__main__':
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Just spotted this, wondering if it got any further?