Last active
July 6, 2024 01:24
-
-
Save rynomad/849d63929e86c5a7383d3bfa653df27a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
title: OODA Loop Long-Term Memory | |
author: Ryan Bennett | |
date: 2024-07-05 | |
version: 1.0 | |
license: MIT | |
description: A pipeline for the OODA loop that uses long-term memory to help with decision making. | |
requirements: neo4j, openai, ollama | |
environment_variables: NEO4J_URI, NEO4J_USER, NEO4J_PASSWORD, OPENAI_API_KEY, OPENAI_BASE_URL, OPENAI_MODEL, EMBEDDINGS_MODEL, EMBEDDINGS_DIMENSIONS | |
""" | |
import os | |
import json | |
from typing import List, Optional, Dict, Any | |
from pydantic import BaseModel | |
from schemas import OpenAIChatMessage | |
import time | |
from neo4j import GraphDatabase, Transaction | |
import ollama | |
import openai | |
import logging | |
logging.basicConfig(level=logging.INFO) | |
class Pipeline: | |
class Valves(BaseModel): | |
pipelines: List[str] = ["*"] | |
priority: int = 0 | |
neo4j_uri: str = os.getenv("NEO4J_URI", "bolt://localhost:7687") | |
neo4j_user: str = os.getenv("NEO4J_USER", "neo4j") | |
neo4j_password: str = os.getenv("NEO4J_PASSWORD", "password") | |
openai_api_key: str = os.getenv("OPENAI_API_KEY", "your-api-key-here") | |
openai_base_url: str = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1") | |
openai_model: str = os.getenv("OPENAI_MODEL", "gpt-3.5-turbo") | |
embeddings_model: str = os.getenv("EMBEDDINGS_MODEL", "gte-qwen2-1.5b-instruct-embed-f16") | |
embeddings_dimensions: int = int(os.getenv("EMBEDDINGS_DIMENSIONS", 4096)) | |
def __init__(self): | |
self.type = "filter" | |
self.name = "OODA Loop Long-Term Memory" | |
self.valves = self.Valves() | |
try: | |
self.driver = GraphDatabase.driver( | |
self.valves.neo4j_uri, | |
auth=(self.valves.neo4j_user, self.valves.neo4j_password) | |
) | |
openai.api_key = self.valves.openai_api_key | |
openai.api_base = self.valves.openai_base_url | |
except Exception as e: | |
logging.error(f"Error during startup: {str(e)}") | |
async def on_startup(self): | |
logging.info(f"on_startup:{__name__}") | |
try: | |
with self.driver.session() as session: | |
session.run(""" | |
CREATE CONSTRAINT decision_id IF NOT EXISTS FOR (d:Decision) REQUIRE d.id IS UNIQUE; | |
CREATE CONSTRAINT action_id IF NOT EXISTS FOR (a:Action) REQUIRE a.id IS UNIQUE; | |
CREATE CONSTRAINT observation_id IF NOT EXISTS FOR (o:Observation) REQUIRE o.id IS UNIQUE; | |
CREATE CONSTRAINT orientation_id IF NOT EXISTS FOR (or:Orientation) REQUIRE or.id IS UNIQUE; | |
CREATE INDEX decision_chat IF NOT EXISTS FOR (d:Decision) ON (d.chat_id, d.chat_index); | |
CREATE INDEX action_chat IF NOT EXISTS FOR (a:Action) ON (a.chat_id, a.chat_index); | |
CREATE VECTOR INDEX observation_embeddings IF NOT EXISTS | |
FOR (o:Observation) | |
ON o.embedding | |
OPTIONS {indexConfig: { | |
`vector.dimensions`: $embeddings_dimensions, | |
`vector.similarity_function`: 'cosine' | |
} } | |
""", embeddings_dimensions=self.valves.embeddings_dimensions) | |
except Exception as e: | |
logging.error(f"Error during startup: {str(e)}") | |
async def on_shutdown(self): | |
logging.info(f"on_shutdown:{__name__}") | |
self.driver.close() | |
async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: | |
logging.info(f"inlet:{__name__}") | |
try: | |
self.make_decision(body) | |
return body | |
except Exception as e: | |
logging.error(f"Error in inlet: {str(e)}") | |
return body | |
async def outlet(self, body: dict, user: Optional[dict] = None) -> dict: | |
logging.info(f"outlet:{__name__}") | |
try: | |
action_id = self.make_action(body) | |
observations_and_orientation = self.make_observations_and_orientation(body, action_id) | |
body['messages'][-1]['content'] += f"\n\n{observations_and_orientation}" | |
return body | |
except Exception as e: | |
logging.error(f"Error in outlet: {str(e)}") | |
return body | |
def make_decision(self, body: dict) -> str: | |
chat_id = body.get('chat_id', 'default_chat') | |
chat_index = len(body.get('messages', [])) - 1 | |
user_input = body['messages'][-1]['content'] | |
try: | |
with self.driver.session() as session: | |
result = session.run(""" | |
CREATE (d:Decision { | |
id: apoc.create.uuid(), | |
chat_id: $chat_id, | |
chat_index: $chat_index, | |
content: $content, | |
timestamp: datetime() | |
}) | |
RETURN d.id as decision_id | |
""", chat_id=chat_id, chat_index=chat_index, content=user_input) | |
decision_id = result.single()['decision_id'] | |
return decision_id | |
except Exception as e: | |
logging.error(f"Error in make_decision: {str(e)}") | |
raise | |
def make_action(self, body: dict) -> str: | |
chat_id = body.get('chat_id', 'default_chat') | |
decision_index = len(body.get('messages', [])) - 2 | |
chat_index = len(body.get('messages', [])) - 1 | |
assistant_response = body['messages'][-1]['content'] | |
try: | |
with self.driver.session() as session: | |
result = session.run(""" | |
MERGE (d:Decision { | |
chat_id: $chat_id, | |
chat_index: $decision_index | |
}) | |
CREATE (a:Action { | |
id: randomUUID(), | |
chat_id: $chat_id, | |
chat_index: $chat_index, | |
content: $content, | |
timestamp: datetime() | |
}) | |
CREATE (d)-[:CAUSED]->(a) | |
RETURN a.id as action_id | |
""", chat_id=chat_id, chat_index=chat_index, content=assistant_response, decision_index=decision_index) | |
action_id = result.single()['action_id'] | |
self.make_embeddings(session, action_id) | |
return action_id | |
except Exception as e: | |
logging.error(f"Error in make_action: {str(e)}") | |
raise | |
def make_observations_and_orientation(self, body: dict, action_id: str) -> str: | |
instructions = """ | |
You are an expert in the OODA (Observe, Orient, Decide, Act) loop process. | |
Analyze the most recent turn in the conversation and extract key observations. | |
Focus on identifying relevant facts, context, and potential implications. | |
Provide your response as a JSON object with an "observations" key containing an array of observation objects. | |
Each observation object should have the following properties: | |
1. content: The main observation text | |
2. relevance: A score from 1-10 indicating how relevant this observation is to the current context | |
3. category: One of ['user_intent', 'key_information', 'emotional_state', 'action_item', 'background_context'] | |
4. confidence: A score from 0-1 indicating your confidence in this observation | |
Provide at least 3 observations, but no more than 5. | |
""" | |
messages = body['messages'] | |
observations = self.llm_call(instructions, messages)['observations'] | |
try: | |
with self.driver.session() as session: | |
with session.begin_transaction() as tx: | |
for observation in observations: | |
result = tx.run(""" | |
MATCH (a:Action {id: $action_id}) | |
CREATE (o:Observation { | |
id: apoc.create.uuid(), | |
content: $content, | |
relevance: $relevance, | |
category: $category, | |
confidence: $confidence, | |
timestamp: datetime() | |
}) | |
CREATE (a)-[:CAUSED]->(o) | |
RETURN o.id as observation_id | |
""", action_id=action_id, content=observation['content'], relevance=observation['relevance'], category=observation['category'], confidence=observation['confidence']) | |
observation_id = result.single()['observation_id'] | |
self.make_embeddings(tx, observation_id) | |
observation['id'] = observation_id | |
observation_ids = [observation['id'] for observation in observations] | |
results = self.get_orientations_from_similar_observations(tx, observation_ids) | |
past_observations_and_orientations = self.stringify_ooda_results(observations, results) | |
orientation_instructions = """ | |
You are an expert in the OODA (Observe, Orient, Decide, Act) loop process. | |
Create a new orientation based on the current conversation, the given observations and past orientations from similar conversations. | |
Your response should be a JSON object with an "orientation" key containing a detailed description of the new orientation, | |
as well as "observation_ids" and "orientation_ids" keys containing the IDs of the observations and orientations that are most relevant to the new orientation. | |
""" | |
orientation_messages = [ | |
{"role": "user", "content": past_observations_and_orientations} | |
] | |
new_orientation = self.llm_call(orientation_instructions, orientation_messages) | |
orientation_id = tx.run(""" | |
UNWIND $observation_ids AS obs_id | |
UNWIND $orientation_ids AS orient_id | |
CREATE (o:Orientation { | |
id: apoc.create.uuid(), | |
content: $content, | |
timestamp: datetime() | |
}) | |
WITH o, obs_id, orient_id | |
MATCH (obs:Observation {id: obs_id}) | |
MATCH (prior_orient:Orientation {id: orient_id}) | |
CREATE (o)-[:BASED_ON]->(obs) | |
CREATE (o)-[:INCORPORATES]->(prior_orient) | |
WITH o | |
RETURN o.id as orientation_id | |
""", content=new_orientation['orientation'], observation_ids=new_orientation['observation_ids'], orientation_ids=new_orientation['orientation_ids']).single()['orientation_id'] | |
return f"New Orientation({orientation_id}):\n\n {new_orientation['orientation']}\n\n{past_observations_and_orientations}" | |
except Exception as e: | |
logging.error(f"Error in make_observations_and_orientation: {str(e)}") | |
raise | |
def get_orientations_from_similar_observations(self, tx: Transaction, observation_ids: List[str]) -> List[Dict[str, Any]]: | |
query = """ | |
UNWIND $observationIds AS observationId | |
MATCH (o:Observation {id: observationId}) | |
CALL { | |
WITH o | |
CALL db.index.vector.queryNodes('observation_embeddings', 5, o.embedding) YIELD node, score | |
WHERE node <> o | |
RETURN node AS similarObservation, score | |
} | |
WITH o, collect({observation: similarObservation, score: score}) AS similarObservations | |
UNWIND similarObservations AS similar | |
MATCH (similar.observation)-[:CAUSED]->(orientation:Orientation) | |
WITH o, collect(DISTINCT orientation) AS uniqueOrientations | |
RETURN o.id AS originalObservationId, uniqueOrientations, similarObservations | |
""" | |
try: | |
result = tx.run(query, observationIds=observation_ids) | |
return list(result) | |
except Exception as e: | |
logging.error(f"Error in get_orientations_from_similar_observations: {str(e)}") | |
raise | |
def llm_call(self, instructions: str, messages: List[Dict[str, str]]) -> Dict[str, Any]: | |
system_message = f""" | |
{instructions} | |
Format your response as JSON within <output></output> tags: | |
<output> | |
{{ | |
.... | |
}} | |
</output> | |
""" | |
messages = [{"role": "system", "content": system_message}] + messages | |
try: | |
response = openai.ChatCompletion.create( | |
model=self.valves.openai_model, | |
messages=messages, | |
temperature=0.0, | |
max_tokens=2048 | |
) | |
content = response.choices[0].message['content'] | |
json_str = content.split('<output>')[1].split('</output>')[0] | |
return json.loads(json_str) | |
except Exception as e: | |
logging.error(f"Error in LLM call: {str(e)}") | |
raise | |
def make_embeddings(self, session: Transaction, id: str) -> None: | |
try: | |
node = session.run(""" | |
MATCH (n {id: $id}) | |
RETURN apoc.map.removeKey(properties(n), 'embedding') AS n | |
""", id=id).single()['n'] | |
prompt = f"Represent this sentence for searching relevant passages: {json.dumps(node)}" | |
response = ollama.embeddings(model=self.valves.embeddings_model, prompt=prompt) | |
session.run(""" | |
MATCH (n {id: $id}) | |
CALL db.create.setNodeVectorProperty(n, $key, $vector) | |
""", id=id, key="embedding", vector=response['embedding']) | |
except Exception as e: | |
logging.error(f"Error in make_embeddings: {str(e)}") | |
raise | |
def stringify_ooda_results(self, original_observations: List[Dict[str, Any]], similar_observations_results: List[Dict[str, Any]]) -> str: | |
result_strings = [] | |
for original_obs in original_observations: | |
obs_string = f"Original Observation:\n" | |
obs_string += f" ID: {original_obs['id']}\n" | |
obs_string += f" Content: {original_obs['content']}\n" | |
obs_string += f" Relevance: {original_obs['relevance']}\n" | |
obs_string += f" Category: {original_obs['category']}\n" | |
obs_string += f" Confidence: {original_obs['confidence']}\n\n" | |
similar_obs = next((r for r in similar_observations_results if r['originalObservationId'] == original_obs['id']), None) | |
if similar_obs: | |
obs_string += "Similar Observations and Their Orientations:\n" | |
for orientation in similar_obs['uniqueOrientations']: | |
obs_string += f" Orientation ID: {orientation['id']}\n" | |
obs_string += f" Content: {orientation.get('content', 'N/A')}\n" | |
obs_string += f" Timestamp: {orientation.get('timestamp', 'N/A')}\n\n" | |
else: | |
obs_string += "No similar observations found.\n\n" | |
result_strings.append(obs_string) | |
return "\n".join(result_strings) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment