Skip to content

Instantly share code, notes, and snippets.

@zjjott
Created January 24, 2017 02:51
Show Gist options
  • Save zjjott/f7fa0e87b0af92317de66a144f164af3 to your computer and use it in GitHub Desktop.
Save zjjott/f7fa0e87b0af92317de66a144f164af3 to your computer and use it in GitHub Desktop.
Convert the git HEAD blame of every file into database rows, for statistics.
# coding=utf-8
"""
convert git HEAD blame of every file to database
using:
$ python repo_blame.py --help
$ python repo_blame.py --uri mysql://USER:PASSWD@HOST/DATABASE --repo="/home/zhutou/work/git/linux"
# requirements:
pip install peewee==2.8.5 GitPython==2.1.1
# using mysql:
pip install mysqlclient  # PyPI package that provides the MySQLdb module
"""
from __future__ import unicode_literals, print_function
from argparse import ArgumentParser
from peewee import (Model, CharField,
Proxy,
DateTimeField, IntegerField)
from playhouse.db_url import connect
from datetime import datetime
from os.path import join
import os
from git import Repo
from binascii import b2a_hex
import re
from multiprocessing import Pool
# Placeholder database object: the Blame model is bound to it at import time and
# Scanner.prepare_model() initializes it with the real connection later.
proxy = Proxy()
# Matches the repr of a two-argument xrange, e.g. "xrange(3, 7)", and captures
# the two bounds as the named groups "start" and "end".
xrange_re = re.compile(r"xrange\((?P<start>\d+), (?P<end>\d+)\)")
class Blame(Model):
    """One blame entry: a contiguous run of lines attributed to one commit."""

    # Path of the file as reported by the blame entry.
    filename = CharField()
    # Commit timestamp of the commit the lines are attributed to.
    updated_at = DateTimeField()
    # First and last line number (both inclusive) covered by this entry.
    lineno_start = IntegerField()
    lineno_end = IntegerField()
    # E-mail address of the commit author.
    author = CharField()
    # Abbreviated commit hash: the first 8 hexadecimal characters.
    commit_id = CharField(max_length=8)

    class Meta:
        # Bound through the module-level proxy so the concrete database can be
        # chosen at runtime.
        database = proxy
def _blame_rows(repo, filepath):
    """Return one ``Blame`` row dict per blame entry of ``filepath`` at HEAD."""
    rows = []
    for entry in repo.blame_incremental("HEAD", filepath):
        linenos = entry.linenos
        count = len(linenos)
        if count == 0:
            # Nothing to attribute; avoid indexing an empty range.
            continue
        commit = entry.commit
        first_line = linenos[0]
        rows.append({
            "filename": entry.orig_path,
            "updated_at": datetime.fromtimestamp(commit.committed_date),
            "lineno_start": first_line,
            # Inclusive end; the range object itself is half-open.
            "lineno_end": first_line + count - 1,
            "author": commit.author.email,
            # b2a_hex returns bytes on Python 3; store text either way.
            "commit_id": b2a_hex(commit.binsha)[:8].decode("ascii"),
        })
    return rows


def get_blame(repo, filepath):
    """Blame ``filepath`` at HEAD and insert the result into the ``Blame`` table.

    The line range is read directly from ``entry.linenos`` (indexing and
    ``len``) instead of regex-parsing its ``repr``, so it works with both the
    Python 2 ``xrange`` and the Python 3 ``range`` objects.

    Args:
        repo: object exposing ``blame_incremental(rev, path)`` (a GitPython
            ``Repo`` in this script).
        filepath: path of the file to blame.

    Returns:
        None. Rows are written with a single ``insert_many``; nothing is
        written when the blame yields no entries (e.g. an empty file).
    """
    rows = _blame_rows(repo, filepath)
    # Log the path that was requested, not the last entry's original path.
    print("blame done", filepath)
    if rows:
        Blame.insert_many(rows).execute()
