The code was downloaded from this website and revised to run from the command line.
It implements a simple EM algorithm for a two-component Gaussian mixture model.
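In outline, every EM iteration performs the updates below. This is a minimal, self-contained sketch of a single iteration (not part of the downloaded code), using scipy.stats.norm and the same parameter layout p = [mu1, sig1, mu2, sig2, pi_1] as the scripts that follow:

from numpy import array, sqrt
from scipy.stats import norm

def em_step(x, p):
    mu, sig = array([p[0], p[2]]), array([p[1], p[3]])
    pi_ = array([p[4], 1 - p[4]])
    total = pi_[0]*norm.pdf(x, mu[0], sig[0]) + pi_[1]*norm.pdf(x, mu[1], sig[1])
    out = []
    for k in (0, 1):
        gamma = pi_[k]*norm.pdf(x, mu[k], sig[k])/total   # E-step: responsibilities
        N_k = gamma.sum()
        mu_k = (gamma*x).sum()/N_k                        # M-step: weighted mean
        sig_k = sqrt((gamma*(x - mu_k)**2).sum()/N_k)     # M-step: weighted std dev
        out.append((mu_k, sig_k, N_k/x.size))             # N_k/x.size: mixing weight
    return array([out[0][0], out[0][1], out[1][0], out[1][1], out[0][2]])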
# -*- coding: utf-8 -*-
import logging
from numpy.random import normal
from numpy import concatenate, array, zeros, ones, log
from math import sqrt
from scipy.stats import norm

def model_str(p):
    return "mu_1:{u1:.3f},sig_1:{s1:.3f},pi_1:{p1:.3f}\tmu_2:{u2:.3f},sig_2:{s2:.3f},pi_2:{p2:.3f}".format(
        u1=p[0], s1=p[1], p1=p[4], u2=p[2], s2=p[3], p2=1-p[4])

def pdf_model(x, p):
    # Mixture density: pi_1*N(mu1, sig1) + (1-pi_1)*N(mu2, sig2)
    mu1, sig1, mu2, sig2, pi_1 = p
    return pi_1*norm.pdf(x, mu1, sig1) + (1-pi_1)*norm.pdf(x, mu2, sig2)

def log_likelihood_two_1d_gauss(p, sample):
    # Negative log-likelihood (numpy log, so it is element-wise over the sample)
    return -log(pdf_model(sample, p)).sum()

def sim_two_gauss_mix(p, N=1000):
    # Integer sizes so the two sub-samples total exactly N
    n1 = int(round(N*p[4]))
    s1 = normal(p[0], p[1], size=n1)
    s2 = normal(p[2], p[3], size=N - n1)
    return concatenate([s1, s2])

def fit_two_peaks_EM(sample, sigma=None, weights=False, p0=array([0.2,0.2,0.7,0.2,0.5]),
                     max_iter=300, tolerance=1e-5):
    if not weights: w = ones(sample.size)
    else: w = 1./(sigma**2)
    w *= 1.*w.size/w.sum()   # renormalize the weights so they sum to N
    # Initial guess of parameters and initializations
    mu = array([p0[0], p0[2]])
    sig = array([p0[1], p0[3]])
    pi_ = array([p0[4], 1-p0[4]])
    gamma, N_ = zeros((2, sample.size)), zeros(2)
    p_new = array(p0)
    N = sample.size
    # EM loop
    counter = 0
    converged, stop_iteration = False, False
    while not stop_iteration:
        p_old = p_new
        # E-step: responsibilities; M-step: weighted parameter updates
        for k in [0, 1]:
            gamma[k,:] = w*pi_[k]*norm.pdf(sample, mu[k], sig[k])/pdf_model(sample, p_new)  # SCHEME1
            #gamma[k,:] = pi_[k]*norm.pdf(sample, mu[k], sig[k])/pdf_model(sample, p_new)   # SCHEME2
            N_[k] = gamma[k,:].sum()
            mu[k] = sum(gamma[k]*sample)/N_[k]                  # SCHEME1
            #mu[k] = sum(w*gamma[k]*sample)/sum(w*gamma[k])     # SCHEME2
            sig[k] = sqrt(sum(gamma[k]*(sample-mu[k])**2)/N_[k])
            pi_[k] = 1.*N_[k]/N
        p_new = array([mu[0], sig[0], mu[1], sig[1], pi_[0]])
        assert abs(N_.sum() - N)/float(N) < 1e-6
        assert abs(pi_.sum() - 1) < 1e-6
        # Convergence check on the maximum relative parameter change
        counter += 1
        max_variation = (abs(p_new - p_old)/abs(p_old)).max()
        converged = max_variation < tolerance
        stop_iteration = converged or (counter >= max_iter)
    # print("Iterations:", counter)
    if not converged: logging.warning("Not converged after %d iterations", counter)
    return p_new

if __name__ == "__main__":
    logging.basicConfig(level=logging.WARNING)
    p = [0.2, 0.2, 0.8, 0.3, 0.3]
    s = sim_two_gauss_mix(N=1000, p=p)
    print("data:", model_str(p))
    model = fit_two_peaks_EM(s)
    print("em  :", model_str(model))
# -*- coding: utf-8 -*-
# Same EM fit as above, but with a pure-numpy normal pdf instead of scipy.stats.norm.
import logging
import math
from numpy.random import normal
from numpy import concatenate, array, zeros, ones, log, sqrt, pi, exp, abs

_norm_pdf_C = math.sqrt(2*pi)

def normpdf(x, mu=0.0, sigma=1.0):
    # Normal pdf, vectorized over x (no scipy dependency)
    u = (x - mu)/abs(sigma)
    y = exp(-u*u/2.0)/(_norm_pdf_C*abs(sigma))
    return y

def model_str(p):
    return "mu_1:{u1:.3f},sig_1:{s1:.3f},pi_1:{p1:.3f}\tmu_2:{u2:.3f},sig_2:{s2:.3f},pi_2:{p2:.3f}".format(
        u1=p[0], s1=p[1], p1=p[4], u2=p[2], s2=p[3], p2=1-p[4])

def pdf_model(x, p):
    mu1, sig1, mu2, sig2, pi_1 = p
    return pi_1*normpdf(x, mu1, sig1) + (1-pi_1)*normpdf(x, mu2, sig2)

def log_likelihood_two_1d_gauss(p, sample):
    return -log(pdf_model(sample, p)).sum()

def sim_two_gauss_mix(p, N=1000):
    # Integer sizes so the two sub-samples total exactly N
    n1 = int(round(N*p[4]))
    s1 = normal(p[0], p[1], size=n1)
    s2 = normal(p[2], p[3], size=N - n1)
    return concatenate([s1, s2])

def fit_two_peaks_EM(sample, sigma=None, weights=False, p0=array([0.2,0.2,0.7,0.2,0.5]),
                     max_iter=300, tolerance=1e-5):
    if not weights: w = ones(sample.size)
    else: w = 1./(sigma**2)
    w *= 1.*w.size/w.sum()   # renormalize the weights so they sum to N
    # Initial guess of parameters and initializations
    mu = array([p0[0], p0[2]])
    sig = array([p0[1], p0[3]])
    pi_ = array([p0[4], 1-p0[4]])
    gamma, N_ = zeros((2, sample.size)), zeros(2)
    p_new = array(p0)
    N = sample.size
    # EM loop
    counter = 0
    converged, stop_iteration = False, False
    while not stop_iteration:
        p_old = p_new
        # E-step: responsibilities; M-step: weighted parameter updates
        for k in [0, 1]:
            gamma[k,:] = w*pi_[k]*normpdf(sample, mu[k], sig[k])/pdf_model(sample, p_new)  # SCHEME1
            #gamma[k,:] = pi_[k]*normpdf(sample, mu[k], sig[k])/pdf_model(sample, p_new)   # SCHEME2
            N_[k] = gamma[k,:].sum()
            mu[k] = sum(gamma[k]*sample)/N_[k]                  # SCHEME1
            #mu[k] = sum(w*gamma[k]*sample)/sum(w*gamma[k])     # SCHEME2
            sig[k] = sqrt(sum(gamma[k]*(sample-mu[k])**2)/N_[k])
            pi_[k] = 1.*N_[k]/N
        p_new = array([mu[0], sig[0], mu[1], sig[1], pi_[0]])
        assert abs(N_.sum() - N)/float(N) < 1e-6
        assert abs(pi_.sum() - 1) < 1e-6
        # Convergence check on the maximum relative parameter change
        counter += 1
        max_variation = (abs(p_new - p_old)/abs(p_old)).max()
        converged = max_variation < tolerance
        stop_iteration = converged or (counter >= max_iter)
    # print("Iterations:", counter)
    if not converged: logging.warning("Not converged after %d iterations", counter)
    return p_new

if __name__ == "__main__":
    logging.basicConfig(level=logging.WARNING)
    p = [0.2, 0.1, 0.8, 0.1, 0.3]
    s = sim_two_gauss_mix(N=1000, p=p)
    print("data:", model_str(p))
    model = fit_two_peaks_EM(s)
    print("em  :", model_str(model))
# -*- coding: utf-8 -*-
# <nbformat>3.0</nbformat>

# <codecell>
from scipy.optimize import minimize
from numpy.random import normal
from numpy import concatenate, array, zeros, log
from math import sqrt

# <codecell>
from scipy.stats import norm

N = 1000
a = 0.3
n1 = int(round(N*a))   # integer sizes so the sample has exactly N points
s1 = normal(0, 0.08, size=n1)
s2 = normal(0.6, 0.12, size=N - n1)
s = concatenate([s1, s2])

def pdf_model(x, p):
    mu1, sig1, mu2, sig2, pi_1 = p
    return pi_1*norm.pdf(x, mu1, sig1) + (1-pi_1)*norm.pdf(x, mu2, sig2)

def log_likelihood_two_1d_gauss(p, sample):
    return -log(pdf_model(sample, p)).sum()

max_iter = 100
# Initial guess of parameters and initializations
p0 = array([-0.2, 0.2, 0.8, 0.2, 0.5])
mu1, sig1, mu2, sig2, pi_1 = p0
mu = array([mu1, mu2])
sig = array([sig1, sig2])
pi_ = array([pi_1, 1-pi_1])
gamma = zeros((2, s.size))
N_ = zeros(2)
p_new = p0

# EM loop (fixed number of iterations; this notebook version has no convergence test)
counter = 0
converged = False
while not converged:
    # E-step: responsibilities; M-step: weighted parameter updates
    for k in [0, 1]:
        gamma[k,:] = pi_[k]*norm.pdf(s, mu[k], sig[k])/pdf_model(s, p_new)
        N_[k] = gamma[k].sum()
        mu[k] = sum(gamma[k]*s)/N_[k]
        sig[k] = sqrt(sum(gamma[k]*(s-mu[k])**2)/N_[k])
        pi_[k] = N_[k]/s.size
    p_new = [mu[0], sig[0], mu[1], sig[1], pi_[0]]
    assert abs(N_.sum() - N)/float(N) < 1e-6
    assert abs(pi_.sum() - 1) < 1e-6
    counter += 1
    converged = counter >= max_iter

print("Means:   %6.3f %6.3f" % (p_new[0], p_new[2]))
print("Std dev: %6.3f %6.3f" % (p_new[1], p_new[3]))
print("Mix (1): %6.3f" % p_new[4])