Created
October 14, 2023 14:50
-
-
Save fsndzomga/7587b6b1af26785c555d0f1a69e67db8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# 1. Grouping and organizing imports | |
from typing import Any | |
import os | |
import time | |
import warnings | |
from config import OPENAI_API_KEY, USER, ELASTIC_PASSWORD | |
from pdf2image import convert_from_path | |
import pytesseract | |
from anonLLM.llm import OpenaiLanguageModel | |
import chromadb | |
import fitz | |
from elasticsearch import Elasticsearch | |
# 2. Separate configurations
# Publish the API key through the process environment
# (presumably read by the OpenAI-backed LLM wrapper -- confirm against anonLLM).
os.environ.update({'OPENAI_API_KEY': OPENAI_API_KEY})
# Ignore every warning category for the rest of the process.
warnings.simplefilter('ignore', Warning)
# 3. Extract classes and functions | |
class ElasticsearchClient:
    """Small wrapper around an Elasticsearch connection for page-level text."""

    def __init__(self):
        """Connect to the local HTTPS node with the configured credentials."""
        credentials = (USER, ELASTIC_PASSWORD)
        # Certificate verification is switched off for this connection.
        self.client = Elasticsearch(
            hosts="https://localhost:9200",
            http_auth=credentials,
            verify_certs=False,
        )

    def index_texts(self, texts, index_name):
        """Index every text as its own document, using ids 1, 2, 3, ...

        Args:
            texts: Iterable of strings, stored under the ``content`` field.
            index_name: Name of the target index.
        """
        for doc_id, text in enumerate(texts, start=1):
            document = {"content": text}
            self.client.index(index=index_name, id=doc_id, body=document)

    def search(self, index_name, query):
        """Run a match query on ``content`` and return the list of raw hits.

        Args:
            index_name: Name of the index to search.
            query: Text matched against the ``content`` field.

        Returns:
            The ``hits.hits`` list of the search response.
        """
        match_clause = {"match": {"content": query}}
        request = {"query": {"bool": {"must": [match_clause]}}}
        response = self.client.search(index=index_name, body=request)
        return response['hits']['hits']
def top5_es(data):
    """Join the content of the five best-scoring hits with blank lines.

    Args:
        data: List of hit dicts, each with a ``_score`` and a
            ``_source`` dict holding ``content``.

    Returns:
        The ``content`` strings of up to five hits, highest score first,
        separated by two newlines. Empty string when there are no hits.
    """
    ranked = sorted(data, key=lambda hit: hit['_score'], reverse=True)
    best_hits = ranked[:5]
    return "\n\n".join(hit['_source']['content'] for hit in best_hits)
class Responder():
    """Answer questions about one indexed document using two retrievers.

    Each call looks up passages in a Chroma collection and in an
    Elasticsearch index, merges them, and asks the language model to answer
    the question from the merged text.
    """

    def __init__(self, collection, index_name) -> None:
        """Store the retrieval targets and create the language model.

        Args:
            collection: Chroma collection queried for matching passages.
            index_name: Name of the Elasticsearch index to search.
        """
        self.llm = OpenaiLanguageModel(anonymize=False)
        self.collection = collection
        self.index_name = index_name

    def __call__(self, question) -> Any:
        """Retrieve context for ``question`` and return the model's answer.

        Args:
            question: The user's question about the document.

        Returns:
            Whatever ``self.llm.generate`` returns for the built prompt.
        """
        # Query Chroma with the actual question; a fixed placeholder text
        # would return the same passages regardless of what was asked.
        results_chroma = self.collection.query(
            query_texts=[question],
            n_results=5,
        )
        # Uses the module-level ``elasticsearch_client`` global.
        results_es = elasticsearch_client.search(self.index_name, question)

        # Results are grouped per query text; index 0 is our single query.
        merged_text_chroma = ' '.join(results_chroma['documents'][0])
        merged_text_es = top5_es(results_es)
        merged_text = f"{merged_text_es} {merged_text_chroma}"

        prompt = f"""
        Answer this question: {question}, using these informations from the document: {merged_text}
        """
        return self.llm.generate(prompt)
def _extract_page_texts(pdf_path):
    """Return one text string per PDF page, falling back to OCR if empty."""
    # The context manager closes the document even if extraction raises.
    with fitz.open(pdf_path) as pdf_document:
        texts = [page.get_text() for page in pdf_document]
    if any(texts):
        return texts

    # No embedded text on any page: render the pages to images and OCR them.
    os.makedirs('images', exist_ok=True)  # output folder must exist
    images = convert_from_path(pdf_path, output_folder='images')
    return [pytesseract.image_to_string(image) for image in images]


def Backend(pdf_path):
    """Index a PDF (if not indexed yet) and return a Responder for it.

    The collection/index name is the PDF file name without its extension.
    When no Chroma collection with that name exists, the page texts are
    extracted and added to both a new Chroma collection and the
    Elasticsearch index of the same name.

    Uses the module-level ``chroma_client`` and ``elasticsearch_client``
    globals.

    Args:
        pdf_path: Path to the PDF file.

    Returns:
        A ``Responder`` bound to the document's collection and index name.
    """
    pdf_name = os.path.basename(pdf_path)
    index_name = os.path.splitext(pdf_name)[0]  # remove extension

    # list_collections() yields collection objects in some chromadb releases
    # and plain names in others; compare by name in both cases.
    existing_names = {
        getattr(entry, 'name', entry)
        for entry in chroma_client.list_collections()
    }

    if index_name not in existing_names:
        collection = chroma_client.create_collection(name=index_name)
        # wait a moment for the collection to be fully initialized
        time.sleep(1)

        texts = _extract_page_texts(pdf_path)
        page_numbers = range(1, len(texts) + 1)
        collection.add(
            documents=texts,
            metadatas=[{"source": f"page_{n}"} for n in page_numbers],
            ids=[f"id{n}" for n in page_numbers],
        )
        elasticsearch_client.index_texts(texts, index_name)

    collection = chroma_client.get_collection(index_name)
    return Responder(collection, index_name)
def interact_with_pdf(pdf_path: str):
    """Run a question/answer loop on the console for the given PDF.

    Builds a responder through ``Backend`` and prints its answer to every
    question typed by the user until the exact input ``quit`` is entered.

    Args:
        pdf_path: Path to the PDF file to chat with.
    """
    responder = Backend(pdf_path)

    def ask():
        return input("\nEnter your question about the document or type quit:\n\n")

    # iter(callable, sentinel) keeps asking until the user types 'quit'.
    for question in iter(ask, 'quit'):
        print("\n", responder(question))
# 4. Separate UI from core logic | |
def main():
    """Ask for a PDF file name and start the chat session for it.

    The ``.pdf`` extension is appended when the entered name lacks it
    (checked case-insensitively); the file is looked up in ``pdfs``.
    """
    entered = input("What is the name of the PDF file you want to chat with ?\n\n")
    has_extension = entered.lower().endswith('.pdf')
    pdf_name = entered if has_extension else entered + '.pdf'
    interact_with_pdf(os.path.join('pdfs', pdf_name))
if __name__ == "__main__":
    # Backend and Responder read these two names as module globals, so both
    # clients must be bound before main() starts the interactive session.
    chroma_client, elasticsearch_client = chromadb.Client(), ElasticsearchClient()
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment