Last active
February 20, 2024 13:36
-
-
Save aaronjolson/86e021d662a9b0b41eacccdcd4ab5485 to your computer and use it in GitHub Desktop.
Code for using llama-index to load GitHub data into the Activeloop Deep Lake vector database. Originally from this course: https://learn.activeloop.ai/courses/take/rag/multimedia/51349127-chat-with-your-code-llamaindex-and-activeloop-deep-lake-for-github-repositories. The imports in this code have been modified to work with the latest version of llama-index.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
Set the following in a .env file:
GITHUB_TOKEN="YOUR_GH_CLASSIC_TOKEN"
OPENAI_API_KEY="YOUR_OPENAI_KEY"
ACTIVELOOP_TOKEN="YOUR_ACTIVELOOP_TOKEN"
DATASET_PATH="hub://YOUR_ORG/repository_vector_store"

Requires llama-index >= 0.10.0, python-dotenv, and llama-index-readers-github >= 0.1.5.
The script also imports llama_index.vector_stores.deeplake, so the Deep Lake
vector-store integration for llama-index must be installed as well.
'''
import os | |
import textwrap | |
from dotenv import load_dotenv | |
from llama_index.core import download_loader | |
from llama_index.readers.github import GithubRepositoryReader, GithubClient | |
from llama_index.core import VectorStoreIndex | |
from llama_index.vector_stores.deeplake import DeepLakeVectorStore | |
from llama_index.core.storage.storage_context import StorageContext | |
import re | |
# Pull the variables defined in the local .env file into the process environment.
load_dotenv()

# Module-level copies of the configuration values; each is None when the
# corresponding variable is not set.
openai_api_key = os.environ.get("OPENAI_API_KEY")
active_loop_token = os.environ.get("ACTIVELOOP_TOKEN")
dataset_path = os.environ.get("DATASET_PATH")
def parse_github_url(url):
    """Extract the owner and repository name from a GitHub repository URL.

    Args:
        url: A string of the form ``https://github.com/<owner>/<repo>``.
            Surrounding whitespace, extra path segments, a query string, a
            fragment and a trailing ``.git`` on the repository name are
            all ignored.

    Returns:
        An ``(owner, repo)`` tuple of strings, or ``(None, None)`` when the
        URL does not contain both parts.
    """
    # Each segment stops at "/", "?", "#" or whitespace so that inputs such as
    # ".../repo?tab=readme" or ".../repo#readme" do not leak into the repo name.
    pattern = r"https://github\.com/([^/?#\s]+)/([^/?#\s]+)"
    # Strip first: re.match anchors at position 0, so a leading space from a
    # pasted URL would otherwise make the whole match fail.
    match = re.match(pattern, url.strip())
    if not match:
        return (None, None)

    owner, repo = match.groups()
    # Clone URLs end in ".git", which is not part of the repository name.
    if repo.endswith(".git"):
        repo = repo[: -len(".git")]
    if not repo:
        return (None, None)
    return (owner, repo)
def validate_owner_repo(owner, repo):
    """Report whether both the owner and the repository name are truthy.

    Returns:
        True only when neither value is empty or None, otherwise False.
    """
    if not owner:
        return False
    return bool(repo)
def initialize_github_client():
    """Create a GithubClient from the GITHUB_TOKEN environment variable.

    The token is passed through as-is; it is None when the variable is unset.
    """
    return GithubClient(os.getenv("GITHUB_TOKEN"))
def _require_env(name, description):
    """Return the value of environment variable ``name`` or raise if it is missing.

    Raises:
        EnvironmentError: if the variable is unset or empty. The message is
            "<description> not found in environment variables".
    """
    value = os.getenv(name)
    if not value:
        raise EnvironmentError(f"{description} not found in environment variables")
    return value


def _prompt_for_repository():
    """Ask for a GitHub URL until one parses into an owner and a repository."""
    while True:
        github_url = input("Please enter the GitHub repository URL: ")
        owner, repo = parse_github_url(github_url)
        if validate_owner_repo(owner, repo):
            return owner, repo
        print("Invalid GitHub URL. Please try again.")


def _ask(query_engine, question):
    """Send one question to the query engine and print the wrapped answer."""
    print("=" * 50)
    answer = query_engine.query(question)
    print(f"Answer: {textwrap.fill(str(answer), 100)} \n")


def _chat_loop(query_engine):
    """Answer questions typed by the user until they enter 'exit'."""
    while True:
        user_question = input(
            "Please enter your question (or type 'exit' to quit): "
        ).strip()
        if user_question.lower() == "exit":
            print("Exiting, thanks for chatting!")
            break
        if not user_question:
            # Nothing was typed; prompt again instead of querying with an
            # empty string.
            continue
        print(f"Your question: {user_question}")
        _ask(query_engine, user_question)


def main(branch="main"):
    """Index a GitHub repository into Deep Lake and start an interactive Q&A.

    Steps: validate the required environment variables, prompt for a
    repository URL, load its ``.py``/``.js``/``.ts``/``.md`` files, build a
    vector index over them, ask one test question, then enter a question loop.

    Args:
        branch: Name of the branch to load. Defaults to ``"main"``, the value
            that was previously hard-coded.

    Raises:
        EnvironmentError: if OPENAI_API_KEY, GITHUB_TOKEN, ACTIVELOOP_TOKEN or
            DATASET_PATH is unset or empty.
    """
    # Fail fast on missing configuration, before any prompt or network access.
    _require_env("OPENAI_API_KEY", "OpenAI API key")
    _require_env("GITHUB_TOKEN", "GitHub token")
    _require_env("ACTIVELOOP_TOKEN", "Activeloop token")
    # Validated here so that a missing dataset path is reported explicitly
    # instead of being handed to the vector store as None.
    dataset_path = _require_env("DATASET_PATH", "Deep Lake dataset path (DATASET_PATH)")

    github_client = initialize_github_client()
    # NOTE: the former download_loader("GithubRepositoryReader") call was
    # dropped. Its return value was discarded, and the reader class is already
    # imported directly from llama_index.readers.github.

    owner, repo = _prompt_for_repository()

    loader = GithubRepositoryReader(
        github_client,
        owner=owner,
        repo=repo,
        filter_file_extensions=(
            [".py", ".js", ".ts", ".md"],
            GithubRepositoryReader.FilterType.INCLUDE,
        ),
        verbose=False,
        concurrent_requests=5,
    )
    print(f"Loading {repo} repository by {owner}")
    docs = loader.load_data(branch=branch)
    print("Documents uploaded:")
    for doc in docs:
        print(doc.metadata)

    print("Uploading to vector store...")

    # ====== Create vector store and upload data ======
    # NOTE(review): overwrite=True presumably replaces any dataset already
    # stored at dataset_path - confirm against the Deep Lake documentation.
    vector_store = DeepLakeVectorStore(
        dataset_path=dataset_path,
        overwrite=True,
        runtime={"tensor_db": True},
    )
    storage_context = StorageContext.from_defaults(vector_store=vector_store)
    index = VectorStoreIndex.from_documents(docs, storage_context=storage_context)
    query_engine = index.as_query_engine()

    # Include a simple question to test.
    intro_question = "What is the repository about?"
    print(f"Test question: {intro_question}")
    _ask(query_engine, intro_question)

    _chat_loop(query_engine)
# Start the interactive session only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment