Skip to content

Instantly share code, notes, and snippets.

@robweber
Last active July 28, 2025 11:57
Show Gist options
  • Select an option

  • Save robweber/894508850d7c632ad824b3d02104e868 to your computer and use it in GitHub Desktop.

Select an option

Save robweber/894508850d7c632ad824b3d02104e868 to your computer and use it in GitHub Desktop.
Sync local folder with Open WebUI Knowledge repo - https://robweber.github.io/ai/automation/coding/local_llm_rag_sync/
"""sync_webui_docs.py
Compares a local directory of files with an Open WebUI knowledge base.
Files that exist in both places are updated if their content has changed.
Files deleted from the local source are removed from Open WebUI.
New local files are uploaded.
After the file updates are complete, the knowledge base is re-indexed.
More information on the API can be found here: https://docs.openwebui.com/getting-started/api-endpoints
Swagger is available on the Open WebUI instance at /docs
Author: Rob Weber
"""
import argparse
import hashlib
import requests
import os
import sys
class KnowledgeUploader:
    """Small client for the Open WebUI REST calls used to sync a folder.

    Every request is sent to ``http://{host}:{port}/api/v1`` and carries the
    API key as a bearer token.
    """

    # Class-level placeholders; both are overwritten per instance in __init__.
    url = None
    api_key = None

    def __init__(self, host, port, api_key):
        """Remember the API base URL and the token used for authentication.

        Args:
            host: IP or hostname of the Open WebUI instance.
            port: TCP port of the Open WebUI instance.
            api_key: API token sent as ``Authorization: Bearer <api_key>``.
        """
        self.url = f'http://{host}:{port}/api/v1'
        self.api_key = api_key

    def _generate_headers(self):
        """Return the HTTP headers shared by every request."""
        return {
            'Authorization': f'Bearer {self.api_key}',
            'Accept': 'application/json'
        }

    # this was copied directly from open_webui: https://github.com/open-webui/open-webui/blob/main/backend/open_webui/utils/misc.py#L279
    def _calculate_sha256_string(self, string):
        """Return the hex SHA-256 digest of ``string`` encoded as UTF-8."""
        return hashlib.sha256(string.encode("utf-8")).hexdigest()

    @staticmethod
    def _partition_files(local_files, remote_files):
        """Split local and remote files into new / existing / deleted groups.

        Args:
            local_files: iterable of local file names.
            remote_files: list of remote file records; each record is read as
                ``record['meta']['name']`` and ``record['id']``.

        Returns:
            A dict with three lists:
            ``new`` (local names missing remotely, in local order) and
            ``existing`` / ``deleted`` (``{'filename', 'id'}`` dicts for remote
            files that are / are not present locally, in remote order).
        """
        # Sets give O(1) membership tests instead of scanning a list per file.
        local_names = set(local_files)
        remote_names = set()
        existing = []
        deleted = []
        for remote in remote_files:
            name = remote['meta']['name']
            remote_names.add(name)
            entry = {'filename': name, 'id': remote['id']}
            if name in local_names:
                existing.append(entry)
            else:
                deleted.append(entry)
        new = [name for name in local_files if name not in remote_names]
        return {'new': new, 'existing': existing, 'deleted': deleted}

    def find_knowledge_id(self, name):
        """Look up a knowledge base ID by its name.

        Returns:
            The ``id`` of the first knowledge base whose ``name`` matches, or
            ``None`` when the list request fails or nothing matches.
        """
        response = requests.get(f'{self.url}/knowledge/list', headers=self._generate_headers())
        if response.status_code != 200:
            return None
        return next((kb['id'] for kb in response.json() if kb['name'] == name), None)

    def generate_file_list(self, local_files, knowledge_id):
        """Compare local file names with the files in a knowledge base.

        Args:
            local_files: list of file names found in the local folder.
            knowledge_id: ID of the knowledge base to compare against.

        Returns:
            A dict with the keys ``new``, ``existing`` and ``deleted``, each
            holding a list (see ``_partition_files``). All three lists are
            empty when the knowledge base could not be fetched.
        """
        response = requests.get(f'{self.url}/knowledge/{knowledge_id}', headers=self._generate_headers())
        if response.status_code != 200:
            # Always return lists so callers see one shape on success and failure.
            return {'new': [], 'existing': [], 'deleted': []}
        # Treat a null 'files' value the same as an empty list.
        remote_files = response.json()['files'] or []
        return self._partition_files(local_files, remote_files)

    def upload_file(self, knowledge_id, file_path):
        """Upload one file and attach it to the knowledge base.

        Args:
            knowledge_id: ID of the knowledge base receiving the file.
            file_path: path of the local file to upload.

        Returns:
            ``True`` only when both the upload and the add-to-knowledge-base
            request return HTTP 200, otherwise ``False``.

        Raises:
            OSError: if ``file_path`` cannot be opened.
        """
        # The context manager closes the handle even if the request raises.
        with open(file_path, 'rb') as handle:
            response = requests.post(f'{self.url}/files/', headers=self._generate_headers(), files={'file': handle})
        if response.status_code != 200:
            return False
        uploaded_id = response.json()['id']
        added = requests.post(
            f'{self.url}/knowledge/{knowledge_id}/file/add',
            headers=self._generate_headers(),
            json={'file_id': uploaded_id},
        )
        return added.status_code == 200

    def update_existing_file(self, file_id, file_path):
        """Push new content for a remote file when its hash has changed.

        Args:
            file_id: ID of the remote file.
            file_path: path of the matching local file.

        Returns:
            ``True`` when the remote file is already up to date or was updated
            successfully; ``False`` when the local file cannot be read as
            text, the remote file info cannot be fetched, or the update
            request fails.
        """
        # Read as UTF-8 so the text matches what _calculate_sha256_string hashes.
        try:
            with open(file_path, encoding='utf-8') as f:
                content = f.read()
        except (OSError, UnicodeDecodeError):
            print(f'error opening file {file_path}')
            return False

        response = requests.get(f'{self.url}/files/{file_id}', headers=self._generate_headers())
        if response.status_code != 200:
            # Without the remote hash we cannot tell whether an update is needed.
            return False

        if response.json().get('hash') == self._calculate_sha256_string(content):
            return True  # nothing to upload, content is unchanged

        print(f"Hash of file {file_path} has changed, updating")
        response = requests.post(
            f'{self.url}/files/{file_id}/data/content/update',
            headers=self._generate_headers(),
            json={'content': content},
        )
        return response.status_code == 200

    def delete_file(self, knowledge_id, file_id):
        """Detach a file from the knowledge base, then delete the file itself.

        Both requests are always sent, matching the original best-effort
        behaviour.

        Returns:
            ``True`` when both requests return HTTP 200, otherwise ``False``.
        """
        # NOTE(review): confirm against the instance's Swagger page (/docs) that
        # this endpoint accepts DELETE with an 'id' body; not verifiable from here.
        removed = requests.delete(
            f'{self.url}/knowledge/{knowledge_id}/file/remove',
            headers=self._generate_headers(),
            json={'id': file_id},
        )
        deleted = requests.delete(f'{self.url}/files/{file_id}', headers=self._generate_headers())
        return removed.status_code == 200 and deleted.status_code == 200

    def refresh_kb(self, knowledge_id):
        """Ask Open WebUI to rebuild the index of the knowledge base.

        Returns:
            ``True`` when the reindex request returns HTTP 200.
        """
        response = requests.post(f'{self.url}/knowledge/{knowledge_id}/reindex', headers=self._generate_headers())
        return response.status_code == 200
