Python RNN with numpy only
#############################################################################
# Abstract: This code trains a model to predict the next character based on
# the previous ones.
#
# More details:
# 1. Each character is represented as a vector of size 256 that is all
#    zeros except at the index of that character's ASCII code (one-hot).
# 2. The code divides the text into chunks of input_size characters.
# 3. The teaching label of each character in a chunk is the character that
#    follows it, so the labels are just the chunk shifted by one position,
#    with a space (ASCII code 32) assigned as the label of the last character.
# 4. The prediction of the next character is based on the current input
#    character and on all of the preceding text, which influences the
#    prediction through the hidden layer.
# 5. After all the predictions for a chunk are made, the gradients are
#    calculated from the errors and the weights are updated.
#############################################################################
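# Worked example (illustrative only, not used by the code): for the text
# "hi!", the ASCII codes are [104, 105, 33], so the one-hot matrix has 1.0 at
# columns 104, 105 and 33 of its three rows, and the labels are [105, 33, 32]:
# each character's label is the character after it, with a space appended for
# the final one.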
import numpy as np
import matplotlib.pyplot as plt
import pickle
hidden_layer_size = 100
character_size = 256
input_size = 20
learning_rate = 0.01
epsilon = 10 ** (-5)
epochus = 25
train_new = 1
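# With train_new = 1 the script trains from scratch; any other value makes it
# load previously saved weights from "hw4_weights.p" and skip training.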
training_file = open("christmas_carol.txt")
training_data = []
for line in training_file:
    temp = list(line.strip())
    for i in xrange(len(temp)):
        temp[i] = ord(temp[i])
    training_data += temp
temp = training_data
training_labels = temp[1:] + [32]
training_data = np.zeros((len(temp), character_size))
training_data[xrange(len(temp)), temp] = 1.0
training_file.close()
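# The weights below are drawn uniformly from [-sqrt(1/fan_in), +sqrt(1/fan_in)],
# a common heuristic meant to keep the initial tanh activations away from
# saturation.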
# W_xh = (100, 256)
wts_xh = np.random.uniform(-np.sqrt(1.0 / character_size),
                           np.sqrt(1.0 / character_size),
                           (hidden_layer_size, character_size))
# b_h = (100,)
bias_h = np.random.uniform(-np.sqrt(1.0 / hidden_layer_size),
                           np.sqrt(1.0 / hidden_layer_size),
                           (hidden_layer_size))
# W_hh = (100, 100)
wts_hh = np.random.uniform(-np.sqrt(1.0 / hidden_layer_size),
                           np.sqrt(1.0 / hidden_layer_size),
                           (hidden_layer_size, hidden_layer_size))
# b_o = (256,)
bias_o = np.random.uniform(-np.sqrt(1.0 / hidden_layer_size),
                           np.sqrt(1.0 / hidden_layer_size),
                           (character_size))
# W_ho = (256, 100)
wts_ho = np.random.uniform(-np.sqrt(1.0 / hidden_layer_size),
                           np.sqrt(1.0 / hidden_layer_size),
                           (character_size, hidden_layer_size))
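# Per-parameter running sums of squared gradients, used by the Adagrad
# updates in the training loop below.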
m_wxh = np.zeros(wts_xh.shape)
m_bh = np.zeros(bias_h.shape)
m_whh = np.zeros(wts_hh.shape)
m_bo = np.zeros(bias_o.shape)
m_who = np.zeros(wts_ho.shape)
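# Numerically stable softmax: the max is subtracted before exponentiating so
# that np.exp cannot overflow for large scores.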
def softmax(x):
    xt = np.exp(x - np.max(x))
    return xt / np.sum(xt)
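# Cross-entropy loss summed over each chunk of the text, averaged over the
# number of chunks processed.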
def total_loss(data, labels, wts_xh, wts_hh, wts_ho, bias_h, bias_o):
    loss = 0.0
    i = 0
    while i + input_size <= len(data):
        output, hidden = forward_prop(data[i:i+input_size], wts_xh,
                                      wts_hh, wts_ho, bias_h, bias_o)
        predictions = output[np.arange(input_size), labels[i:i+input_size]]
        loss += -1 * np.sum(np.log(predictions))
        i += input_size
    return loss / (i / input_size * 1.0)
def forward_prop(x, wts_xh, wts_hh, wts_ho, bias_h, bias_o):
    T = len(x)
    hidden_layers = np.zeros((T + 1, hidden_layer_size))
    hidden_layers[-1] = np.zeros(hidden_layer_size)
    outputs = np.zeros((T, character_size))
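    # Recurrence: h_t = tanh(W_xh * x_t + W_hh * h_(t-1) + b_h) and
    # o_t = softmax(W_ho * h_t + b_o); hidden_layers[-1] holds the zero
    # initial state used at t = 0.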
    for t in range(T):
        hidden_layers[t] = np.tanh(wts_xh[:, int(np.argmax(x[t]))] +
                                   wts_hh.dot(hidden_layers[t - 1]) + bias_h)
        outputs[t] = softmax(wts_ho.dot(hidden_layers[t]) + bias_o)
    return [outputs, hidden_layers]
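# Generate input_size characters starting from the one-hot character x: each
# step feeds the most likely character of the previous step's output back in
# as the next input, then one character is sampled from every step's output
# distribution to build the returned string.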
def predict_sequence(x, wts_xh, wts_hh, wts_ho, bias_h, bias_o):
    sequence = [chr(np.argmax(x))]
    sequence_length = input_size
    hidden_layers = np.zeros((sequence_length + 1, hidden_layer_size))
    hidden_layers[-1] = np.zeros(hidden_layer_size)
    outputs = np.zeros((sequence_length, character_size))
    for t in range(sequence_length):
        hidden_layers[t] = np.tanh(wts_xh[:, np.argmax(x)] +
                                   wts_hh.dot(hidden_layers[t - 1]) + bias_h)
        outputs[t] = softmax(wts_ho.dot(hidden_layers[t]) + bias_o)
        x = outputs[t]
    for i in range(sequence_length):
        flip = np.random.multinomial(1, outputs[i], size=1)
        c = np.argmax(flip)
        sequence.append(chr(c))
    return "".join(sequence)
epochs = 1
training_loss = []
training_epochs = []
if train_new == 1:
    while epochs < epochus + 1:
        i = 0 # Iterate through examples
        loss = 0
        while i + input_size < len(training_data):
            #for t in range(input_size):
            x = np.array(training_data[i:i+input_size])
            y = np.array(training_labels[i:i+input_size])
            T = len(y)
            ### Forward Prop ###
            outputs, hidden_layers = forward_prop(x, wts_xh, wts_hh, wts_ho,
                                                  bias_h, bias_o)
            ### Back Propagation ###
            # We accumulate the gradients in these variables:
            g_wts_ho = np.zeros(wts_ho.shape)
            g_wts_hh = np.zeros(wts_hh.shape)
            g_bias_o = np.zeros(bias_o.shape)
            g_bias_h = np.zeros(bias_h.shape)
            g_wts_xh = np.zeros(wts_xh.shape)
            # Delta for the output layer: prediction minus one-hot target
            # (the gradient of the cross-entropy loss w.r.t. the softmax inputs)
            delta_o = outputs
            delta_o[np.arange(T), y] -= 1
            # For each output, working backwards...
            for t in np.arange(T)[::-1]:
                g_bias_o += delta_o[t]
                g_wts_ho += np.outer(delta_o[t], hidden_layers[t].T)
                delta_h = wts_ho.T.dot(delta_o[t]) * (1 - hidden_layers[t] ** 2)
                # Backpropagation through time:
                # add to the gradients at every earlier step of the chunk
                for step in np.arange(t + 1)[::-1]:
                    g_bias_h += delta_h
                    g_wts_hh += np.outer(delta_h, hidden_layers[step - 1])
                    g_wts_xh += np.outer(delta_h, x[step].T)
                    # Update delta for the next step back, at step - 1
                    delta_h = wts_hh.T.dot(delta_h) * (1 - hidden_layers[step - 1] ** 2)
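            # Clip each gradient element-wise to [-5, 5] to limit exploding gradients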
            for param in [g_wts_ho, g_wts_hh, g_wts_xh, g_bias_o, g_bias_h]:
                np.clip(param, -5, 5, out=param)
            ### Adagrad weight updates ###
            for param, dparam, mem in zip([wts_xh, wts_hh, wts_ho, bias_h, bias_o],
                                          [g_wts_xh, g_wts_hh, g_wts_ho, g_bias_h, g_bias_o],
                                          [m_wxh, m_whh, m_who, m_bh, m_bo]):
                mem += dparam * dparam
                param += -learning_rate * dparam / np.sqrt(mem + 1e-8)
            i += input_size # Next training example
        loss = total_loss(training_data, training_labels, wts_xh,
                          wts_hh, wts_ho, bias_h, bias_o)
        print "epoch # ", epochs, " training_loss = ", loss
        if epochs % 5 == 0:
            print predict_sequence(np.array(training_data[3]), wts_xh, wts_hh,
                                   wts_ho, bias_h, bias_o)
        training_loss.append(loss)
        training_epochs.append(epochs)
        epochs += 1
else:
    wts_ho, wts_hh, wts_xh, bias_o, bias_h = pickle.load(open("hw4_weights.p", "rb"))
print predict_sequence(np.array(training_data[4343]), wts_xh, wts_hh, wts_ho,
                       bias_h, bias_o)
plt.plot(training_epochs, training_loss)
plt.xlabel("Training Epochs")
plt.ylabel("Training Loss")
plt.title("Training Loss vs. Number of Training Epochs")
plt.show()
# save the model: hw4_weights#epochs#input_size#update_of_input_size
pickle.dump((wts_ho, wts_hh, wts_xh, bias_o, bias_h), open("hw4_weights25_10_10.p", "wb"))