class Scanner(object):
    """Walk a git working tree and store the HEAD blame of its files."""

    def print_usage(self, args):
        """Print the module docstring and the tables found in ``args.uri``."""
        print(__doc__)
        db = connect(args.uri)
        print("create tables:", db.get_tables())

    def prepare_model(self, args):
        """Connect to ``args.uri``, bind the model proxy and ensure the table.

        Returns:
            The connected database object.
        """
        db = connect(args.uri)
        proxy.initialize(db)
        if "blame" not in db.get_tables():
            db.create_tables([Blame])
        return db

    def save_callback(self, result):
        """Print a confirmation line for ``result``."""
        print("save done", result)

    def append_record(self, attr, enforce=False):
        """Buffer ``attr`` and flush the buffer to the database when it is full.

        Args:
            attr: row dict to buffer; ignored when ``enforce`` is true.
            enforce: flush whatever is buffered without adding ``attr``.

        The buffer is flushed once it holds more than 100 rows, or on
        ``enforce``. An empty buffer is never flushed, so a forced flush with
        nothing pending does not touch the database.
        """
        if not enforce:
            self.attr_list.append(attr)
        if not (enforce or len(self.attr_list) > 100):
            return
        if not self.attr_list:
            return
        with self.db.atomic():
            print("save to database", len(self.attr_list))
            Blame.insert_many(self.attr_list).execute()
        self.attr_list = []

    def main(self, args):
        """Blame every eligible file below ``args.repo`` using a worker pool.

        Args:
            args: namespace with ``uri`` (database URL), ``repo`` (working
                tree root) and ``n`` (number of worker processes).

        Raises:
            ValueError: if ``args.repo`` is missing or empty.
        """
        if not args.repo:
            raise ValueError("a repository path is required (--repo)")
        self.db = self.prepare_model(args)
        repo = Repo(args.repo)
        executer = Pool(args.n)
        self.attr_list = []
        pending = []
        for root, dirs, files in os.walk(args.repo):
            # Prune the git metadata directory by exact name. A substring test
            # on the whole path would also drop ".github" and would skip every
            # file when the repository path itself contains ".git".
            dirs[:] = [d for d in dirs if d != ".git"]
            for name in files:
                # Names without any lower-case letter (README, LICENSE, ...)
                # are skipped.
                if name.upper() == name:
                    continue
                filepath = join(root, name)
                pending.append(
                    (filepath,
                     executer.apply_async(get_blame, [repo, filepath])))
        print("task sub finish,wait worker finish")
        executer.close()
        executer.join()
        # Exceptions raised in workers only surface through AsyncResult.get();
        # report them instead of losing them silently.
        for filepath, result in pending:
            try:
                result.get()
            except Exception as exc:
                print("blame failed", filepath, exc)
if __name__ == '__main__':
    # Command-line entry point: parse the options, then either print the
    # usage/requirements text (-p) or scan the repository given by --repo.
    parser = ArgumentParser()
    parser.add_argument("--uri",
                        metavar="DATABASE_URI",
                        default="sqlite:///repo.db",
                        help="""Database url,example: \n
sqlite:///my_database.db will create a SqliteDatabase instance for the file my_database.db in the current directory.\n
sqlite:///:memory: will create an in-memory SqliteDatabase instance.\n
postgresql://postgres:my_password@localhost:5432/my_database will create a PostgresqlDatabase instance. A username and password are provided, as well as the host and port to connect to.\n
mysql://user:passwd@ip:port/my_db will create a MySQLDatabase instance for the local MySQL database my_db.""")
    parser.add_argument("-p", action="store_true", help="print requirements")
    parser.add_argument("-n",
                        default=1,
                        type=int,
                        help="multiprocess process blame. WARNING: not support sqlite database.")
    parser.add_argument("--repo",
                        metavar="REPO_PATH",
                        help="repo root directory")
    args = parser.parse_args()
    # --repo is only optional for -p; fail with a usage error rather than a
    # traceback from deep inside the scan.
    if not args.p and not args.repo:
        parser.error("--repo is required unless -p is given")
    scan = Scanner()
    if args.p:
        scan.print_usage(args)
    else:
        scan.main(args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment