LSTM Demo Code with tunable transition matrix to illustrate increasing predictive power with increasing memory dependence on prior states
#apt-get update && apt-get install -y tmux htop unzip nvtop zip rsync libpq-dev nano
#python3 -m pip install scikit-learn pandas tensorflow
from collections import defaultdict
import random

import numpy as np
from sklearn.model_selection import KFold
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import LSTM, Dense, Dropout, Embedding
from tensorflow.keras.models import Sequential, save_model
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
class SyntheticDataGenerator:
    @staticmethod
    def generate(num_sequences=1000, sequence_length=10, memory_size=5, memory_influence=0.5):
        """
        Generates synthetic sequences of events with a memory-influenced transition probability,
        adjusted by a tunable memory influence parameter.

        Parameters:
        - num_sequences (int): Number of sequences to generate. Default is 1000.
        - sequence_length (int): Length of each sequence. Default is 10.
        - memory_size (int): The number of recent events to consider for memory influence. Default is 5.
        - memory_influence (float): Tunable parameter to adjust memory's influence on next event selection (0 to 1).

        Returns:
        - events (list of str): List of unique events that can occur.
        - synthetic_data (list of str): List of generated event sequences as strings.
        """
        events = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H']
        transition_dict = {
            'A': [('B', 0.5), ('C', 0.1), ('D', 0.4)],
            'C': [('D', 0.8)],
            'D': [('A', 0.9)],
        }
        synthetic_data = []
        for _ in range(num_sequences):
            sequence_memory = defaultdict(int)  # Initialize memory for the sequence
            sequence = [random.choice(events)]
            sequence_memory[sequence[-1]] += 1  # Update memory for the first event
            for _ in range(1, sequence_length):
                current_event = sequence[-1]
                next_event = SyntheticDataGenerator.choose_next_event_with_memory(
                    transition_dict, current_event, events, sequence_memory, memory_size, memory_influence)
                sequence.append(next_event)
                # Update memory
                sequence_memory[next_event] += 1
                # Keep memory size within the limit
                if len(sequence) > memory_size:
                    sequence_memory[sequence[-(memory_size + 1)]] -= 1
            synthetic_data.append(''.join(sequence))
        return events, synthetic_data
    @staticmethod
    def choose_next_event_with_memory(transition_dict, current_event, events, sequence_memory, memory_size, memory_influence):
        """
        Chooses the next event in a sequence based on a transition dictionary, the current event,
        recent event memory, and a tunable memory influence parameter.

        Parameters:
        - transition_dict (dict): Transition probabilities between events.
        - current_event (str): The current event in the sequence.
        - events (list of str): All possible events.
        - sequence_memory (defaultdict(int)): The memory of recent events and their counts.
        - memory_size (int): The size of the memory window.
        - memory_influence (float): Probability of choosing the next event from memory rather than from transition probabilities.

        Returns:
        - next_event (str): The next event in the sequence.
        """
        # Decide whether to choose based on memory or transition probabilities
        if random.random() < memory_influence and sequence_memory:
            # Choose based on memory, weighting events by their recent counts
            memory_events = [event for event, count in sequence_memory.items() for _ in range(count)]
            if memory_events:  # Ensure there are events in memory to choose from
                return random.choice(memory_events)
        else:
            # Choose based on transition probabilities, or uniformly if no transitions are defined
            if current_event in transition_dict and transition_dict[current_event]:
                transitions = transition_dict[current_event]
                choices, weights = zip(*transitions)
                return random.choices(choices, weights=weights, k=1)[0]
            else:
                # No transitions defined: uniform choice among all events
                return random.choice(events)
        # Fallback in case of empty memory (should not occur given the checks above, but included for robustness)
        return random.choice(events)
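    # Worked example (illustrative numbers, not from the original gist): with
    # memory_influence=0.8 and sequence_memory == {'A': 2, 'B': 1}, about 80% of
    # draws sample uniformly from the multiset ['A', 'A', 'B'] (so 'A' with
    # probability ~2/3), while the remaining ~20% follow transition_dict (from 'A':
    # 'B' w.p. 0.5, 'D' w.p. 0.4, 'C' w.p. 0.1). At memory_influence=0.0 the
    # generator reduces to a plain first-order Markov chain.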
    @staticmethod
    def normalize_transition_probabilities(transition_dict):
        """
        Normalizes the transition probabilities in the transition dictionary so that the sum of
        probabilities for transitions from any given event equals 1.

        Parameters:
        - transition_dict (dict): A dictionary where keys are events and values are lists of tuples (event, probability).

        Modifies:
        - transition_dict: Each event's transition probabilities are normalized if their sum exceeds 1.
        """
        for event, transitions in transition_dict.items():
            total_probability = sum(prob for _, prob in transitions)
            if total_probability > 1:
                transition_dict[event] = [(next_event, prob / total_probability) for next_event, prob in transitions]
    @staticmethod
    def write_to_file(synthetic_data, file_path):
        """
        Writes the synthetic event data to a file, with each sequence on a new line.

        Parameters:
        - synthetic_data (list of str): The synthetic event data, where each string represents a sequence of events.
        - file_path (str): The path to the file where the synthetic data will be written.
        """
        with open(file_path, 'w') as file:
            for sequence in synthetic_data:
                # Write each sequence as a comma-separated line, e.g. "ABCD" -> "A,B,C,D"
                file.write(','.join(sequence) + '\n')
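# Usage sketch (hedged; the filename is illustrative, not part of the original gist):
#   events, data = SyntheticDataGenerator.generate(num_sequences=100, sequence_length=10,
#                                                  memory_size=5, memory_influence=0.8)
#   SyntheticDataGenerator.write_to_file(data, "demo_sequences.txt")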
class ValidationDiagnostics:
    @staticmethod
    def generate_transition_diff_percentage(transition_map, actual_map):
        """
        Calculates the percentage difference between predicted and actual transition counts for each transition.

        Parameters:
        - transition_map (dict): A dictionary with predicted transition counts, where keys are starting states and values are dictionaries of end states with their predicted counts.
        - actual_map (dict): A dictionary with actual transition counts, similar in structure to transition_map.

        Returns:
        - diff_percentage (dict): A dictionary structured similarly to the input maps, including the actual count, predicted count, and the absolute percentage difference for each transition.
        """
        diff_percentage = {}
        # Calculate differences in percentage
        for state, actual_transitions in actual_map.items():
            if state not in diff_percentage:
                diff_percentage[state] = {}
            for actual_transition, actual_count in actual_transitions.items():
                predicted_count = transition_map.get(state, {}).get(actual_transition, 0)
                # Calculate percentage difference
                if actual_count > 0:
                    percentage_diff = ((predicted_count - actual_count) / actual_count) * 100
                elif predicted_count > 0:
                    percentage_diff = 100  # Actual count is 0 but predicted count is positive
                else:
                    percentage_diff = 0  # Both are 0
                diff_percentage[state][actual_transition] = {
                    'actual': actual_count,
                    'predicted': predicted_count,
                    'percentage_difference': np.abs(percentage_diff)
                }
        # Add predicted transitions with no actual counterpart, covering both states
        # absent from actual_map and predicted-only transitions under shared states
        for state, predicted_transitions in transition_map.items():
            if state not in diff_percentage:
                diff_percentage[state] = {}
            for predicted_transition, predicted_count in predicted_transitions.items():
                if predicted_transition not in actual_map.get(state, {}):
                    diff_percentage[state][predicted_transition] = {
                        'actual': 0,
                        'predicted': predicted_count,
                        'percentage_difference': 100  # Predicted when there were no actual occurrences
                    }
        return diff_percentage
    @staticmethod
    def calculate_transition_maps(diff):
        """
        Generates normalized transition probability maps based on the differences calculated between actual and predicted transitions.

        Parameters:
        - diff (dict): The output from `generate_transition_diff_percentage`, containing detailed differences for each transition.

        Returns:
        - observed_transition_map (dict): A dictionary with normalized actual transition probabilities.
        - predicted_transition_map (dict): A dictionary with normalized predicted transition probabilities.
        """
        observed_transition_map = {}
        predicted_transition_map = {}
        for start, end_states in diff.items():
            observed_sum = sum(e['actual'] for e in end_states.values())
            predicted_sum = sum(e['predicted'] for e in end_states.values())
            observed_transition_map.setdefault(start, {})
            predicted_transition_map.setdefault(start, {})
            for end, counts in end_states.items():
                # Guard against division by zero for states that appear on only one side
                observed_transition_map[start][end] = counts["actual"] / observed_sum if observed_sum else 0.0
                predicted_transition_map[start][end] = counts["predicted"] / predicted_sum if predicted_sum else 0.0
        return observed_transition_map, predicted_transition_map
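    # Shape sketch (illustrative values): diff entries look like
    #   {'A': {'B': {'actual': 120, 'predicted': 131, 'percentage_difference': 9.17}, ...}, ...}
    # and the two returned maps renormalize the 'actual' / 'predicted' counts per
    # start state so each row sums to 1 (or is 0.0 when a side has no counts).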
    @staticmethod
    def generate_transition_and_actual_maps(model, event_to_int, real_data_file):
        """
        Processes a dataset to generate actual and predicted transition maps, and calculates the accuracy of predictions.

        Parameters:
        - model: The predictive model being evaluated.
        - event_to_int (dict): A mapping from events to integer representations, used by the model for predictions.
        - real_data_file (str): The file path to the dataset used for validation, containing real sequences of events.

        Returns:
        - transition_map (dict): A dictionary with predicted transitions and their counts based on model predictions.
        - actual_map (dict): A dictionary with actual transitions and their counts derived from the validation dataset.
        - accuracy (float): The overall accuracy of the model's predictions, defined as the proportion of correct predictions.
        - hits_by_k (dict): For each k from 1 to 20, the number of samples whose true next event appeared in the top-k predictions.
        """
        transition_map = {}
        actual_map = {}
        correct_hits = 0
        total_hits = 0
        with open(real_data_file) as file:
            test_dataset = [line.split(",") for line in file.read().splitlines() if line]
        total_by_k = {str(k): 0 for k in range(1, 21)}
        hits_by_k = {str(k): 0 for k in range(1, 21)}
        for sample in test_dataset:
            # predict_next_events already returns events sorted by descending probability
            top_events = DemoModel.predict_next_events(model, sample[:-1], event_to_int, 20)
            for i in range(1, 21):
                total_by_k[str(i)] += 1
                if sample[-1] in [e[0] for e in top_events[:i]]:
                    hits_by_k[str(i)] += 1
            predicted = top_events[0][0]
            if predicted == sample[-1]:
                correct_hits += 1
            total_hits += 1
            print(f"Current Accuracy: {correct_hits}/{total_hits}")
            if not transition_map.get(sample[-2]):
                transition_map[sample[-2]] = {}
            if not transition_map[sample[-2]].get(predicted):
                transition_map[sample[-2]][predicted] = 0
            transition_map[sample[-2]][predicted] += 1
            if not actual_map.get(sample[-2]):
                actual_map[sample[-2]] = {}
            if not actual_map[sample[-2]].get(sample[-1]):
                actual_map[sample[-2]][sample[-1]] = 0
            actual_map[sample[-2]][sample[-1]] += 1
        return transition_map, actual_map, correct_hits / total_hits, hits_by_k
class DemoModel:
    @staticmethod
    def read_event_stream(file_path):
        """
        Reads event sequences from a file and preprocesses the data for the model.

        Parameters:
        - file_path (str): Path to the file containing event sequences.

        Returns:
        - features (list of list of int): Encoded sequences of events, excluding the last event in each sequence.
        - labels (numpy array): One-hot encoded array of the last event in each sequence.
        - event_to_int (dict): Dictionary mapping event strings to unique integer codes, starting at 1 so that 0 can serve as the padding value.
        """
        with open(file_path, 'r') as file:
            lines = file.readlines()
        event_streams = [line.strip().split(',') for line in lines]
        flattened_list = [event for sublist in event_streams for event in sublist]
        # Reserve index 0 for padding; sort for a deterministic mapping
        event_to_int = {event: i for i, event in enumerate(sorted(set(flattened_list)), start=1)}
        int_event_streams = [[event_to_int[event] for event in stream] for stream in event_streams]
        features = [stream[:-1] for stream in int_event_streams]
        labels = [stream[-1] for stream in int_event_streams]
        labels = to_categorical(labels, num_classes=len(event_to_int) + 1)
        return features, labels, event_to_int
    @staticmethod
    def create_model(num_unique_events=4):
        """
        Builds and compiles an LSTM-based neural network model for sequence prediction.

        Parameters:
        - num_unique_events (int): Number of unique events in the dataset; the output layer has num_unique_events + 1 units to account for the reserved padding index 0.

        Returns:
        - model: A compiled Keras model ready for training.
        """
        model = Sequential()
        # Embedding layer for categorical input. mask_zero=True makes the padding
        # value 0 (never a real event code, since codes start at 1) propagate a mask
        # so the LSTM ignores padded timesteps.
        model.add(Embedding(input_dim=num_unique_events + 1, output_dim=50, mask_zero=True))
        # LSTM layer; set return_sequences=True if stacking LSTMs
        model.add(LSTM(512, return_sequences=False))
        # Dropout for regularization
        model.add(Dropout(0.5))
        # Dense layer for further processing; size and number of Dense layers can vary
        model.add(Dense(100, activation='relu'))
        # Output layer for the classification task (index 0 is the unused padding class)
        model.add(Dense(num_unique_events + 1, activation='softmax'))
        # Compile the model
        model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
        return model
    @staticmethod
    def data_generator(features, labels, batch_size, indices):
        """
        Creates a generator that yields batches of data for training or evaluating the model.
        Feeding the model in manageable chunks keeps memory usage bounded, which matters with large datasets.

        Parameters:
        - features (list of list of int): Preprocessed sequences of events, where each event is represented by an integer code.
        - labels (numpy array): The corresponding labels for each sequence, one-hot encoded to represent the next event in the sequence.
        - batch_size (int): The number of sequences to include in each batch.
        - indices (list of int): Indices of the samples to include, allowing selective sampling (e.g., for K-fold cross-validation).

        Yields:
        - batch_features (numpy array): A batch of sequence features, padded to a consistent length within the batch.
        - batch_labels (numpy array): The labels corresponding to each sequence in the batch.
        """
        num_samples = len(indices)
        while True:  # Loop forever so the generator never terminates
            for offset in range(0, num_samples, batch_size):
                batch_indices = indices[offset:offset + batch_size]
                # Select the current batch
                batch_features = [features[i] for i in batch_indices]
                batch_labels = [labels[i] for i in batch_indices]
                # Pad sequences within each batch (the pad value 0 is masked by the Embedding layer)
                padded_features = pad_sequences(batch_features)
                yield np.array(padded_features), np.array(batch_labels)
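    # Usage sketch (illustrative): batch the first 100 samples
    #   gen = DemoModel.data_generator(features, labels, batch_size=32, indices=list(range(100)))
    #   X, y = next(gen)  # X: (32, longest sequence in batch), y: (32, num_classes)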
    @staticmethod
    def train_model(file_path, batch_size=32, n_folds=5, n_epochs=100, save_path='final_model.h5', use_kfold=True):
        """
        Trains the predictive model using either K-fold cross-validation or the entire dataset.

        Parameters:
        - file_path (str): Path to the dataset file.
        - batch_size (int): Number of samples per batch.
        - n_folds (int): Number of folds for K-fold cross-validation.
        - n_epochs (int): Number of epochs for training.
        - save_path (str): File path to save the trained model.
        - use_kfold (bool): Whether to use K-fold cross-validation.

        Returns:
        - model: The trained Keras model.
        - event_to_int (dict): Mapping of event strings to integer codes.
        """
        features, labels, event_to_int = DemoModel.read_event_stream(file_path)
        num_unique_events = len(event_to_int)
        if use_kfold:
            # KFold cross-validator
            kf = KFold(n_splits=n_folds, shuffle=True)
            fold = 1
            for train_index, test_index in kf.split(features):
                print(f"Training on fold {fold}/{n_folds}")
                # Create the data generators for the train and test sets
                train_generator = DemoModel.data_generator(features, labels, batch_size, train_index)
                test_generator = DemoModel.data_generator(features, labels, batch_size, test_index)
                steps_per_epoch = len(train_index) // batch_size
                validation_steps = len(test_index) // batch_size
                # Create and train a new model instance for each fold
                early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
                model = DemoModel.create_model(num_unique_events)
                model.fit(train_generator, steps_per_epoch=steps_per_epoch, epochs=n_epochs,
                          validation_data=test_generator, validation_steps=validation_steps,
                          verbose=2, callbacks=[early_stopping])
                scores = model.evaluate(test_generator, steps=validation_steps, verbose=0)
                print(f"Fold {fold} Accuracy: {scores[1] * 100}%")
                fold += 1
            # Optionally, save the model from the last fold
            save_model(model, save_path)
            print(f"Model from last fold saved to {save_path}")
        else:
            # Train on the entire dataset
            print("Training on the entire dataset")
            entire_dataset_generator = DemoModel.data_generator(features, labels, batch_size, list(range(len(features))))
            steps_per_epoch = len(features) // batch_size
            # Create and train the model
            model = DemoModel.create_model(num_unique_events)
            model.fit(entire_dataset_generator, steps_per_epoch=steps_per_epoch, epochs=n_epochs, verbose=2)
            # Save the model trained on the entire dataset
            save_model(model, save_path)
            print(f"Model trained on the entire dataset saved to {save_path}")
        return model, event_to_int
    @staticmethod
    def predict_next_event(model, event_stream, event_to_int):
        """
        Predicts the most likely next event given a sequence of previous events.

        Parameters:
        - model: The trained Keras model for prediction.
        - event_stream (list of str): The sequence of previous events.
        - event_to_int (dict): Mapping of event strings to integer codes.

        Returns:
        - str: The predicted next event.
        """
        # Leverage predict_next_events with k=1 to get the single most probable event
        top_event = DemoModel.predict_next_events(model, event_stream, event_to_int, 1)[0]
        # Extract and return only the event from the (event, probability) tuple
        return top_event[0]
    @staticmethod
    def predict_next_events(model, event_stream, event_to_int, k=3):
        """
        Predicts the top k most likely next events given a sequence of previous events, along with their probabilities.

        Parameters:
        - model: The trained Keras model for prediction.
        - event_stream (list of str): The sequence of previous events.
        - event_to_int (dict): Mapping of event strings to integer codes.
        - k (int): The number of predictions to return.

        Returns:
        - list of tuples (str, float): The top k predicted events and their respective probabilities, sorted by descending probability.
        """
        # Convert the event stream to integers using the mapping
        int_stream = [event_to_int[event] for event in event_stream]
        # Shape the sequence into the (1, timesteps) input the model expects
        padded_stream = pad_sequences([int_stream])
        # Predict the next events
        prediction = model.predict(padded_stream)[0]  # Get the first (and only) prediction
        # Sort indices by descending probability, skipping the reserved padding index 0
        sorted_indices = [i for i in prediction.argsort()[::-1] if i != 0]
        top_k_indices = sorted_indices[:k]
        top_k_probabilities = prediction[top_k_indices]
        # Convert indices back to events
        int_to_event = {i: event for event, i in event_to_int.items()}
        return [(int_to_event[idx], float(prob)) for idx, prob in zip(top_k_indices, top_k_probabilities)]
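    # Example call (probabilities below are invented for illustration):
    #   DemoModel.predict_next_events(model, ['A', 'B', 'C'], event_to_int, k=3)
    #   -> [('D', 0.61), ('A', 0.22), ('B', 0.09)]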
    @staticmethod
    def run_memoryless_demo():
        """
        Demonstrates the complete workflow from data generation through model training and evaluation,
        using synthetic data with no memory influence (memory_influence=0.0).
        """
        # First, generate a training dataset
        events, synthetic_data = SyntheticDataGenerator.generate(1000, 10, 5, 0.0)
        SyntheticDataGenerator.write_to_file(synthetic_data, "synth_data_train.txt")
        # Then build a model trained on that data
        model, event_to_int = DemoModel.train_model("synth_data_train.txt", batch_size=64, n_folds=5,
                                                    n_epochs=50, save_path='final_model.h5', use_kfold=False)
        # Then generate another dataset with the same procedure: fresh traces the model has not seen
        events, synthetic_data = SyntheticDataGenerator.generate(1000, 10, 5, 0.0)
        SyntheticDataGenerator.write_to_file(synthetic_data, "synth_data_test.txt")
        # Finally, predict the last transition in each chain and compare it to the actual last transition
        transition_map, actual_map, accuracy, hits_by_k = ValidationDiagnostics.generate_transition_and_actual_maps(
            model, event_to_int, "synth_data_test.txt")
        diff = ValidationDiagnostics.generate_transition_diff_percentage(transition_map, actual_map)
        observed_transition_map, predicted_transition_map = ValidationDiagnostics.calculate_transition_maps(diff)
        return {"transition_map": transition_map, "actual_map": actual_map, "accuracy": accuracy,
                "diff": diff, "observed_transition_map": observed_transition_map,
                "predicted_transition_map": predicted_transition_map, "hits_by_k": hits_by_k}
    @staticmethod
    def run_memory_demo():
        """
        Demonstrates the same workflow using synthetic data with strong memory influence (memory_influence=0.8).
        """
        # First, generate a training dataset
        events, synthetic_data = SyntheticDataGenerator.generate(1000, 10, 5, 0.8)
        SyntheticDataGenerator.write_to_file(synthetic_data, "synth_data_train_memory.txt")
        # Then build a model trained on that data
        model, event_to_int = DemoModel.train_model("synth_data_train_memory.txt", batch_size=64, n_folds=5,
                                                    n_epochs=50, save_path='final_model.h5', use_kfold=False)
        # Then generate another dataset with the same procedure: fresh traces the model has not seen
        events, synthetic_data = SyntheticDataGenerator.generate(1000, 10, 5, 0.8)
        SyntheticDataGenerator.write_to_file(synthetic_data, "synth_data_test_memory.txt")
        # Finally, predict the last transition in each chain and compare it to the actual last transition
        transition_map, actual_map, accuracy, hits_by_k = ValidationDiagnostics.generate_transition_and_actual_maps(
            model, event_to_int, "synth_data_test_memory.txt")
        diff = ValidationDiagnostics.generate_transition_diff_percentage(transition_map, actual_map)
        observed_transition_map, predicted_transition_map = ValidationDiagnostics.calculate_transition_maps(diff)
        return {"transition_map": transition_map, "actual_map": actual_map, "accuracy": accuracy,
                "diff": diff, "observed_transition_map": observed_transition_map,
                "predicted_transition_map": predicted_transition_map, "hits_by_k": hits_by_k}
    @staticmethod
    def run_demo():
        """
        Runs both demos and reports how predictive power changes as memory dependence increases.
        """
        memoryless_demo = DemoModel.run_memoryless_demo()
        memory_demo = DemoModel.run_memory_demo()
        print(f"Memoryless accuracy: {memoryless_demo['accuracy']}")
        print(f"Memoryless hits by k: {memoryless_demo['hits_by_k']}")
        print(f"Memory-based accuracy: {memory_demo['accuracy']}")
        print(f"Memory-based hits by k: {memory_demo['hits_by_k']}")
        return memoryless_demo, memory_demo
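# Entry point: a minimal sketch (the original gist defines no __main__ guard).
if __name__ == "__main__":
    DemoModel.run_demo()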