Created
June 8, 2021 05:36
-
-
Save weixinfree/39d76f1ee71bfdea6c158bbd3d6a870e to your computer and use it in GitHub Desktop.
自动注入
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #! /usr/bin/env python3 | |
| """ | |
| install: | |
| pip3 install fire | |
| """ | |
| import re | |
| import fire | |
| import os | |
| from pathlib import Path | |
| from multiprocessing.dummy import Pool as ThreadPool | |
| import multiprocessing as mp | |
| from pprint import pprint | |
| from typing import NamedTuple, List, Iterable | |
| import pickle | |
| import time | |
| from collections import defaultdict | |
| from collections import Counter | |
| import math | |
| TRAIN_FILE = Path("./.dm_inject") | |
| def _field_name(name: str): | |
| return 'm' + name | |
| class InjectItem(NamedTuple): | |
| ij_type: str | |
| j_type: str | |
| ij_key: str | |
| def __str__(self): | |
| return f"{_field_name(self.j_type)} = {self.ij_type}({self.ij_key});" | |
| def _parse(file: Path) -> List[InjectItem]: | |
| content = file.read_text("utf-8") | |
| result: List[InjectItem] = [] | |
| add_ij = result.append | |
| injects = re.findall( | |
| r'(\w+)\s*=\s*(inject|injectOptional)\((.*?)\);', content) | |
| for item in injects: | |
| name, ij_type, unique_key = item | |
| try: | |
| _type = re.search(rf'(\w+)\s+{name}', content).group(1) | |
| add_ij(InjectItem(ij_type, _type, unique_key)) | |
| except Exception as e: | |
| continue | |
| return result | |
| class _Report: | |
| def __init__(self): | |
| self.index = 0 | |
| def rstart(self, desc: str): | |
| self.index += 1 | |
| print(f'>>> {self.index}. {desc}') | |
| def rstop(self, desc: str): | |
| print(f'<<< {self.index}. {desc}') | |
| def train(root: str): | |
| # 1. find presenter | |
| log = _Report() | |
| log.rstart("enumrating Presenter files.") | |
| r = Path(root) | |
| java_files = list(r.glob("**/*Presenter.java")) | |
| log.rstop(f"enumrating done. total {len(java_files)} Presenter files") | |
| log.rstart('start learning...') | |
| start = time.time() | |
| # 2. parse | |
| with ThreadPool(mp.cpu_count() * 2 + 1) as pool: | |
| items = pool.map(_parse, list(java_files)) | |
| cost = time.time() - start | |
| counter = Counter([ij for item in items for ij in item]) | |
| log.rstop(f'learing end. cost {cost:.2f} sec, learn {len(counter)} unique inject items.') | |
| # 3. save | |
| with open(TRAIN_FILE, mode='wb') as f: | |
| pickle.dump(counter, f) | |
| log.rstart('save learn result to disk finish.') | |
| def _load() -> List[InjectItem]: | |
| with open(TRAIN_FILE, mode='rb') as f: | |
| return pickle.load(f) | |
| def _match(left: str, right: str, verbose: bool) -> int: | |
| li, ri = 0, 0 | |
| ll = len(left) | |
| rl = len(right) | |
| while li < ll and ri < rl: | |
| if ri == rl - 1: | |
| break | |
| if left[li] == right[ri]: | |
| li += 1 | |
| ri += 1 | |
| else: | |
| li += 1 | |
| score = math.ceil((ri / (1 + rl - ri)) ** 5) | |
| if verbose: | |
| print(f"score: {left} vs {right} = {score}; ({li}-{ll}), ({ri}-{rl})") | |
| return score | |
| def inject(name: str, verbose: bool = False): | |
| name = ''.join(part.lower() for part in name.split()) | |
| ij_items = _load() | |
| """ | |
| ij play context | |
| ij router service | |
| """ | |
| def _score(item: InjectItem, name: str, count): | |
| left = item.j_type.lower() | |
| return _match(left, name, verbose) * count | |
| li = [(_score(item, name, count), item) | |
| for item, count in ij_items.items()] | |
| li.sort(reverse=True) | |
| _print_candidates(li[:5]) | |
| def _print_candidates(candidates: List): | |
| base = candidates[0][0] | |
| for item in candidates: | |
| score, ij = item | |
| print(f"{score / base * 100:3.0f}% >>> {ij}") | |
| def main(*names: List[str]): | |
| name = ' '.join(names) | |
| if name == 'train': | |
| path = input('>>> please input source code root dir: ') | |
| train(path.strip()) | |
| else: | |
| inject(name, verbose=False) | |
| if __name__ == '__main__': | |
| import fire | |
| fire.Fire(main) |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
自动查找inject