Skip to content

Instantly share code, notes, and snippets.

@weixinfree
Created June 8, 2021 05:36
Show Gist options
  • Select an option

  • Save weixinfree/39d76f1ee71bfdea6c158bbd3d6a870e to your computer and use it in GitHub Desktop.

Select an option

Save weixinfree/39d76f1ee71bfdea6c158bbd3d6a870e to your computer and use it in GitHub Desktop.
自动注入
#! /usr/bin/env python3
"""
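Learn `inject(...)` / `injectOptional(...)` field assignments from
*Presenter.java files under a source tree, then suggest the most likely
inject statement for a (partial) type name.

install:
pip3 install fire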
"""
import re
import fire
import os
from pathlib import Path
from multiprocessing.dummy import Pool as ThreadPool
import multiprocessing as mp
from pprint import pprint
from typing import NamedTuple, List, Iterable
import pickle
import time
from collections import defaultdict
from collections import Counter
import math
# Pickle file holding the learned inject items: written by train(), read by _load().
# The path is relative, so it resolves against the current working directory.
TRAIN_FILE = Path("./.dm_inject")
def _field_name(name: str):
return 'm' + name
class InjectItem(NamedTuple):
    """One learned injection, rendered by ``str()`` as a field assignment statement."""

    # Name of the injecting call; _parse() only produces "inject" or "injectOptional".
    ij_type: str
    # Word found directly before the field name in the source (taken as its type).
    j_type: str
    # Raw text between the parentheses of the injecting call.
    ij_key: str

    def __str__(self):
        # The field name is derived from the type, not from the original source.
        field = _field_name(self.j_type)
        return f"{field} = {self.ij_type}({self.ij_key});"
def _parse(file: Path) -> List[InjectItem]:
    """Collect the inject assignments found in one source file.

    Every ``<field> = inject(<key>);`` or ``<field> = injectOptional(<key>);``
    assignment is paired with the word that directly precedes the first
    whole-word occurrence of ``<field>`` in the file; that word is recorded
    as the field's type.

    Args:
        file: Source file to scan; it is read as UTF-8.

    Returns:
        One InjectItem per assignment whose field has a preceding word.
        Assignments for which no such word is found are skipped.
    """
    content = file.read_text("utf-8")
    result: List[InjectItem] = []
    assignments = re.findall(
        r'(\w+)\s*=\s*(inject|injectOptional)\((.*?)\);', content)
    for name, ij_type, unique_key in assignments:
        # The trailing \b stops a field such as ``mFoo`` from matching inside
        # ``mFooBar`` and picking up the wrong type.
        declaration = re.search(rf'(\w+)\s+{re.escape(name)}\b', content)
        if declaration is None:
            # Nothing precedes the field name anywhere in this file; skip it.
            continue
        result.append(InjectItem(ij_type, declaration.group(1), unique_key))
    return result
class _Report:
def __init__(self):
self.index = 0
def rstart(self, desc: str):
self.index += 1
print(f'>>> {self.index}. {desc}')
def rstop(self, desc: str):
print(f'<<< {self.index}. {desc}')
def train(root: str):
    """Scan a source tree for inject statements and persist their frequencies.

    Every ``*Presenter.java`` file below *root* is parsed with ``_parse`` on a
    thread pool. The resulting InjectItem values are counted and the Counter
    is pickled to ``TRAIN_FILE``. Progress is printed as numbered steps.

    Args:
        root: Directory that is searched recursively.
    """
    log = _Report()

    # 1. find presenter
    log.rstart("enumrating Presenter files.")
    java_files = list(Path(root).glob("**/*Presenter.java"))
    log.rstop(f"enumrating done. total {len(java_files)} Presenter files")

    # 2. parse
    log.rstart('start learning...')
    # perf_counter is monotonic, so the elapsed time cannot be skewed by
    # wall-clock adjustments.
    start = time.perf_counter()
    with ThreadPool(mp.cpu_count() * 2 + 1) as pool:
        items = pool.map(_parse, java_files)
    cost = time.perf_counter() - start
    counter = Counter(ij for found in items for ij in found)
    log.rstop(f'learing end. cost {cost:.2f} sec, learn {len(counter)} unique inject items.')

    # 3. save -- bracketed by its own start/stop pair so the completion message
    # is reported as the end of a step rather than the start of one.
    log.rstart('saving learn result to disk.')
    with open(TRAIN_FILE, mode='wb') as f:
        pickle.dump(counter, f)
    log.rstop('save learn result to disk finish.')
def _load() -> Counter:
    """Load the learned inject items from ``TRAIN_FILE``.

    Returns:
        The object pickled by ``train``: a Counter mapping each InjectItem to
        the number of times it was seen.

    Raises:
        FileNotFoundError: ``TRAIN_FILE`` does not exist, i.e. ``train`` has
            not been run from this working directory yet.
    """
    try:
        with open(TRAIN_FILE, mode='rb') as f:
            # NOTE(security): pickle.load can execute arbitrary code. This is
            # only safe because the file is produced locally by train(); never
            # point TRAIN_FILE at data from an untrusted source.
            return pickle.load(f)
    except FileNotFoundError as err:
        raise FileNotFoundError(
            err.errno,
            "train result not found; run the 'train' command first",
            str(TRAIN_FILE),
        ) from err
def _match(left: str, right: str, verbose: bool) -> int:
li, ri = 0, 0
ll = len(left)
rl = len(right)
while li < ll and ri < rl:
if ri == rl - 1:
break
if left[li] == right[ri]:
li += 1
ri += 1
else:
li += 1
score = math.ceil((ri / (1 + rl - ri)) ** 5)
if verbose:
print(f"score: {left} vs {right} = {score}; ({li}-{ll}), ({ri}-{rl})")
return score
def inject(name: str, verbose: bool = False):
    """Print the five learned inject statements that best match *name*.

    The query is lower-cased and its whitespace removed, then compared with
    the lower-cased type of every learned item. Each match score is weighted
    by how often the item was seen during training.

    Example invocations (presumably ``ij`` is a shell alias for this script):
        ij play context
        ij router service

    Args:
        name: Query text; may contain spaces.
        verbose: Forwarded to ``_match`` to print per-item scoring details.
    """
    query = ''.join(map(str.lower, name.split()))
    learned = _load()

    ranked = []
    for item, count in learned.items():
        weighted = _match(item.j_type.lower(), query, verbose) * count
        ranked.append((weighted, item))

    # Highest weighted score first; ties fall back to comparing the items.
    ranked = sorted(ranked, reverse=True)
    _print_candidates(ranked[:5])
def _print_candidates(candidates: List):
base = candidates[0][0]
for item in candidates:
score, ij = item
print(f"{score / base * 100:3.0f}% >>> {ij}")
def main(*names: str):
    """Command-line entry point.

    Args:
        *names: Words given on the command line. The single word ``train``
            prompts for a source root and runs ``train`` on it; anything else
            is joined with spaces and looked up with ``inject``.

    With no words at all there is nothing to look up, so a usage hint is
    printed instead of running a lookup with an empty query.
    """
    if not names:
        print("usage: pass 'train' to learn inject statements from a source "
              "tree, or (part of) a type name to look up its inject statement")
        return
    # The command-line layer may hand over non-string values (e.g. a bare
    # number), so convert each word before joining.
    name = ' '.join(map(str, names))
    if name == 'train':
        path = input('>>> please input source code root dir: ')
        train(path.strip())
    else:
        inject(name, verbose=False)
# Script entry point: hand main() to python-fire so it is driven from the command line.
if __name__ == '__main__':
    # fire is already imported at the top of the file; this local import is
    # redundant but harmless.
    import fire
    fire.Fire(main)
@weixinfree
Copy link
Copy Markdown
Author

自动查找inject

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment