GraphWalkEncoder
import numpy as np
import random
from collections import defaultdict


class GraphWalkEncoder:
    def __init__(self, vocab_size, vector_size=64, walk_length=5):
        """
        Initializes the encoder with a dynamic graph and node states.
        """
        self.vector_size = vector_size
        self.walk_length = walk_length
        self.vocab_size = vocab_size
        # Node states: each word in the vocab gets a randomly initialized vector
        self.node_states = {i: np.random.rand(vector_size) for i in range(vocab_size)}
        self.edge_weights = defaultdict(lambda: 1)  # All edges start with equal weight

    def add_edge(self, node1, node2):
        """
        Adds or strengthens an edge between two nodes (words).
        """
        self.edge_weights[(node1, node2)] += 1

    def random_walk(self, sentence):
        """
        Performs a random walk through the sentence and encodes relationships.
        """
        nodes = sentence
        walk = [random.choice(nodes)]
        for _ in range(self.walk_length - 1):
            current_node = walk[-1]
            neighbors = [node for node in nodes if node != current_node]
            if not neighbors:
                # Single-word (or all-identical) sentence: nowhere left to walk
                break
            # Weighted choice based on edge weights
            weights = [self.edge_weights[(current_node, neighbor)] for neighbor in neighbors]
            probabilities = np.array(weights) / sum(weights)
            next_node = np.random.choice(neighbors, p=probabilities)
            walk.append(next_node)
        return walk

    def encode(self, sentence):
        """
        Encodes a sentence (a list of word indices) into fixed-size vectors.
        """
        # The sentence is already a list of node (word) indices
        nodes = sentence
        # Perform a random walk over the sentence
        walk = self.random_walk(nodes)
        # Each word's vector is the mean of the node states of the walk steps
        # that landed on it; words never visited by the walk keep a zero vector
        updated_states = np.zeros((len(nodes), self.vector_size))
        for i, node in enumerate(nodes):
            contributions = [self.node_states[step] for step in walk if step == node]
            if contributions:
                updated_states[i] = np.mean(contributions, axis=0)
        return updated_states


# Preprocessing
class SimpleVocab:
    def __init__(self):
        self.word_to_idx = {}
        self.idx_to_word = {}

    def build_vocab(self, texts):
        """
        Builds a vocabulary from a list of texts.
        """
        idx = 0
        for text in texts:
            for word in text.split():
                if word not in self.word_to_idx:
                    self.word_to_idx[word] = idx
                    self.idx_to_word[idx] = word
                    idx += 1

    def encode(self, text):
        """
        Encodes a text as a sequence of word indices.
        """
        return [self.word_to_idx[word] for word in text.split()]

    def decode(self, indices):
        """
        Decodes a sequence of word indices back into text.
        """
        return [self.idx_to_word[idx] for idx in indices]


# Example Usage
if __name__ == "__main__":
    # Sample texts
    texts = [
        "Today there are more fans",
        "Netflix popular show fans",
        "Squid Game fans are everywhere",
    ]

    # Build vocab
    vocab = SimpleVocab()
    vocab.build_vocab(texts)

    # Initialize encoder
    encoder = GraphWalkEncoder(vocab_size=len(vocab.word_to_idx), vector_size=16, walk_length=3)

    # Process texts: add an edge for every pair of adjacent words
    for text in texts:
        encoded_sentence = vocab.encode(text)
        for i in range(len(encoded_sentence) - 1):
            encoder.add_edge(encoded_sentence[i], encoded_sentence[i + 1])

    # Encode a sample sentence
    test_text = "Today there are fans"
    encoded_test = vocab.encode(test_text)
    encoded_vectors = encoder.encode(encoded_test)

    # Display results
    print("Encoded Vectors for Sentence:")
    for word, vector in zip(test_text.split(), encoded_vectors):
        print(f"Word: {word}, Vector: {vector}")