Use a Hogwild-inspired algorithm to learn logistic regression over a sample dataset in parallel
""" | |
Demo of using Hogwild algorthim for parallel learning with shared memory | |
Uses sklearn's LogisticRegression for accuracy comparison | |
Output | |
('initial accuracy:', 0.45333333333333331) | |
worker 25974 score 0.93 | |
worker 25975 score 0.92 | |
worker 25976 score 0.88 | |
worker 25974 score 0.94 | |
worker 25975 score 0.92 | |
worker 25976 score 0.88 | |
worker 25974 score 0.94 | |
worker 25975 score 0.92 | |
worker 25976 score 0.88 | |
worker 25974 score 0.94 | |
worker 25975 score 0.92 | |
worker 25976 score 0.88 | |
worker 25974 score 0.94 | |
worker 25976 score 0.88 | |
worker 25975 score 0.92 | |
worker 25974 score 0.94 | |
worker 25976 score 0.88 | |
worker 25975 score 0.92 | |
('final accuracy:', 0.91333333333333333) | |
('sklearn accuracy:', 0.91333333333333333) | |
""" | |
import os
from ctypes import c_double
from multiprocessing import Pool
from multiprocessing.sharedctypes import Array

import numpy as np
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

base_learning_rate = .1  # applied on top of the 1/batch_size normalization below
seed = 338  # for reproducibility (though the lock-free updates make runs nondeterministic)
proc_count = 3  # how many worker processes
n_features = 4  # dimensionality of the generated data


def sigmoid(z):
    # logistic activation for a dot product
    return 1 / (1 + np.exp(-z))


def sigmoid_prime(a):
    # sigmoid derivative expressed via the activation a; the 1e-5 floor
    # keeps the gradient from vanishing when the unit saturates
    return (a * (1 - a)) + 1e-5


def train_step(x, w, y):
    activation = sigmoid(np.dot(x, w))
    # how much did we miss?
    error = y - activation
    # multiply how much we missed by the
    # slope of the sigmoid at the current activation
    delta = error * sigmoid_prime(activation)
    # return gradient update
    return x * delta * base_learning_rate
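
# Note on the math above: with squared error L = (y - a)^2 / 2 and
# a = sigmoid(x . w), the chain rule gives dL/dw = -(y - a) * a * (1 - a) * x,
# so x * delta * base_learning_rate is one gradient-descent step on squared
# error (modulo the 1e-5 floor). Plain log loss would instead give the simpler
# step x * (y - a) * base_learning_rate.
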
def mse(data, w):
    x, y = data
    y_hat = sigmoid(np.dot(x, w))  # sigmoid is vectorized; no apply_along_axis needed
    return np.mean((y_hat - y) ** 2)  # mean *squared* error


def accuracy(data, w):
    x, y = data
    y_hat = np.round(sigmoid(np.dot(x, w)))
    return accuracy_score(y, y_hat)


def worker_func(data, batch_size=10, passes=300, score_func=accuracy):
    # view the shared weight buffer (inherited when the Pool forked) as a
    # numpy array; every worker reads and writes this same memory, lock-free
    w = np.ctypeslib.as_array(shared_array_base)
    pid = os.getpid()
    learning_rate = 1. / batch_size
    update_buffer = []
    for p in range(passes):
        for x, y in zip(*data):
            update_buffer.append(train_step(x, w, y))
            if len(update_buffer) >= batch_size:
                w += (sum(update_buffer) * learning_rate)
                update_buffer = []
        if p % 50 == 0:
            print('worker %s score %s' % (pid, score_func(data, w)))


def setup_shared_array():
    # returns a raw (lock-free) shared-memory array; workers see it because
    # they inherit the global when the Pool forks, not because it is pickled
    shared_array_base = Array(c_double, n_features, lock=False)
    np.random.seed(seed)
    for ix, val in enumerate(np.random.rand(n_features)):
        shared_array_base[ix] = val / 4.  # shrink toward zero -> stronger initial gradients
    return shared_array_base


if __name__ == '__main__':
    data_size = proc_count * 100
    data = make_classification(
        n_samples=data_size, n_classes=2, random_state=seed, n_features=n_features
    )
    # create the shared array once, before the Pool forks, so workers inherit it
    shared_array_base = setup_shared_array()
    # split up the data for the workers
    worker_chunk_size = data_size // proc_count  # integer division so the slices work on Python 3
    data_chunks = [
        (data[0][i: i + worker_chunk_size], data[1][i: i + worker_chunk_size])
        for i in range(0, data_size, worker_chunk_size)
    ]
    pool = Pool(proc_count)
    print("initial accuracy:", accuracy(data, np.ctypeslib.as_array(shared_array_base)))
    pool.map(worker_func, data_chunks)
    print("final accuracy:", accuracy(data, np.ctypeslib.as_array(shared_array_base)))
    lr = LogisticRegression()
    lr.fit(*data)
    print("sklearn accuracy:", lr.score(*data))