-
-
Save hhhaiai/dd9b919495248a2dfc48705bd8f27c21 to your computer and use it in GitHub Desktop.
vx
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# -*- coding: UTF-8 -*- | |
import csv | |
# ANSI SGR escape sequences used to decorate terminal output.

# Foreground colours
RED = "\x1b[31m"
GREEN = "\x1b[32m"
YELLOW = "\x1b[33m"
BLUE = "\x1b[34m"
PURPLE = "\x1b[35m"
CYAN = "\x1b[36m"
WHITE = "\x1b[37m"

# Bright (high-intensity) foreground colours
H_RED = "\x1b[91m"
H_GREEN = "\x1b[92m"
H_YELLOW = "\x1b[93m"
H_BLUE = "\x1b[94m"
H_PURPLE = "\x1b[95m"
H_CYAN = "\x1b[96m"

# Background colours
BG_RED = "\x1b[41m"
BG_GREEN = "\x1b[42m"
BG_YELLOW = "\x1b[43m"
BG_BLUE = "\x1b[44m"
BG_PURPLE = "\x1b[45m"
BG_CYAN = "\x1b[46m"
BG_WHITE = "\x1b[47m"
BG_BLACK = "\x1b[40m"

# Text styles
BOLD = "\x1b[1m"
ITALIC = "\x1b[3m"
UNDERLINE = "\x1b[4m"

# Reset: switches every colour and style off again
DEFAULT = "\x1b[0m"
def wrap_text(text, color="", bg_color="", bold=False, italic=False):
    """Wrap *text* in ANSI escape codes and terminate it with a reset.

    Parameters
    ----------
    text : str
        Text to decorate. Empty or ``None`` input is returned unchanged so
        that no stray escape codes are emitted for "nothing".
    color : str
        Foreground colour escape sequence (e.g. ``RED``); empty for none.
    bg_color : str
        Background colour escape sequence (e.g. ``BG_RED``); empty for none.
    bold : bool
        Prepend the ``BOLD`` style code when true.
    italic : bool
        Prepend the ``ITALIC`` style code when true.

    Returns
    -------
    str
        ``bg_color + styles + color + text + DEFAULT``, or *text* itself when
        it is empty/``None``.
    """
    # A truthiness test replaces the redundant ``== ''``/``len()`` pair and
    # no longer raises TypeError for None or other unsized values.
    if not text:
        return text
    style = (BOLD if bold else "") + (ITALIC if italic else "")
    return f"{bg_color}{style}{color}{text}{DEFAULT}"
def contain_list_by_index(file_path="", super_index=0, sub_index=0):
    """Check, row by row, that one CSV column's item list contains another's.

    Both cells are read as comma-separated lists (items are stripped and
    empty items ignored). For every data row in which the items of the
    ``sub_index`` column are not all present in the ``super_index`` column, a
    highlighted message carrying the line number in the file is printed.
    Nothing is returned.

    Parameters
    ----------
    file_path : str
        Path of the CSV file to check; its first row is the header.
    super_index : int
        Index of the column that is expected to contain the other one.
    sub_index : int
        Index of the column that is expected to be contained.
    """
    if file_path == "":
        print(f'传入的文件路径为空!')
        return
    if sub_index == super_index:
        print("判断的包含和被不包含为同一个")
        return
    # newline="" is what the csv module asks for so quoted line breaks survive.
    with open(file_path, newline="") as f:
        reader = csv.reader(f)
        # The first row is the header; an empty file has none.
        header_row = next(reader, None)
        if not header_row:
            print('文件为空,没有标题行!')
            return
        try:
            super_title = header_row[super_index]
            sub_title = header_row[sub_index]
        except IndexError:
            print(f'列序号超出范围,标题行只有{len(header_row)}列!')
            return
        logHeader = wrap_text("-----------------------------------------------------------", color=BLUE)
        print(f'{logHeader}'
              f'\r\n{wrap_text("判断文件:", color=BLUE, bold=True)} {wrap_text(f"{file_path}", color=BLUE)}'
              f'\r\n{wrap_text("判断条件:", color=BLUE, bold=True)} {wrap_text(f"【{super_title}】是否包含【{sub_title}】", color=BLUE)}'
              f'\r\n{logHeader}')
        for row in reader:
            if not row:
                # csv.reader yields [] for blank lines; nothing to compare.
                continue
            # reader.line_num is the real line in the file (header included),
            # unlike a 0-based enumerate() over the data rows.
            line_number = reader.line_num
            try:
                super_cell = row[super_index]
                sub_cell = row[sub_index]
            except IndexError:
                print(wrap_text(f'文件{line_number}行, 列数不足,无法判断!', color=H_RED))
                continue
            # Dropping "" keeps an empty cell from becoming the bogus item {""}.
            super_items = {part.strip() for part in super_cell.split(",")} - {""}
            sub_items = {part.strip() for part in sub_cell.split(",")} - {""}
            if not sub_items.issubset(super_items):
                print(wrap_text(
                    f'文件{line_number}行, {super_title}不包含{sub_title},请检查!',
                    color=H_RED))
