"""
title: OODA Loop Long-Term Memory
author: Ryan Bennett
date: 2024-07-05
version: 1.0
license: MIT
description: A pipeline for the OODA loop that uses long-term memory to help with decision making.
requirements: neo4j, openai, ollama
environment_variables: NEO4J_URI, NEO4J_USER, NEO4J_PASSWORD, OPENAI_API_KEY, OPENAI_BASE_URL, OPENAI_MODEL, EMBEDDINGS_MODEL, EMBEDDINGS_DIMENSIONS
"""
import os
import json
import logging
from typing import List, Optional, Dict, Any, Union

from pydantic import BaseModel
from schemas import OpenAIChatMessage
from neo4j import GraphDatabase, Session, Transaction
import ollama
import openai

logging.basicConfig(level=logging.INFO)


class Pipeline:
    class Valves(BaseModel):
        pipelines: List[str] = ["*"]
        priority: int = 0
        neo4j_uri: str = os.getenv("NEO4J_URI", "bolt://localhost:7687")
        neo4j_user: str = os.getenv("NEO4J_USER", "neo4j")
        neo4j_password: str = os.getenv("NEO4J_PASSWORD", "password")
        openai_api_key: str = os.getenv("OPENAI_API_KEY", "your-api-key-here")
        openai_base_url: str = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
        openai_model: str = os.getenv("OPENAI_MODEL", "gpt-3.5-turbo")
        embeddings_model: str = os.getenv("EMBEDDINGS_MODEL", "gte-qwen2-1.5b-instruct-embed-f16")
        embeddings_dimensions: int = int(os.getenv("EMBEDDINGS_DIMENSIONS", 4096))

    def __init__(self):
        self.type = "filter"
        self.name = "OODA Loop Long-Term Memory"
        self.valves = self.Valves()
        try:
            self.driver = GraphDatabase.driver(
                self.valves.neo4j_uri,
                auth=(self.valves.neo4j_user, self.valves.neo4j_password),
            )
            # Pre-1.0 openai SDK: credentials are configured module-wide.
            openai.api_key = self.valves.openai_api_key
            openai.api_base = self.valves.openai_base_url
        except Exception as e:
            logging.error(f"Error during initialization: {str(e)}")
    async def on_startup(self):
        logging.info(f"on_startup:{__name__}")
        # The Python driver executes one Cypher statement per run() call, so
        # each constraint/index gets its own statement.
        statements = [
            "CREATE CONSTRAINT decision_id IF NOT EXISTS FOR (d:Decision) REQUIRE d.id IS UNIQUE",
            "CREATE CONSTRAINT action_id IF NOT EXISTS FOR (a:Action) REQUIRE a.id IS UNIQUE",
            "CREATE CONSTRAINT observation_id IF NOT EXISTS FOR (o:Observation) REQUIRE o.id IS UNIQUE",
            # 'or' is a reserved keyword in Cypher, so the variable is 'ori'.
            "CREATE CONSTRAINT orientation_id IF NOT EXISTS FOR (ori:Orientation) REQUIRE ori.id IS UNIQUE",
            "CREATE INDEX decision_chat IF NOT EXISTS FOR (d:Decision) ON (d.chat_id, d.chat_index)",
            "CREATE INDEX action_chat IF NOT EXISTS FOR (a:Action) ON (a.chat_id, a.chat_index)",
            # The OPTIONS map cannot be parameterized, so the dimension count
            # (already coerced to int in Valves) is interpolated directly.
            f"""
            CREATE VECTOR INDEX observation_embeddings IF NOT EXISTS
            FOR (o:Observation)
            ON o.embedding
            OPTIONS {{indexConfig: {{
                `vector.dimensions`: {self.valves.embeddings_dimensions},
                `vector.similarity_function`: 'cosine'
            }}}}
            """,
        ]
        try:
            with self.driver.session() as session:
                for statement in statements:
                    session.run(statement)
        except Exception as e:
            logging.error(f"Error during startup: {str(e)}")

    async def on_shutdown(self):
        logging.info(f"on_shutdown:{__name__}")
        self.driver.close()

    async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
        logging.info(f"inlet:{__name__}")
        try:
            self.make_decision(body)
            return body
        except Exception as e:
            logging.error(f"Error in inlet: {str(e)}")
            return body

    async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
        logging.info(f"outlet:{__name__}")
        try:
            action_id = self.make_action(body)
            observations_and_orientation = self.make_observations_and_orientation(body, action_id)
            body['messages'][-1]['content'] += f"\n\n{observations_and_orientation}"
            return body
        except Exception as e:
            logging.error(f"Error in outlet: {str(e)}")
            return body

    def make_decision(self, body: dict) -> str:
        chat_id = body.get('chat_id', 'default_chat')
        chat_index = len(body.get('messages', [])) - 1
        user_input = body['messages'][-1]['content']
        try:
            with self.driver.session() as session:
                # randomUUID() is built into Cypher, unlike the deprecated
                # apoc.create.uuid(), and matches make_action below.
                result = session.run("""
                    CREATE (d:Decision {
                        id: randomUUID(),
                        chat_id: $chat_id,
                        chat_index: $chat_index,
                        content: $content,
                        timestamp: datetime()
                    })
                    RETURN d.id as decision_id
                """, chat_id=chat_id, chat_index=chat_index, content=user_input)
                decision_id = result.single()['decision_id']
                return decision_id
        except Exception as e:
            logging.error(f"Error in make_decision: {str(e)}")
            raise

    def make_action(self, body: dict) -> str:
        chat_id = body.get('chat_id', 'default_chat')
        decision_index = len(body.get('messages', [])) - 2
        chat_index = len(body.get('messages', [])) - 1
        assistant_response = body['messages'][-1]['content']
        try:
            with self.driver.session() as session:
                result = session.run("""
                    MERGE (d:Decision {
                        chat_id: $chat_id,
                        chat_index: $decision_index
                    })
                    CREATE (a:Action {
                        id: randomUUID(),
                        chat_id: $chat_id,
                        chat_index: $chat_index,
                        content: $content,
                        timestamp: datetime()
                    })
                    CREATE (d)-[:CAUSED]->(a)
                    RETURN a.id as action_id
                """, chat_id=chat_id, chat_index=chat_index, content=assistant_response, decision_index=decision_index)
                action_id = result.single()['action_id']
                self.make_embeddings(session, action_id)
                return action_id
        except Exception as e:
            logging.error(f"Error in make_action: {str(e)}")
            raise

    def make_observations_and_orientation(self, body: dict, action_id: str) -> str:
        instructions = """
        You are an expert in the OODA (Observe, Orient, Decide, Act) loop process.
        Analyze the most recent turn in the conversation and extract key observations.
        Focus on identifying relevant facts, context, and potential implications.
        Provide your response as a JSON object with an "observations" key containing an array of observation objects.
        Each observation object should have the following properties:
        1. content: The main observation text
        2. relevance: A score from 1-10 indicating how relevant this observation is to the current context
        3. category: One of ['user_intent', 'key_information', 'emotional_state', 'action_item', 'background_context']
        4. confidence: A score from 0-1 indicating your confidence in this observation
        Provide at least 3 observations, but no more than 5.
        """
        messages = body['messages']
        observations = self.llm_call(instructions, messages)['observations']
        try:
            with self.driver.session() as session:
                with session.begin_transaction() as tx:
                    for observation in observations:
                        result = tx.run("""
                            MATCH (a:Action {id: $action_id})
                            CREATE (o:Observation {
                                id: randomUUID(),
                                content: $content,
                                relevance: $relevance,
                                category: $category,
                                confidence: $confidence,
                                timestamp: datetime()
                            })
                            CREATE (a)-[:CAUSED]->(o)
                            RETURN o.id as observation_id
                        """, action_id=action_id, content=observation['content'], relevance=observation['relevance'], category=observation['category'], confidence=observation['confidence'])
                        observation_id = result.single()['observation_id']
                        self.make_embeddings(tx, observation_id)
                        observation['id'] = observation_id

                    observation_ids = [observation['id'] for observation in observations]
                    results = self.get_orientations_from_similar_observations(tx, observation_ids)
                    past_observations_and_orientations = self.stringify_ooda_results(observations, results)

                    orientation_instructions = """
                    You are an expert in the OODA (Observe, Orient, Decide, Act) loop process.
                    Create a new orientation based on the current conversation, the given observations, and past orientations from similar conversations.
                    Your response should be a JSON object with an "orientation" key containing a detailed description of the new orientation,
                    as well as "observation_ids" and "orientation_ids" keys containing the IDs of the observations and orientations that are most relevant to the new orientation.
                    """
                    orientation_messages = [
                        {"role": "user", "content": past_observations_and_orientations}
                    ]
                    new_orientation = self.llm_call(orientation_instructions, orientation_messages)

                    # Create the Orientation node once, then link it to the selected
                    # observations and prior orientations. Unit CALL subqueries keep
                    # the query returning a single row even when either ID list is
                    # empty (unwinding both lists before CREATE would have produced
                    # one node per ID combination).
                    orientation_id = tx.run("""
                        CREATE (o:Orientation {
                            id: randomUUID(),
                            content: $content,
                            timestamp: datetime()
                        })
                        WITH o
                        CALL {
                            WITH o
                            MATCH (obs:Observation) WHERE obs.id IN $observation_ids
                            CREATE (o)-[:BASED_ON]->(obs)
                        }
                        CALL {
                            WITH o
                            MATCH (prior:Orientation) WHERE prior.id IN $orientation_ids
                            CREATE (o)-[:INCORPORATES]->(prior)
                        }
                        RETURN o.id as orientation_id
                    """, content=new_orientation['orientation'], observation_ids=new_orientation['observation_ids'], orientation_ids=new_orientation['orientation_ids']).single()['orientation_id']

                    return f"New Orientation({orientation_id}):\n\n{new_orientation['orientation']}\n\n{past_observations_and_orientations}"
        except Exception as e:
            logging.error(f"Error in make_observations_and_orientation: {str(e)}")
            raise

    def get_orientations_from_similar_observations(self, tx: Transaction, observation_ids: List[str]) -> List[Dict[str, Any]]:
        # For each new observation, find the 5 nearest neighbours in the vector
        # index, then collect the orientations linked to those neighbours.
        # Orientations attach to observations via BASED_ON (see the query
        # above), and a property access like (similar.observation) is not a
        # valid node pattern, so the neighbour is bound to a variable first.
        query = """
        UNWIND $observationIds AS observationId
        MATCH (o:Observation {id: observationId})
        CALL {
            WITH o
            CALL db.index.vector.queryNodes('observation_embeddings', 5, o.embedding)
            YIELD node, score
            WHERE node <> o
            RETURN node AS similarObservation, score
        }
        WITH o, collect({observation: similarObservation, score: score}) AS similarObservations
        UNWIND similarObservations AS similar
        WITH o, similarObservations, similar.observation AS simObs
        OPTIONAL MATCH (orientation:Orientation)-[:BASED_ON]->(simObs)
        WITH o, similarObservations, collect(DISTINCT orientation) AS uniqueOrientations
        RETURN o.id AS originalObservationId, uniqueOrientations, similarObservations
        """
        try:
            result = tx.run(query, observationIds=observation_ids)
            return list(result)
        except Exception as e:
            logging.error(f"Error in get_orientations_from_similar_observations: {str(e)}")
            raise

    def llm_call(self, instructions: str, messages: List[Dict[str, str]]) -> Dict[str, Any]:
        system_message = f"""
        {instructions}
        Format your response as JSON within <output></output> tags:
        <output>
        {{
            ....
        }}
        </output>
        """
        messages = [{"role": "system", "content": system_message}] + messages
        try:
            # Uses the pre-1.0 openai SDK interface (openai.ChatCompletion).
            response = openai.ChatCompletion.create(
                model=self.valves.openai_model,
                messages=messages,
                temperature=0.0,
                max_tokens=2048
            )
            content = response.choices[0].message['content']
            json_str = content.split('<output>')[1].split('</output>')[0]
            return json.loads(json_str)
        except Exception as e:
            logging.error(f"Error in LLM call: {str(e)}")
            raise

    def make_embeddings(self, session: Union[Session, Transaction], id: str) -> None:
        # Accepts either a Session or an open Transaction; both expose run().
        try:
            node = session.run("""
                MATCH (n {id: $id})
                RETURN apoc.map.removeKey(properties(n), 'embedding') AS n
            """, id=id).single()['n']
            # default=str handles values json can't serialize natively, such as
            # the neo4j DateTime returned for the timestamp property.
            prompt = f"Represent this sentence for searching relevant passages: {json.dumps(node, default=str)}"
            response = ollama.embeddings(model=self.valves.embeddings_model, prompt=prompt)
            session.run("""
                MATCH (n {id: $id})
                CALL db.create.setNodeVectorProperty(n, $key, $vector)
            """, id=id, key="embedding", vector=response['embedding'])
        except Exception as e:
            logging.error(f"Error in make_embeddings: {str(e)}")
            raise

    def stringify_ooda_results(self, original_observations: List[Dict[str, Any]], similar_observations_results: List[Dict[str, Any]]) -> str:
        result_strings = []
        for original_obs in original_observations:
            obs_string = "Original Observation:\n"
            obs_string += f"  ID: {original_obs['id']}\n"
            obs_string += f"  Content: {original_obs['content']}\n"
            obs_string += f"  Relevance: {original_obs['relevance']}\n"
            obs_string += f"  Category: {original_obs['category']}\n"
            obs_string += f"  Confidence: {original_obs['confidence']}\n\n"
            similar_obs = next((r for r in similar_observations_results if r['originalObservationId'] == original_obs['id']), None)
            if similar_obs:
                obs_string += "Similar Observations and Their Orientations:\n"
                for orientation in similar_obs['uniqueOrientations']:
                    obs_string += f"  Orientation ID: {orientation['id']}\n"
                    obs_string += f"  Content: {orientation.get('content', 'N/A')}\n"
                    obs_string += f"  Timestamp: {orientation.get('timestamp', 'N/A')}\n\n"
            else:
                obs_string += "No similar observations found.\n\n"
            result_strings.append(obs_string)
        return "\n".join(result_strings)