An example knowledge base for a RAG-like project that uses ManticoreSearch CALL SNIPPETS together with search results from DuckDuckGo.
import os
import re
import requests
import hashlib
import mysql.connector
from duckduckgo_search import DDGS
from concurrent.futures import ThreadPoolExecutor


class HtmlExtractor:
    def __init__(self):
        # Blocks containing any of these substrings are treated as noise and dropped
        self.filters = ["подпис", "facebook", "vk.com", "twitter"]

    def extract_content(self, html):
        # Keep only top-level content tags
        pattern = r'<(p|pre|blockquote|h[1-6]|article)(.*?)>(.*?)<\/\1>'
        matches = re.findall(pattern, html, re.DOTALL)
        filtered_matches = [
            (tag, attrs, content) for tag, attrs, content in matches
            if not any(substring.lower() in content.lower() for substring in self.filters)
        ]
        return ''.join(f'<{tag}{attrs}>{content}</{tag}>' for tag, attrs, content in filtered_matches)


class WebScraper:
    def __init__(self, cache_dir='cache'):
        self.cache_dir = cache_dir
        os.makedirs(cache_dir, exist_ok=True)

    def hash_url(self, url):
        return hashlib.md5(url.encode()).hexdigest()

    def fetch_html(self, url, callback):
        link_hash = self.hash_url(url)
        file_path = os.path.join(self.cache_dir, link_hash)
        if os.path.exists(file_path):
            # Cache hit: the first line of the file is the URL, the rest is the HTML
            with open(file_path, 'r', encoding='utf-8') as file:
                html_content = file.read().split('\n', 1)[1]
            callback(html_content)
            return file_path
        try:
            response = requests.get(
                url,
                headers={'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 YaBrowser/24.7.0.0 Safari/537.36"},
                timeout=(5, 10)
            )
            response.raise_for_status()
            html_content = response.text
            # Strip bulky non-content tags before caching
            html_content = re.sub(r'<symbol.*?>.*?</symbol>', '', html_content, flags=re.DOTALL)
            html_content = re.sub(r'<style.*?>.*?</style>', '', html_content, flags=re.DOTALL)
            html_content = re.sub(r'<script.*?>.*?</script>', '', html_content, flags=re.DOTALL)
            html_content = re.sub(r'<path.*?>.*?</path>', '', html_content, flags=re.DOTALL)
            with open(file_path, 'w', encoding='utf-8') as file:
                file.write(f"{url}\n{html_content}")
            print(f'Saved: {file_path}')
        except Exception as e:
            # Cache the failure (URL only) so the same link is not retried on the next run
            with open(file_path, 'w', encoding='utf-8') as file:
                file.write(f"{url}\n")
            print(f'Failed to fetch {url}: {e}')
            return None
        callback(html_content)
        return file_path

    def process_urls(self, urls, callback):
        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = [executor.submit(self.fetch_html, url, callback) for url in urls]
            cached_files = [path for path in (future.result() for future in futures) if path]
        return cached_files


class SearchEngine:
    def __init__(self, host='localhost', user='root', password='', database='test'):
        self.connection = mysql.connector.connect(
            host=host,
            user=user,
            password=password,
            database=database
        )
        self.cursor = self.connection.cursor()

    def call_snippets(self, file_paths, index_name, query):
        # The cached documents are passed as file paths; 1 AS load_files makes
        # Manticore read them from disk instead of highlighting the path strings
        files_str = ','.join(f"'{path}'" for path in file_paths)
        self.cursor.execute(f"""CALL SNIPPETS(({files_str}), '{index_name}', '{query}',
            1 AS load_files,
            1 AS weight_order,
            'strip' AS html_strip_mode,
            '' AS before_match,
            '' AS after_match,
            '|' AS snippet_separator,
            'zone' AS snippet_boundary,
            150 AS around,
            30000 AS limit)""")
        return self.cursor.fetchall()

    def close(self):
        self.cursor.close()
        self.connection.close()


def process_html(html_content, extractor):
    cleaned_content = extractor.extract_content(html_content)
    print(f'Processed HTML: {cleaned_content}')


def main():
    keyword = 'your_keyword_here'
    cache_dir = 'cache'
    db_host = 'localhost'
    db_user = 'root'
    db_password = ''
    db_database = 'test'

    ddg = DDGS()
    results = ddg.text(keyword, max_results=10)
    urls = [result['href'] for result in results]

    extractor = HtmlExtractor()
    scraper = WebScraper(cache_dir)
    search_engine = SearchEngine(db_host, db_user, db_password, db_database)
    try:
        cached_files = scraper.process_urls(urls, lambda html: process_html(html, extractor))
        snippets = search_engine.call_snippets(cached_files, 'my_index', keyword)
        print("SNIPPETS:", snippets)
        # Save each snippet row to its own .snipp file
        for i, snippet in enumerate(snippets):
            rag = "".join(snippet) + "\n"
            with open(f'{i}.snipp', 'w', encoding='utf-8') as snippfile:
                snippfile.write(rag)
    finally:
        search_engine.close()


if __name__ == '__main__':
    main()
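The script assumes a Manticore table named my_index already exists and that the daemon is reachable over its MySQL protocol (port 9306 by default). Below is a minimal sketch of creating such a table; the column names and the morphology/html_strip options are illustrative assumptions, not taken from the gist.

# A minimal sketch, assuming Manticore listens on 127.0.0.1:9306.
# Column names and table options are illustrative only.
import mysql.connector

connection = mysql.connector.connect(host='127.0.0.1', port=9306)
cursor = connection.cursor()
# html_strip removes markup at index time, morphology enables RU/EN stemming
cursor.execute(
    "CREATE TABLE IF NOT EXISTS my_index(title TEXT, content TEXT) "
    "morphology='stem_enru' html_strip='1'"
)
cursor.close()
connection.close()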
A script-style variant of the same pipeline:
import os
import time
import re
import requests
import html
import hashlib
import mysql.connector
from duckduckgo_search import DDGS


def extract_content(html_text):
    # Regular expression for the top-level content tags we want to keep
    # pattern = r'<(p|table|a|ul|ol|code|pre|blockquote|h[1-6]|article)(.*?)>(.*?)<\/\1>'
    pattern = r'<(p|pre|blockquote|h[1-6]|article)(.*?)>(.*?)<\/\1>'
    matches = re.findall(pattern, html_text, re.DOTALL)  # find all matches
    # Drop matches that contain any of these substrings (signatures, social widgets)
    filters = ["подпис", "facebook", "vk.com", "twitter"]
    filtered_matches = [
        (tag, attrs, content) for tag, attrs, content in matches
        if not any(substring in content for substring in filters)
    ]
    # Join the surviving blocks back into a single string
    return ''.join(f'<{tag}{attrs}>{content}</{tag}>' for tag, attrs, content in filtered_matches)


def get_serp(keyword, proxy=None):
    # ddgs = DDGS(proxy=f"http://{username}:{password}@{proxy_url}", timeout=20).text(keyword, max_results=15)
    timeout = 10
    while True:
        try:
            ddgs = DDGS(timeout=20).text(keyword, max_results=30)
            break
        except Exception as e:
            print(f'{e}: pause {timeout} sec.')
            time.sleep(timeout)  # back off before retrying the search
            timeout = timeout + 5
    os.makedirs('cache', exist_ok=True)
    big_html = ''
    for item in ddgs:  # visit every link from the search results
        title = item['title']
        link = item['href']
        link_hash = hashlib.md5(link.encode()).hexdigest()
        file_path = os.path.join('cache', link_hash)
        if os.path.exists(file_path):
            # The file already exists for this link, skip the download
            with open(file_path, 'r', encoding='utf-8') as file:
                html_content = file.read()
            html_content = extract_content(html_content)
            big_html = big_html + "\n\n" + html_content
        else:
            try:
                print(link)
                response = requests.get(link, headers={'User-Agent':
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 YaBrowser/24.7.0.0 Safari/537.36"},
                    timeout=(5, 10))
                response.raise_for_status()  # raise on HTTP errors
                html_content = response.text
                html_content = re.sub(r'<symbol.*?>.*?</symbol>', '', html_content, flags=re.DOTALL)
                html_content = re.sub(r'<style.*?>.*?</style>', '', html_content, flags=re.DOTALL)
                html_content = re.sub(r'<script.*?>.*?</script>', '', html_content, flags=re.DOTALL)
                html_content = re.sub(r'<path.*?>.*?</path>', '', html_content, flags=re.DOTALL)
                # html_content = html.unescape(html_content)  # gives mangled output
                with open(file_path, 'w', encoding='utf-8') as file:  # save the HTML to the cache
                    file.write(f"{link}\n{html_content}")
                print(' Saved:')
                html_content = extract_content(html_content)
                big_html = big_html + "\n\n" + html_content
            except Exception as e:
                with open(file_path, 'w', encoding='utf-8') as file:  # cache the failed URL
                    file.write(f"{link}\n")
                print(f'Error fetching {link}: {e} ****')
    print(f"LEN: {len(big_html)}")
    connection = mysql.connector.connect(host='127.0.0.1', port=9306)
    cursor = connection.cursor()  # cursor for talking to Manticore over the MySQL protocol
    # , '0' AS limits_per_field
    cursor.execute("""CALL SNIPPETS(%s, 'my_index', %s,
        1 AS weight_order,
        'strip' AS html_strip_mode,
        '' AS before_match,
        '' AS after_match,
        '|' AS snippet_separator,
        'zone' AS snippet_boundary,
        150 AS around,
        30000 AS limit)""", (big_html, keyword))
    snippets = cursor.fetchall()
    print("SNIPPETS:", snippets)
    for i, snippet in enumerate(snippets):
        rag = "".join(snippet) + "\n"
        with open(f'{i}.snipp', 'w', encoding='utf-8') as snippfile:  # save the snippet to a file
            snippfile.write(rag)
    cursor.close()
    connection.close()
    return snippets
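The script above only defines get_serp() and never calls it; a minimal usage sketch (the keyword is a placeholder):

# A usage sketch for the script above; the keyword is a placeholder.
if __name__ == '__main__':
    snippets = get_serp('manticoresearch call snippets')
    print(f'Got {len(snippets)} snippet rows')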
The project's algorithm is as follows:
1. Search results for the keyword are fetched from DuckDuckGo.
2. Each linked page is downloaded and stored in the cache directory; an MD5 hash of the URL is used as the file name.
3. ManticoreSearch's CALL SNIPPETS extracts the fragments of the cached pages that are relevant to the query.
The resulting snippets can later be used, for example, in an LLM context, asking the model to answer the user's query with the retrieved data (see the sketch below); the module can also work as an external function paired with a function-calling model, or be used at the training stage.
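As an illustration, the saved .snipp files can be folded into the context of a prompt. The sketch below only assembles the context string and leaves the actual model call out; the prompt wording and the file layout (0.snipp, 1.snipp, ... next to the script) are assumptions based on what the scripts above write.

# A minimal sketch: build an LLM prompt from the saved *.snipp files.
# The instruction wording is illustrative; pass the result to whatever model you use.
import glob

def build_rag_prompt(question, snippets_dir='.'):
    chunks = []
    for path in sorted(glob.glob(f'{snippets_dir}/*.snipp')):
        with open(path, 'r', encoding='utf-8') as f:
            chunks.append(f.read().strip())
    context = "\n---\n".join(chunks)
    return (
        "Answer the user's question using only the context below.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {question}"
    )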
You can read more about CALL SNIPPETS here:
https://manual.manticoresearch.com/Searching/Highlighting#CALL-SNIPPETS
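In its simplest form, CALL SNIPPETS takes the document text, the table whose text-processing settings should be used, and the query, plus optional settings. A minimal sketch over the MySQL protocol; the table name, document, and query are placeholders:

# A minimal CALL SNIPPETS sketch, assuming Manticore at 127.0.0.1:9306
# and an existing table 'my_index'; document and query are placeholders.
import mysql.connector

connection = mysql.connector.connect(host='127.0.0.1', port=9306)
cursor = connection.cursor()
cursor.execute(
    "CALL SNIPPETS(%s, 'my_index', %s, "
    "'<b>' AS before_match, '</b>' AS after_match, 150 AS around)",
    ('<p>Manticore can highlight matched words in a document.</p>', 'highlight'),
)
print(cursor.fetchall())
cursor.close()
connection.close()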
The path against which the cache directory is resolved is set via the snippets_file_prefix parameter: if the value ends with a slash '/', it is treated as a directory; without a trailing slash it is a path plus file-name prefix.
https://manual.manticoresearch.com/Creating_a_table/Creating_a_distributed_table/Remote_tables#snippets_file_prefix
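With 1 AS load_files, the first arguments of CALL SNIPPETS are treated as file paths, which the server resolves against snippets_file_prefix as described above. A hedged sketch, assuming the cache populated by the scripts above, an existing my_index table, and a daemon that can read the listed paths:

# A sketch of building snippets from the cached files, assuming Manticore at
# 127.0.0.1:9306 and an existing table 'my_index'; the keyword is a placeholder.
import glob
import mysql.connector

files = sorted(glob.glob('cache/*'))[:5]   # a few cached pages from the scripts above
assert files, 'run one of the scripts above first to populate the cache'
placeholders = ','.join(['%s'] * len(files))

connection = mysql.connector.connect(host='127.0.0.1', port=9306)
cursor = connection.cursor()
cursor.execute(
    f"CALL SNIPPETS(({placeholders}), 'my_index', %s, "
    "1 AS load_files, 150 AS around, 30000 AS limit)",
    (*files, 'your_keyword_here'),
)
for row in cursor.fetchall():
    print(row)
cursor.close()
connection.close()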