def list_local_files(folder):
    """Return the names of the regular files directly inside ``folder``.

    Sub-directories are skipped and the listing is not recursive.

    Raises:
        FileNotFoundError: if ``folder`` does not exist.
        NotADirectoryError: if ``folder`` is not a directory.
    """
    return [name for name in os.listdir(folder) if os.path.isfile(os.path.join(folder, name))]


def main():
    """Parse the command line and sync the folder with the knowledge base.

    Exits with status 2 when the knowledge base or the local folder cannot be
    found. Files that fail to update or upload are reported at the end; the
    exit status stays 0 in that case.
    """
    parser = argparse.ArgumentParser(description='Syncs files in a directory with Open WebUI Knowledge Base')
    parser.add_argument('-H', '--host', required=True, help='The IP or hostname of the Open WebUI instance', type=str)
    parser.add_argument('-P', '--port', required=False, help='Open WebUI port', type=int, default=3000)
    parser.add_argument('-T', '--token', required=True, help='Open WebUI API Token', type=str)
    parser.add_argument('-k', '--knowledge_base', required=True, help='Name of Open WebUI knowledge base to sync with', type=str)
    parser.add_argument('-f', '--folder', required=True, help='Location of the export file', type=str)
    args = parser.parse_args()

    uploader = KnowledgeUploader(args.host, args.port, args.token)

    # get the knowledge base ID from the name
    k_id = uploader.find_knowledge_id(args.knowledge_base)
    if k_id is None:
        print(f'Knowledge base {args.knowledge_base} could not be found')
        sys.exit(2)

    # get list of files in directory
    try:
        local_files = list_local_files(args.folder)
    except (FileNotFoundError, NotADirectoryError):
        # The original message referenced an undefined name and raised NameError.
        print(f"Error: Directory not found: {args.folder}")
        sys.exit(2)

    # get list of new, existing, and deleted files
    file_collection = uploader.generate_file_list(local_files, k_id)
    failed = []

    # update existing files
    print(f"Checking {len(file_collection['existing'])} existing files")
    for remote in file_collection['existing']:
        path = os.path.join(args.folder, remote['filename'])
        if not uploader.update_existing_file(remote['id'], path):
            failed.append(path)

    # delete old files
    print(f"Deleting {len(file_collection['deleted'])} files")
    for remote in file_collection['deleted']:
        uploader.delete_file(k_id, remote['id'])

    # upload new files
    print(f"Uploading {len(file_collection['new'])} files")
    for filename in file_collection['new']:
        path = os.path.join(args.folder, filename)
        if not uploader.upload_file(k_id, path):
            failed.append(path)

    # refresh knowledge base
    uploader.refresh_kb(k_id)

    # Surface the failures that were previously collected and never inspected.
    if failed:
        print(f"Warning: {len(failed)} file(s) could not be synced: {', '.join(failed)}")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment