Skip to content

Instantly share code, notes, and snippets.

@EvilFreelancer
Last active November 16, 2024 11:57
Show Gist options
  • Save EvilFreelancer/394d161181977b9ddd227877125a3f3d to your computer and use it in GitHub Desktop.
Save EvilFreelancer/394d161181977b9ddd227877125a3f3d to your computer and use it in GitHub Desktop.
Пример базы знаний для RAG-like проекта который использует ManticoreSearch SNIPPETS CALL и выдачу из DuckDuckGo
import os
import time
import re
import requests
import hashlib
import mysql.connector
from duckduckgo_search import DDGS
from concurrent.futures import ThreadPoolExecutor
class HtmlExtractor:
def __init__(self):
self.filters = ["подпис", "facebook", "vk.com", "twiter"]
def extract_content(self, html):
pattern = r'<(p|pre|blockquote|h[1-6]|article)(.*?)>(.*?)<\/\1>'
matches = re.findall(pattern, html, re.DOTALL)
filtered_matches = [
(tag, attrs, content) for tag, attrs, content in matches
if not any(substring.lower() in content.lower() for substring in self.filters)
]
result = ''.join([f'<{tag}{attrs}>{content}</{tag}>' for tag, attrs, content in filtered_matches])
return result
class WebScraper:
def __init__(self, cache_dir='cache'):
self.cache_dir = cache_dir
os.makedirs(cache_dir, exist_ok=True)
def hash_url(self, url):
return hashlib.md5(url.encode()).hexdigest()
def fetch_html(self, url, callback):
link_hash = self.hash_url(url)
file_path = os.path.join(self.cache_dir, link_hash)
if os.path.exists(file_path):
with open(file_path, 'r', encoding='utf-8') as file:
html_content = file.read().split('\n', 1)[1]
return file_path
try:
response = requests.get(url, headers={
'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 YaBrowser/24.7.0.0 Safari/537.36"},
timeout=(5, 10))
response.raise_for_status()
html_content = response.text
html_content = re.sub(r'<symbol.*?>.*?</symbol>', '', html_content, flags=re.DOTALL)
html_content = re.sub(r'<style.*?>.*?</style>', '', html_content, flags=re.DOTALL)
html_content = re.sub(r'<script.*?>.*?</script>', '', html_content, flags=re.DOTALL)
html_content = re.sub(r'<path.*?>.*?</path>', '', html_content, flags=re.DOTALL)
with open(file_path, 'w', encoding='utf-8') as file:
file.write(f"{url}\n{html_content}")
print(f'Saved: {file_path}')
except Exception as e:
with open(file_path, 'w', encoding='utf-8') as file:
file.write(f"{url}\n")
print(f'Failed to fetch {url}: {e}')
return None
callback(html_content)
return file_path
def process_urls(self, urls, callback):
with ThreadPoolExecutor(max_workers=10) as executor:
future_to_url = {executor.submit(self.fetch_html, url, callback): url for url in urls}
cached_files = [future.result() for future in future_to_url if future.result()]
return cached_files
class SearchEngine:
def __init__(self, host='localhost', user='root', password='', database='test'):
self.connection = mysql.connector.connect(
host=host,
user=user,
password=password,
database=database
)
self.cursor = self.connection.cursor()
def call_snippets(self, file_paths, index_name, query):
files_str = ','.join(f"'{path}'" for path in file_paths)
self.cursor.execute(f"""CALL SNIPPETS(({files_str}), 'my_index', '{query}',
1 AS weight_order,
'strip' AS html_strip_mode,
'' AS before_match,
'' AS after_match,
'|' AS snippet_separator,
'zone' AS snippet_boundary,
150 AS around,
30000 AS limit)""")
snippets = self.cursor.fetchall()
return snippets
def close(self):
self.cursor.close()
self.connection.close()
def process_html(html_content, extractor):
cleaned_content = extractor.extract_content(html_content)
print(f'Processed HTML: {cleaned_content}')
def main():
keyword = 'your_keyword_here'
cache_dir = 'cache'
db_host = 'localhost'
db_user = 'root'
db_password = ''
db_database = 'test'
ddg = DDGS()
results = ddg.text(keyword, max_results=10)
urls = [result['href'] for result in results]
extractor = HtmlExtractor()
scraper = WebScraper(cache_dir)
search_engine = SearchEngine(db_host, db_user, db_password, db_database)
try:
cached_files = scraper.process_urls(urls, lambda html: process_html(html, extractor))
snippets = search_engine.call_snippets(cached_files, 'my_index', keyword)
print("SNIPPETS:", snippets)
for snippet in snippets:
rag = "".join(snippet) + "\n"
with open(f'{snippet[0]}.snipp', 'w', encoding='utf-8') as snippfile:
snippfile.write(rag)
finally:
search_engine.close()
if __name__ == '__main__':
main()
import os
import time
import json
import re
import requests
import html
from duckduckgo_search import DDGS
import hashlib
import mysql.connector
def extract_content(html):
# pattern = r'<(p|table|a|ul|ol|code|pre|blockquote|h[1-6]|article)(.*?)>(.*?)<\/\1>' # Определяем регулярное выражение для поиска верхних уровней тегов
pattern = r'<(p|pre|blockquote|h[1-6]|article)(.*?)>(.*?)<\/\1>' # Определяем регулярное выражение для поиска верхних уровней тегов
matches = re.findall(pattern, html, re.DOTALL) # Находим все совпадения
filters = ["подпис", "facebook", "vk.com", "twiter"] # Фильтруем совпадения, проверяя наличие любой из подстрок
filtered_matches = [
(tag, attrs, content) for tag, attrs, content in matches
if any(substring in content for substring in filters)
]
result = ''.join([f'<{tag}{attrs}>{content}</{tag}>' for tag, attrs, content in matches]) # Объединяем результаты в строку
return result
def get_serp(keyword, proxy=None):
# ddgs = DDGS(proxy=f"http://{username}:{password}@{proxy_url}", timeout=20).text(keyword, max_results=15)
timeout = 10
while True:
try:
ddgs = DDGS(timeout=20).text(keyword, max_results=30)
break
except Exception as e:
print(f'pause {timeout} sec.')
time.sleep(timeout) # Задержка
timeout = timeout + 5
big_html = ''
for item in ddgs: # Цикл для обращения к ссылкам
title = item['title']
link = item['href']
link_hash = hashlib.md5(link.encode()).hexdigest()
file_path = os.path.join('cache', link_hash)
if os.path.exists(file_path):
with open(file_path, 'r', encoding='utf-8') as file:# print(f'File already exists for link: {link}. Skipping download.')
html_content = file.read()
html_content = extract_content(html_content)
# print(html_content)
# exit()
big_html = big_html + "\n\n" + html_content
else:
try:
print(link)
response = requests.get(link, headers={'User-Agent':
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 YaBrowser/24.7.0.0 Safari/537.36"},
timeout=(5, 10))
response.raise_for_status() # Проверка на ошибки HTTP
html_content = response.text
html_content = re.sub(r'<symbol.*?>.*?</symbol>', '', html_content, flags=re.DOTALL)
html_content = re.sub(r'<style.*?>.*?</style>', '', html_content, flags=re.DOTALL)
html_content = re.sub(r'<script.*?>.*?</script>', '', html_content, flags=re.DOTALL)
html_content = re.sub(r'<path.*?>.*?</path>', '', html_content, flags=re.DOTALL)
# html_content = html.unescape(html_content) # коряво
with open(file_path, 'w', encoding='utf-8') as file: # Сохранение HTML-кода в файл
file.write(f"{link}\n{html_content}")
print(' Saved:')
html_content = extract_content(html_content)
# print(html_content)
# exit()
big_html = big_html + "\n\n" + html_content
except Exception as e:
with open(file_path, 'w', encoding='utf-8') as file: # Сохранение HTML-кода в файл
file.write(f"{link}\n")
print(f'Error fetching {link}: {e} ****')
print(f"LEN: {len(big_html)}")
connection = mysql.connector.connect(host='127.0.0.1', port='9306')
cursor = connection.cursor() # Создание курсора для выполнения операций с базой данных
#, '0' AS limits_per_field
cursor.execute("""CALL SNIPPETS(%s, 'my_index', %s,
1 AS weight_order,
'strip' AS html_strip_mode,
'' AS before_match,
'' AS after_match,
'|' AS snippet_separator,
'zone' AS snippet_boundary,
150 AS around,
30000 AS limit)""", (big_html, keyword))
snippets = cursor.fetchall()
print("SNIPPETS:", snippets)
for snippet in snippets:
rag = "".join(snippet) + "\n"
with open(f'{i}.snipp', 'w', encoding='utf-8') as snippfile: # Сохранение rag в файл
snippfile.write(rag)
return rag
@EvilFreelancer
Copy link
Author

EvilFreelancer commented Nov 16, 2024

Алгоритм проекта следующий:

  1. Запрос пользователя передаётся на вход DuckDuckGo через API
  2. Ответ считываюется, из него извлекаются URL
  3. Пачка полученных URL передаётся на вход асинхронного HTTP-клиента, который скачивает HTML
  4. Каждый HTML обрабатывается callback-функций отделяющей текст от HTML
  5. Далее полученный текст сохраняется в папку cache, в качестве именни файла используется хеш-функция на URL
  6. Далее в MantricoSearch Engine отправляется CALL SNIPPETS, в котором перечислен список сохраненных в кеше файлов и оригинальный запрос пользователя из первого шага
  7. Полученные снипеды выводятся на экран

В дальнейшем их можно использовать например в контексте LLM, прося модель отверить на пользовательский запрос используя полученные данные, или же можно использовать данный модуль как внешнюю функциию в паре с function call моделью, или на этапе обучения.


Про CALL SNIPPETS можно будет почитать вот тут:

https://manual.manticoresearch.com/Searching/Highlighting#CALL-SNIPPETS

Пусть относительно которого должна распологаться директория с кешем указывается через параметр snippets_file_prefix, если путь оканчивается на слеш '/' это директория, если без слеша то префикс имени файла с путём.

https://manual.manticoresearch.com/Creating_a_table/Creating_a_distributed_table/Remote_tables#snippets_file_prefix

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment