Segmenting text by merging adjacent sentences when the documents retrieved for them from a knowledge base overlap by more than a chosen proportion.
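The merge decision is the overlap rule implemented by should_merge_sentences in the Python file below: with granularity g, the next sentence joins the current segment when more than a proportion g of the documents retrieved for it were also retrieved for the segment built so far. A small illustration with made-up document ids:

# Hypothetical ids, purely illustrative
should_merge_sentences(["a", "b", "c"], ["b", "c", "d"], 0.25)  # 2/3 overlap -> True, merge
should_merge_sentences(["a", "b", "c"], ["d", "e", "f"], 0.25)  # no overlap -> False, start a new segment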
import json
import re

import requests
import spacy
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, urlparse

from multi_language_utils import is_en_text, translate_to_english, load_language_detection_model

# Load the SpaCy model
nlp = spacy.load("en_core_web_sm")


def parse_text_into_sentences(text, chars_position=False):
    doc = nlp(text)
    if chars_position:
        sentences = [(sent.text.strip(), sent.start_char, sent.end_char) for sent in doc.sents]
    else:
        sentences = [sent.text.strip() for sent in doc.sents]
    return sentences


def semantic_search(query, server_url="http://localhost:8800/api/"):
    # Encode the query for HTTP transmission
    encoded_query = requests.utils.quote(query)
    # Construct the full URL
    full_url = server_url + encoded_query
    # Send the GET request
    response = requests.get(full_url)
    # Return the response text
    return response.text
def extract_ids(response):
    # Collect ids from org-style links ("[[id:<uuid>][Title]]") in the search server response
    ids = []
    lines = response.split('\n')
    for line in lines:
        if '[[id:' in line:
            match = re.search(r'\[\[id:(.*?)\]\[', line)
            if match:
                ids.append(match.group(1))
    return ids
def should_merge_sentences(list1, list2, segmentation_granularity):
    if segmentation_granularity == 0:
        return True  # No segmentation, merge everything
    set1, set2 = set(list1), set(list2)
    if not set2:
        # Avoid a division by zero when the next sentence retrieved no documents
        return False
    differing_elements = len(set2 - set1)
    proportion_different = differing_elements / len(set2)
    # Merge when the shared proportion of retrieved documents exceeds the granularity
    return (1 - proportion_different) > segmentation_granularity
def segment_text_with_offsets(text, segmentation_granularity):
    sentences = parse_text_into_sentences(text, chars_position=True)
    segment_ids = []
    if segmentation_granularity == 0:
        # Granularity 0 means a single segment covering the whole text
        response = semantic_search(text)
        return [(text, 0, len(text), extract_ids(response))]
    current_segment, current_start = sentences[0][0], sentences[0][1]
    current_ids = extract_ids(semantic_search(current_segment))
    for i in range(1, len(sentences)):
        next_sentence, next_start_char, next_end_char = sentences[i]
        next_ids = extract_ids(semantic_search(next_sentence))
        if should_merge_sentences(current_ids, next_ids, segmentation_granularity):
            # Grow the current segment; its text is only used to re-query the search server,
            # the final split relies on the character offsets
            current_segment += text[len(current_segment) + current_start:next_start_char] + next_sentence
            current_ids = extract_ids(semantic_search(current_segment))
        else:
            end_index = sentences[i - 1][2]
            segment_ids.append((current_segment, current_start, end_index, current_ids))
            current_segment, current_start = next_sentence, next_start_char
            current_ids = next_ids
    segment_ids.append((current_segment, current_start, sentences[-1][2], current_ids))
    return segment_ids
def extract_segments_by_char_position(text, segments_with_offsets):
    split_segments = []
    for _, start_idx, end_idx, _ in segments_with_offsets:
        split_segments.append(text[start_idx:end_idx])
    return split_segments


class RequestHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        parsed_url = urlparse(self.path)
        query_params = parse_qs(parsed_url.query)
        if "text" in query_params and "granularity" in query_params:
            request_str = query_params["text"][0]  # Extract text
            segmentation_granularity = float(query_params["granularity"][0])  # Extract granularity
            # Process the request here
            if segmentation_granularity == 1:
                self.handle_standard_segmentation(request_str)
            else:
                self.handle_segmentation_with_granularity(request_str, segmentation_granularity)
        else:
            self.send_error(400, "Missing parameters")

    def handle_standard_segmentation(self, request_str):
        # Ensure request_str is in English for similarity search
        if not is_en_text(request_str.replace("\n", ""), language_detection_model):
            request_str_in_english = translate_to_english(request_str)
        else:
            request_str_in_english = request_str
        segments = parse_text_into_sentences(request_str_in_english)
        self.respond_json({'segments': segments})

    def handle_segmentation_with_granularity(self, request_str, segmentation_granularity):
        # Ensure request_str is in English
        if not is_en_text(request_str.replace("\n", ""), language_detection_model):
            request_str_in_english = translate_to_english(request_str)
        else:
            request_str_in_english = request_str
        segmentation_result = segment_text_with_offsets(request_str_in_english, segmentation_granularity)
        segments = extract_segments_by_char_position(request_str_in_english, segmentation_result)
        segments = [seg.strip() for seg in segments]
        self.respond_json({'segments': segments})

    def respond_json(self, data):
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(json.dumps(data).encode())


def run_segment_server():
    server_address = ('', 8866)  # Port can be customized
    httpd = HTTPServer(server_address, RequestHandler)
    print(f'Segment Server is running on port {server_address[1]}')
    httpd.serve_forever()


if __name__ == '__main__':
    language_detection_model = load_language_detection_model(mode="inference")
    run_segment_server()
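A minimal usage sketch, assuming the server above is running locally on port 8866; the /api/segment path and the text/granularity query parameters are the ones the handler parses and the Emacs client below sends (the example passage is made up):

import requests

# Ask the local segmentation server to chunk a passage with granularity 0.25
response = requests.get(
    "http://localhost:8866/api/segment",
    params={
        "text": "Org-roam stores notes as plain files. Each note has an id. A different thought about cooking.",
        "granularity": 0.25,
    },
)
print(response.json()["segments"])  # a list of merged text segments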
;; Using python code to do the parsing so I can apply ml models
(defun start-discourse-segmentation ()
  (interactive)
  (async-shell-command "source ~/.zshrc && conda activate ml3 && python /Users/luis.moneda/repos/org-roam-ai/discourse_segmentation.py")
  (delete-window (get-buffer-window (get-buffer "*Async Shell Command*<3>"))))

(start-discourse-segmentation)
(defun discourse-segmentation-api ()
  "Send the active region to the segmentation server and return the list of segments."
  (interactive)
  (let* ((text (buffer-substring-no-properties (region-beginning) (region-end)))
         (api-output (my/get-segments text)))
    api-output))
(setq chunky-semantic-search/segmentation-granularity 0.25)

(defun set-chunky-semantic-search-granularity ()
  "Set the value of `chunky-semantic-search/segmentation-granularity`.
Provides a selection from a predefined list, but also allows custom input."
  (interactive)
  (let* ((choices '(0 0.25 0.5 0.75 0.9 1))
         (choice (completing-read
                  "Select granularity (or type a custom value); 0 generates a single chunk, 1 splits by sentence: "
                  (mapcar #'number-to-string choices)
                  nil nil)))
    (setq chunky-semantic-search/segmentation-granularity
          (min 1 (max 0 (string-to-number choice))))
    (message "Granularity set to %f" chunky-semantic-search/segmentation-granularity)))
(defun my/get-segments (query)
  "Send QUERY to the Python HTTP server and return the segments as an Emacs Lisp list."
  (require 'url)
  (require 'json)
  (let* ((encoded-query (url-hexify-string query)) ;; Encode special characters
         (url (format "http://localhost:8866/api/segment?text=%s&granularity=%s"
                      encoded-query
                      (number-to-string chunky-semantic-search/segmentation-granularity)))
         (response-buffer (url-retrieve-synchronously url))
         segments)
    (if response-buffer
        (with-current-buffer response-buffer
          (goto-char (point-min))
          ;; (message "Raw response: %s" (buffer-string)) ;; DEBUG: Print raw response
          (if (search-forward "\n\n" nil t)
              (let* ((json-object-type 'alist)
                     (json-array-type 'list)
                     (json-key-type 'string)
                     (parsed-json (condition-case err
                                      (json-read)
                                    (error (message "JSON read error: %s" err)
                                           nil))))
                (setq segments (cdr (assoc "segments" parsed-json))))
            (message "Failed to locate JSON body in response"))
          (kill-buffer response-buffer))
      (message "No response received"))
    segments))
(defun chunky-semantic-search (start end)
  "Main function to analyze the region between START and END."
  (interactive "r")
  (let* ((sentences (discourse-segmentation-api))
         (chunks (cl-loop with pos = start
                          for s in sentences
                          collect (progn
                                    (goto-char pos) ;; Start searching from the current position
                                    (when (search-forward s end t)
                                      (let ((chunk-start (match-beginning 0))
                                            (chunk-end (match-end 0)))
                                        (setq pos (1+ chunk-end)) ;; Update `pos` for the next iteration
                                        (cons chunk-start chunk-end)))))))
    ;; Filter out any nil results (in case a sentence isn't found)
    (setq chunks (cl-remove-if-not #'identity chunks))
    ;; Apply colors and interactions
    (my/apply-colors-to-chunks chunks)
    (my/setup-interactions chunks))
  (deactivate-mark))
;; Using consult as the interface
(require 'consult)

;; This function keeps the menu open after a selection. It is flaky due to the re-opening
(defun my/display-popup-at-point ()
  "Display search results using consult for the chunk at point, without closing the menu after selection."
  (interactive)
  (let ((ov (car (overlays-at (point)))))
    (when ov
      (let ((results (overlay-get ov 'my-results))
            (ivy-height 16)
            (ivy-posframe-height 16)
            (ivy-posframe-width 100) ;; Adjust width
            (ivy-posframe-display-functions-alist '((t . ivy-posframe-display-at-point))))
        (when results
          (let* ((titles (mapcar (lambda (s) (decode-coding-string s 'utf-8)) (mapcar #'car results)))
                 (last-selection nil)) ;; Store the last selection
            (catch 'exit
              (while t
                (let* ((selected (decode-coding-string (consult--read
                                                        titles
                                                        :prompt "Select entry: "
                                                        :category 'org-roam
                                                        :require-match t
                                                        :sort nil
                                                        :history t
                                                        :default last-selection) 'utf-8))) ;; Decode user selection
                  (if selected
                      (progn
                        (setq last-selection selected) ;; Update last selection
                        (let ((entry (assoc selected (mapcar (lambda (r)
                                                               (cons (decode-coding-string (car r) 'utf-8) (cdr r)))
                                                             results)))) ;; Match correctly
                          (when entry
                            (my/open-org-roam-node-in-side-buffer entry))))
                    ;; Exit the menu if no selection is made
                    (throw 'exit nil)))))))))))
(global-set-key (kbd "C-c s") 'my/display-popup-at-point)

;; To clear the faces while I'm writing
(defun chunky-semantic-search/clear-faces (&optional start end)
  "Remove all face overlays applied by `chunky-semantic-search` from the selection or entire buffer."
  (interactive (if (use-region-p) (list (region-beginning) (region-end)) (list nil nil)))
  (let ((start (or start (point-min)))
        (end (or end (point-max))))
    (dolist (ov (overlays-in start end))
      (when (overlay-get ov 'face) ;; Only remove overlays that set a face
        (delete-overlay ov))))
  (message "Chunky Semantic Search highlights cleared."))