def contain_list_by_title(file_path, father_set_name="", sub_set_name=""):
    """Check, row by row, that one titled CSV column contains another's items.

    The two columns are located by their header titles. Both cells are read
    as comma-separated lists (items are stripped and empty items ignored).
    For every data row in which the items of the ``sub_set_name`` column are
    not all present in the ``father_set_name`` column, a highlighted message
    carrying the line number in the file is printed. Nothing is returned.

    Parameters
    ----------
    file_path : str
        Path of the CSV file to check; its first row is the header.
    father_set_name : str
        Header title of the column that is expected to contain the other one.
    sub_set_name : str
        Header title of the column that is expected to be contained.
    """
    if not file_path:
        print(f'传入的文件路径为空!')
        return
    if not father_set_name or not sub_set_name:
        print(f'出入匹配的表头信息!')
        return
    if father_set_name == sub_set_name:
        # Comparing a column with itself is meaningless and would otherwise
        # leave the "sub" column unresolved below.
        print("判断的包含和被不包含为同一个")
        return
    # newline="" is what the csv module asks for so quoted line breaks survive.
    with open(file_path, newline="") as f:
        reader = csv.reader(f)
        # The first row is the header; an empty file has none.
        header_row = next(reader, None)
        if not header_row:
            print('文件为空,没有标题行!')
            return
        # Resolve both titles to column indexes, echoing the header as we go.
        super_index = None
        sub_index = None
        for column, title in enumerate(header_row):
            print(f"{column}-->{title}")
            if title == father_set_name:
                super_index = column
            elif title == sub_set_name:
                sub_index = column
        if super_index is None or sub_index is None:
            # Previously this fell through to an UnboundLocalError.
            print(f'标题行中找不到【{father_set_name}】或【{sub_set_name}】!')
            return
        logHeader = wrap_text("-----------------------------------------------------------", color=BLUE)
        print(f'{logHeader}'
              f'\r\n{wrap_text("判断文件:", color=BLUE, bold=True)} {wrap_text(f"{file_path}", color=BLUE)}'
              f'\r\n{wrap_text("判断条件:", color=BLUE, bold=True)} {wrap_text(f"【{header_row[super_index]}】是否包含【{header_row[sub_index]}】", color=BLUE)}'
              f'\r\n{logHeader}')
        for row in reader:
            if not row:
                # csv.reader yields [] for blank lines; nothing to compare.
                continue
            # reader.line_num is the real line in the file (header included),
            # unlike a 0-based enumerate() over the data rows.
            line_number = reader.line_num
            if len(row) <= max(super_index, sub_index):
                print(wrap_text(f'文件{line_number}行, 列数不足,无法判断!', color=H_RED))
                continue
            # Dropping "" keeps an empty cell from becoming the bogus item {""}.
            super_items = {part.strip() for part in row[super_index].split(",")} - {""}
            sub_items = {part.strip() for part in row[sub_index].split(",")} - {""}
            if not sub_items.issubset(super_items):
                print(wrap_text(
                    f'文件{line_number}行, {header_row[super_index]}不包含{header_row[sub_index]},请检查!',
                    color=H_RED))
def _tags_in_cell(cell):
    """Split a comma-separated cell into a set of stripped, non-empty tags."""
    return {tag.strip() for tag in cell.split(",")} - {""}


def _data_rows(path):
    """Yield ``(file_line_number, row)`` for each non-blank row after the header."""
    # newline="" is what the csv module asks for so quoted line breaks survive.
    with open(path, newline="") as f:
        reader = csv.reader(f)
        next(reader, None)  # skip the header; tolerate an empty file
        for row in reader:
            if row:
                yield reader.line_num, row


def once_check(two_path, one_path, three_path):
    """Print the IDs of file two whose tags are not all covered by files one and three.

    File two maps an ID (column 0) to a comma-separated tag list (column 1).
    The tags listed for the same ID in file one (column 1, comma-separated)
    and in file three (columns 1-6, one tag per cell) are removed from that
    list. Every ID that still has tags left afterwards is printed. Duplicate
    IDs in file two, and IDs in files one/three that file two lacks, are
    reported with a coloured message. Nothing is returned.

    Parameters
    ----------
    two_path : str
        CSV path of file two (the reference: ID -> tag list).
    one_path : str
        CSV path of file one (ID, comma-separated tags).
    three_path : str
        CSV path of file three (ID followed by up to six single-tag columns).
    """
    # ID -> tags of file two not yet accounted for by files one and three.
    remaining = {}

    for line_number, row in _data_rows(two_path):
        row_id = row[0]
        if row_id in remaining:
            print(wrap_text(f"【{two_path}】{line_number}行的ID:{row_id} 重复!", color=RED))
            continue
        # A single tag must stay one element: set("abc") would give {'a','b','c'}.
        remaining[row_id] = _tags_in_cell(row[1] if len(row) > 1 else "")

    for line_number, row in _data_rows(one_path):
        row_id = row[0]
        if row_id not in remaining:
            print(wrap_text(f"【{one_path}】{line_number}行的ID:{row_id} 表一数据,表二没有!!!", color=H_RED))
            continue
        # difference_update ignores tags file two never had (remove() raised KeyError).
        remaining[row_id].difference_update(_tags_in_cell(row[1] if len(row) > 1 else ""))

    for line_number, row in _data_rows(three_path):
        row_id = row[0]
        if row_id not in remaining:
            print(wrap_text(f"【{three_path}】{line_number}行的ID:{row_id} 表三的数据,表二没有!!!", color=H_RED))
            continue
        # Slicing tolerates rows with fewer than six tag columns; empty cells
        # are harmless because "" is never stored as a tag.
        remaining[row_id].difference_update(cell.strip() for cell in row[1:7])

    # Whatever is left was listed in file two but covered by neither file.
    for row_id, tags in remaining.items():
        if tags:
            print(row_id)
if __name__ == "__main__":
    # Script entry point: run both checks against the hard-coded sample inputs.
    data_csv = '/Users/sanbo/Desktop/data.csv'
    contain_list_by_title(data_csv, father_set_name="标签", sub_set_name="产品")
    once_check("file2", "file1", "file3")
Author
hhhaiai
commented
Sep 1, 2023
•
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment