@lgmoneda
Last active February 18, 2025 08:52
Segmenting text by using the proportion of overlapping documents retrieved from a knowledge base to decide whether to merge adjacent sentences
import json
import urllib.parse
import spacy
import requests
import re
from http.server import BaseHTTPRequestHandler, HTTPServer
from multi_language_utils import is_en_text, translate_to_english, load_language_detection_model
from urllib.parse import parse_qs, urlparse
# Load the SpaCy model
nlp = spacy.load("en_core_web_sm")

def parse_text_into_sentences(text, chars_position=False):
    doc = nlp(text)
    if chars_position:
        sentences = [(sent.text.strip(), sent.start_char, sent.end_char) for sent in doc.sents]
    else:
        sentences = [sent.text.strip() for sent in doc.sents]
    return sentences

def semantic_search(query, server_url="http://localhost:8800/api/"):
    # Encode the query for HTTP transmission
    encoded_query = requests.utils.quote(query)
    # Construct the full URL
    full_url = server_url + encoded_query
    # Send the GET request
    response = requests.get(full_url)
    # Return the response text
    return response.text

def extract_ids(response):
    ids = []
    lines = response.split('\n')
    for line in lines:
        if '[[id:' in line:
            match = re.search(r'\[\[id:(.*?)\]\[', line)
            if match:
                ids.append(match.group(1))
    return ids
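
# Illustrative example: the semantic search server is assumed to return
# org-roam style links of the form [[id:...][title]], one per line
# (the ids and titles below are hypothetical):
#   extract_ids("- [[id:3f2a][Text segmentation]]\n- [[id:9b1c][Knowledge base]]")
#   -> ['3f2a', '9b1c']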

def should_merge_sentences(list1, list2, segmentation_granularity):
    if segmentation_granularity == 0:
        return True  # No segmentation, merge everything
    set1, set2 = set(list1), set(list2)
    if not set2:
        return False  # Nothing retrieved for the next sentence: no overlap evidence, start a new segment
    differing_elements = len(set2 - set1)
    proportion_different = differing_elements / len(set2)
    return (1 - proportion_different) > segmentation_granularity
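
# Illustrative example with hypothetical document ids: two of the three ids
# retrieved for the second sentence already appear in the first sentence's
# results, so the overlap proportion is 2/3 ≈ 0.67:
#   should_merge_sentences(['a', 'b', 'c'], ['b', 'c', 'd'], 0.5)   -> True  (0.67 > 0.5)
#   should_merge_sentences(['a', 'b', 'c'], ['b', 'c', 'd'], 0.75)  -> False (0.67 <= 0.75)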

def segment_text_with_offsets(text, segmentation_granularity):
    sentences = parse_text_into_sentences(text, chars_position=True)
    segment_ids = []
    if segmentation_granularity == 0:
        response = semantic_search(text)
        return [(text, 0, len(text), extract_ids(response))]
    current_segment, current_start = sentences[0][0], sentences[0][1]
    current_ids = extract_ids(semantic_search(current_segment))
    for i in range(1, len(sentences)):
        next_sentence, next_start_char, next_end_char = sentences[i]
        next_ids = extract_ids(semantic_search(next_sentence))
        if should_merge_sentences(current_ids, next_ids, segmentation_granularity):
            current_segment += text[len(current_segment) + current_start:next_start_char] + next_sentence
            current_ids = extract_ids(semantic_search(current_segment))
        else:
            end_index = sentences[i-1][2]
            segment_ids.append((current_segment, current_start, end_index, current_ids))
            current_segment, current_start = next_sentence, next_start_char
            current_ids = next_ids
    segment_ids.append((current_segment, current_start, sentences[-1][2], current_ids))
    return segment_ids
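
# The function returns (segment_text, start_char, end_char, ids) tuples; an
# illustrative result (hypothetical ids and offsets):
#   [("First idea. It continues.", 0, 25, ['3f2a']),
#    ("A different topic.", 26, 44, ['9b1c'])]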

def extract_segments_by_char_position(text, segments_with_offsets):
    split_segments = []
    for _, start_idx, end_idx, _ in segments_with_offsets:
        split_segments.append(text[start_idx:end_idx])
    return split_segments

class RequestHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        parsed_url = urlparse(self.path)
        query_params = parse_qs(parsed_url.query)
        if "text" in query_params and "granularity" in query_params:
            request_str = query_params["text"][0]  # Extract text
            segmentation_granularity = float(query_params["granularity"][0])  # Extract granularity
            # Process the request here
            if segmentation_granularity == 1:
                self.handle_standard_segmentation(request_str)
            else:
                self.handle_segmentation_with_granularity(request_str, segmentation_granularity)
        else:
            self.send_error(400, "Missing parameters")

    def handle_standard_segmentation(self, request_str):
        # Ensure request_str is in English for similarity search
        if not is_en_text(request_str.replace("\n", ""), language_detection_model):
            request_str_in_english = translate_to_english(request_str)
        else:
            request_str_in_english = request_str
        segments = parse_text_into_sentences(request_str_in_english)
        self.respond_json({'segments': segments})

    def handle_segmentation_with_granularity(self, request_str, segmentation_granularity):
        # Ensure request_str is in English
        if not is_en_text(request_str.replace("\n", ""), language_detection_model):
            request_str_in_english = translate_to_english(request_str)
        else:
            request_str_in_english = request_str
        segmentation_result = segment_text_with_offsets(request_str_in_english, segmentation_granularity)
        segments = extract_segments_by_char_position(request_str_in_english, segmentation_result)
        segments = [seg.strip() for seg in segments]
        self.respond_json({'segments': segments})

    def respond_json(self, data):
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(json.dumps(data).encode())

def run_segment_server():
    server_address = ('', 8866)  # Port can be customized
    httpd = HTTPServer(server_address, RequestHandler)
    print(f'Segment Server is running on port {server_address[1]}')
    httpd.serve_forever()


if __name__ == '__main__':
    language_detection_model = load_language_detection_model(mode="inference")
    run_segment_server()
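
# Illustrative request (the handler only inspects the query parameters, so the
# path is arbitrary; the Emacs client below uses /api/segment). The segments
# actually returned depend on the knowledge base and the granularity:
#   curl "http://localhost:8866/api/segment?text=First%20idea.%20A%20different%20topic.&granularity=0.25"
#   -> {"segments": ["First idea.", "A different topic."]}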

;; Using python code to do the parsing so I can apply ml models
(defun start-discourse-segmentation ()
  (interactive)
  (async-shell-command "source ~/.zshrc && conda activate ml3 && python /Users/luis.moneda/repos/org-roam-ai/discourse_segmentation.py")
  (delete-window (get-buffer-window (get-buffer "*Async Shell Command*<3>"))))

(start-discourse-segmentation)

(defun discourse-segmentation-api ()
  "Send the text of the active region to the Python segmentation server
and return the list of segments."
  (interactive)
  (let* ((text (buffer-substring-no-properties (region-beginning) (region-end)))
         (api-output (my/get-segments text)))
    api-output))

(setq chunky-semantic-search/segmentation-granularity 0.25)

(defun set-chunky-semantic-search-granularity ()
  "Set the value of `chunky-semantic-search/segmentation-granularity`.
Provides a selection from a predefined list, but also allows custom input."
  (interactive)
  (let* ((choices '(0 0.25 0.5 0.75 0.9 1))
         (choice (completing-read
                  "Select granularity (or type a custom value); 0 generates a single chunk, 1 splits by sentence: "
                  (mapcar #'number-to-string choices)
                  nil nil)))
    (setq chunky-semantic-search/segmentation-granularity
          (min 1 (max 0 (string-to-number choice))))
    (message "Granularity set to %f" chunky-semantic-search/segmentation-granularity)))

(defun my/get-segments (query)
  "Send QUERY to the Python HTTP server and return the segments as an Emacs Lisp list."
  (require 'url)
  (require 'json)
  (let* ((encoded-query (url-hexify-string query)) ;; Encode special characters
         (url (format "http://localhost:8866/api/segment?text=%s&granularity=%s"
                      encoded-query
                      (number-to-string chunky-semantic-search/segmentation-granularity)))
         (response-buffer (url-retrieve-synchronously url))
         segments)
    (if response-buffer
        (with-current-buffer response-buffer
          (goto-char (point-min))
          ;; (message "Raw response: %s" (buffer-string)) ;; 🔍 DEBUG: Print raw response
          (if (search-forward "\n\n" nil t)
              (let* ((json-object-type 'alist)
                     (json-array-type 'list)
                     (json-key-type 'string)
                     (parsed-json (condition-case err
                                      (json-read)
                                    (error (message "JSON read error: %s" err)
                                           nil))))
                (setq segments (cdr (assoc "segments" parsed-json))))
            (message "Failed to locate JSON body in response"))
          (kill-buffer response-buffer))
      (message "No response received"))
    segments))
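
;; Illustrative call (the segments returned depend on
;; `chunky-semantic-search/segmentation-granularity' and on what the semantic
;; search server retrieves for each sentence):
;;   (my/get-segments "First idea. It continues. A different topic.")
;;   => ("First idea. It continues." "A different topic.")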

(require 'cl-lib) ;; for `cl-loop' and `cl-remove-if-not'

(defun chunky-semantic-search (start end)
  "Main function to analyze the region between START and END."
  (interactive "r")
  (let* ((sentences (discourse-segmentation-api))
         (chunks (cl-loop with pos = start
                          for s in sentences
                          collect (progn
                                    (goto-char pos) ;; Start searching from the current position
                                    (when (search-forward s end t)
                                      (let ((chunk-start (match-beginning 0))
                                            (chunk-end (match-end 0)))
                                        (setq pos (1+ chunk-end)) ;; Update `pos` for the next iteration
                                        (cons chunk-start chunk-end)))))))
    ;; Filter out any nil results (in case a sentence isn't found)
    (setq chunks (cl-remove-if-not #'identity chunks))
    ;; Apply colors and interactions
    (my/apply-colors-to-chunks chunks)
    (my/setup-interactions chunks))
  (deactivate-mark))

;; Using consult as the interface
(require 'consult)
;; This function keeps the menu open. It is flaky due to the re-opening
(defun my/display-popup-at-point ()
  "Display search results using consult for the chunk at point, without closing the menu after selection."
  (interactive)
  (let ((ov (car (overlays-at (point)))))
    (when ov
      (let ((results (overlay-get ov 'my-results))
            (ivy-height 16)
            (ivy-posframe-height 16)
            (ivy-posframe-width 100) ;; Adjust width
            (ivy-posframe-display-functions-alist '((t . ivy-posframe-display-at-point))))
        (when results
          (let* ((titles (mapcar (lambda (s) (decode-coding-string s 'utf-8)) (mapcar #'car results)))
                 (last-selection nil)) ;; Store the last selection
            (catch 'exit
              (while t
                (let* ((selected (decode-coding-string (consult--read
                                                        titles
                                                        :prompt "Select entry: "
                                                        :category 'org-roam
                                                        :require-match t
                                                        :sort nil
                                                        :history t
                                                        :default last-selection) 'utf-8))) ;; Decode user selection
                  (if selected
                      (progn
                        (setq last-selection selected) ;; Update last selection
                        (let ((entry (assoc selected (mapcar (lambda (r)
                                                               (cons (decode-coding-string (car r) 'utf-8) (cdr r)))
                                                             results)))) ;; Match correctly
                          (when entry
                            (my/open-org-roam-node-in-side-buffer entry))))
                    ;; Exit the menu if no selection is made
                    (throw 'exit nil)))))))))))

(global-set-key (kbd "C-c s") 'my/display-popup-at-point)
;; To clear the highlight faces when I'm writing
(defun chunky-semantic-search/clear-faces (&optional start end)
  "Remove all face overlays applied by `chunky-semantic-search` from the selection or entire buffer."
  (interactive (if (use-region-p) (list (region-beginning) (region-end)) (list nil nil)))
  (let ((start (or start (point-min)))
        (end (or end (point-max))))
    (dolist (ov (overlays-in start end))
      (when (overlay-get ov 'face) ;; Only remove overlays that set a face
        (delete-overlay ov))))
  (message "Chunky Semantic Search highlights cleared."))