Skip to content

Instantly share code, notes, and snippets.

@nimezhu
Last active August 29, 2015 14:00
Show Gist options
  • Save nimezhu/11404870 to your computer and use it in GitHub Desktop.
Save nimezhu/11404870 to your computer and use it in GitHub Desktop.
GMM EM Algorithm

Code was downloaded from this website and revised to run from the command line.

It is a simple EM algorithm for a two-component Gaussian mixture model.

# -*- coding: utf-8 -*-
from numpy import random
from numpy.random import normal
from numpy import concatenate,array,zeros,r_,ones
from math import log,sqrt
from scipy.stats import norm
def model_str(p):
    # Render the parameter vector [mu1, sig1, mu2, sig2, pi1] as one line of text.
    mu1, sig1, mu2, sig2, pi1 = p
    return "mu_1:{u1:.3f},sig_1:{s1:.3f},pi_1:{p1:.3f}\tmu_2:{u2:.3f},sig_2:{s2:.3f},pi_2:{p2:.3f}".format(
        u1=mu1, s1=sig1, p1=pi1, u2=mu2, s2=sig2, p2=1 - pi1)
def pdf_model(x, p):
    # Mixture density at x: pi1 * N(mu1, sig1) + (1 - pi1) * N(mu2, sig2).
    mu1, sig1, mu2, sig2, pi_1 = p
    comp1 = norm.pdf(x, mu1, sig1)
    comp2 = norm.pdf(x, mu2, sig2)
    return pi_1 * comp1 + (1 - pi_1) * comp2
def log_likelihood_two_1d_gauss(p, sample):
    """Return the negative log-likelihood of `sample` under the mixture `p`.

    Bug fix: the original used `math.log` (imported at the top of the file),
    which raises TypeError on the ndarray returned by `pdf_model`, and a
    scalar `math.log` result has no `.sum()` — so this function could never
    run on an array.  numpy's elementwise log makes the `.sum()` chain work,
    consistent with the second copy of this script below.
    """
    from numpy import log as _log  # local import: file top level only has math.log
    return -_log(pdf_model(sample, p)).sum()
def sim_two_gauss_mix(p, N=1000):
    """Draw N samples from the mixture p = [mu1, sig1, mu2, sig2, pi1].

    Bug fix: numpy requires an integer `size`; the original passed the float
    N*p[4].  The first component gets round(N*pi1) points and the second
    gets the remainder, so exactly N samples are always returned.
    """
    n1 = int(round(N * p[4]))
    s1 = normal(p[0], p[1], size=n1)
    s2 = normal(p[2], p[3], size=N - n1)
    return concatenate([s1, s2])
def fit_two_peaks_EM(sample, sigma=None, weights=False, p0=array([0.2,0.2,0.7,0.2,0.5]),
                     max_iter=300, tollerance=1e-5):
    """Fit a two-component 1-D Gaussian mixture to `sample` by EM.

    Parameters
    ----------
    sample : 1-D ndarray of observations.
    sigma : per-point uncertainties; used only when `weights` is True
        (must be supplied in that case — TODO confirm callers do).
    weights : if True, weight each point by 1/sigma**2.
    p0 : initial guess [mu1, sig1, mu2, sig2, pi1].
    max_iter : iteration cap.
    tollerance : relative-change threshold for convergence (name kept for
        backward compatibility).

    Returns the fitted ndarray [mu1, sig1, mu2, sig2, pi1].

    Fixes vs. the original:
    * convergence tests the *absolute* relative change — the signed max
      declared convergence whenever the largest change was a decrease;
    * `logging` is imported locally (it was only imported under __main__,
      so the not-converged warning raised NameError in library use);
    * the mixture denominator is built from the two component densities
      directly, removing the `pdf_model` call and evaluating each Gaussian
      once instead of twice per iteration (same values up to float noise).
    """
    import logging

    if not weights:
        w = ones(sample.size)
    else:
        w = 1. / (sigma ** 2)
    w *= 1. * w.size / w.sum()  # renormalize so the weights sum to N

    # Initial guess of parameters and state arrays.
    mu = array([p0[0], p0[2]])
    sig = array([p0[1], p0[3]])
    pi_ = array([p0[4], 1 - p0[4]])
    gamma, N_ = zeros((2, sample.size)), zeros(2)
    p_new = array(p0)
    N = sample.size

    counter = 0
    converged, stop_iteration = False, False
    while not stop_iteration:
        p_old = p_new
        # E step: responsibilities under the current parameters.
        comp0 = pi_[0] * norm.pdf(sample, mu[0], sig[0])
        comp1 = pi_[1] * norm.pdf(sample, mu[1], sig[1])
        total = comp0 + comp1
        gamma[0, :] = w * comp0 / total
        gamma[1, :] = w * comp1 / total
        # M step: weighted-mean updates for each component.
        for k in (0, 1):
            N_[k] = gamma[k, :].sum()
            mu[k] = sum(gamma[k] * sample) / N_[k]
            sig[k] = sqrt(sum(gamma[k] * (sample - mu[k]) ** 2) / N_[k])
            pi_[k] = 1. * N_[k] / N
        p_new = array([mu[0], sig[0], mu[1], sig[1], pi_[0]])
        assert abs(N_.sum() - N) / float(N) < 1e-6
        assert abs(pi_.sum() - 1) < 1e-6
        # Convergence: largest absolute relative parameter change (bug fix).
        counter += 1
        max_variation = max(abs((p_new - p_old) / p_old))
        converged = max_variation < tollerance
        stop_iteration = converged or (counter >= max_iter)
    if not converged:
        logging.warning("WARNING: Not converged")
    return p_new
if __name__=="__main__":
    # Demo: draw a synthetic sample from a known mixture, then recover its
    # parameters with EM and print both for comparison.
    import logging
    logging.basicConfig(level=logging.WARNING)
    # True parameters: [mu1, sig1, mu2, sig2, pi1]
    p=[0.2,0.2,0.8,0.3,0.3]
    s = sim_two_gauss_mix(N=1000, p=p)
    print "data:",model_str(p)
    model=fit_two_peaks_EM(s)
    print "em  :",model_str(model)
# -*- coding: utf-8 -*-
from numpy import random
from numpy.random import normal
from numpy import concatenate,array,zeros,r_,ones,log,sqrt,pi,exp,abs
import sys
import math
# Normalization constant sqrt(2*pi), computed once at import time.
_norm_pdf_C = math.sqrt(2*pi)
def normpdf(x, mu=0.0, sigma=1.0):
    """Gaussian density at x (vectorized: works on scalars and ndarrays)."""
    z = (x - mu) / abs(sigma)
    return exp(-0.5 * z * z) / (abs(sigma) * _norm_pdf_C)
def model_str(p):
    # One-line summary of the mixture parameters [mu1, sig1, mu2, sig2, pi1].
    fields = {"u1": p[0], "s1": p[1], "p1": p[4],
              "u2": p[2], "s2": p[3], "p2": 1 - p[4]}
    return ("mu_1:{u1:.3f},sig_1:{s1:.3f},pi_1:{p1:.3f}"
            "\tmu_2:{u2:.3f},sig_2:{s2:.3f},pi_2:{p2:.3f}").format(**fields)
def pdf_model(x, p):
    # Weighted sum of the two component densities at x.
    mu1, sig1, mu2, sig2, pi_1 = p
    w1 = pi_1
    w2 = 1 - pi_1
    return w1 * normpdf(x, mu1, sig1) + w2 * normpdf(x, mu2, sig2)
def log_likelihood_two_1d_gauss(p, sample):
    # Negative total log-likelihood of the sample under mixture p.
    density = pdf_model(sample, p)
    return -(log(density).sum())
def sim_two_gauss_mix(p, N=1000):
    """Draw N samples from the mixture p = [mu1, sig1, mu2, sig2, pi1].

    Bug fix: numpy's `size` must be an integer, but N*p[4] is a float and
    raises TypeError on modern numpy.  Rounding the first component's count
    and giving the remainder to the second also guarantees exactly N samples.
    """
    n1 = int(round(N * p[4]))
    s1 = normal(p[0], p[1], size=n1)
    s2 = normal(p[2], p[3], size=N - n1)
    return concatenate([s1, s2])
def fit_two_peaks_EM(sample, sigma=None, weights=False, p0=array([0.2,0.2,0.7,0.2,0.5]),
                     max_iter=300, tollerance=1e-5):
    """Fit a two-component 1-D Gaussian mixture to `sample` by EM.

    Parameters
    ----------
    sample : 1-D ndarray of observations.
    sigma : per-point uncertainties; used only when `weights` is True
        (must be supplied in that case — TODO confirm callers do).
    weights : if True, weight each point by 1/sigma**2.
    p0 : initial guess [mu1, sig1, mu2, sig2, pi1].
    max_iter : iteration cap.
    tollerance : relative-change threshold for convergence (name kept for
        backward compatibility).

    Returns the fitted ndarray [mu1, sig1, mu2, sig2, pi1].

    Fixes vs. the original:
    * convergence tests the *absolute* relative change — the signed max
      declared convergence whenever the largest change was a decrease;
    * `logging` is imported locally (it was only imported under __main__,
      so the not-converged warning raised NameError in library use);
    * the Gaussian density is computed inline with numpy, so the function
      no longer depends on the module-level `normpdf`/`pdf_model` and each
      component density is evaluated once instead of twice per iteration.
    """
    import logging

    _sqrt_2pi = sqrt(2 * pi)

    def _phi(m, s_):
        # Gaussian density of `sample` at mean m, std s_ (same formula as normpdf).
        z = (sample - m) / abs(s_)
        return exp(-0.5 * z * z) / (_sqrt_2pi * abs(s_))

    if not weights:
        w = ones(sample.size)
    else:
        w = 1. / (sigma ** 2)
    w *= 1. * w.size / w.sum()  # renormalize so the weights sum to N

    # Initial guess of parameters and state arrays.
    mu = array([p0[0], p0[2]])
    sig = array([p0[1], p0[3]])
    pi_ = array([p0[4], 1 - p0[4]])
    gamma, N_ = zeros((2, sample.size)), zeros(2)
    p_new = array(p0)
    N = sample.size

    counter = 0
    converged, stop_iteration = False, False
    while not stop_iteration:
        p_old = p_new
        # E step: responsibilities under the current parameters.
        comp0 = pi_[0] * _phi(mu[0], sig[0])
        comp1 = pi_[1] * _phi(mu[1], sig[1])
        total = comp0 + comp1
        gamma[0, :] = w * comp0 / total
        gamma[1, :] = w * comp1 / total
        # M step: weighted-mean updates for each component.
        for k in (0, 1):
            N_[k] = gamma[k, :].sum()
            mu[k] = sum(gamma[k] * sample) / N_[k]
            sig[k] = sqrt(sum(gamma[k] * (sample - mu[k]) ** 2) / N_[k])
            pi_[k] = 1. * N_[k] / N
        p_new = array([mu[0], sig[0], mu[1], sig[1], pi_[0]])
        assert abs(N_.sum() - N) / float(N) < 1e-6
        assert abs(pi_.sum() - 1) < 1e-6
        # Convergence: largest absolute relative parameter change (bug fix).
        counter += 1
        max_variation = max(abs((p_new - p_old) / p_old))
        converged = max_variation < tollerance
        stop_iteration = converged or (counter >= max_iter)
    if not converged:
        logging.warning("WARNING: Not converged")
    return p_new
if __name__=="__main__":
    # Demo: draw a synthetic sample from a known mixture, then recover its
    # parameters with EM and print both for comparison.
    import logging
    logging.basicConfig(level=logging.WARNING)
    # True parameters: [mu1, sig1, mu2, sig2, pi1]
    p=[0.2,0.1,0.8,0.1,0.3]
    s = sim_two_gauss_mix(N=1000, p=p)
    print "data:",model_str(p)
    model=fit_two_peaks_EM(s)
    print "em  :",model_str(model)
# -*- coding: utf-8 -*-
# <nbformat>3.0</nbformat>
# <codecell>
from numpy import random
from scipy.optimize import minimize, show_options
from numpy.random import normal
from numpy import concatenate,array,zeros
from math import log,sqrt
# <codecell>
from scipy.stats import norm
# Synthetic data: a fraction `a` of N points from N(0, 0.08), the rest from
# N(0.6, 0.12).  Bug fix: numpy's `size` must be an integer, but N*a and
# N*(1-a) are floats; rounding the first count and giving the remainder to
# the second also guarantees exactly N samples in total.
N = 1000
a = 0.3
n1 = int(round(N * a))
s1 = normal(0, 0.08, size=n1)
s2 = normal(0.6, 0.12, size=N - n1)
s = concatenate([s1, s2])
def pdf_model(x, p):
    # Two-component Gaussian mixture density evaluated at x.
    mu1, sig1, mu2, sig2, pi_1 = p
    first = pi_1 * norm.pdf(x, mu1, sig1)
    second = (1 - pi_1) * norm.pdf(x, mu2, sig2)
    return first + second
def log_likelihood_two_1d_gauss(p, sample):
    """Return the negative log-likelihood of `sample` under the mixture `p`.

    Bug fix: the original used `math.log` (imported at the top of this
    section), which raises TypeError on the ndarray returned by `pdf_model`,
    and a scalar `math.log` result has no `.sum()`.  numpy's elementwise log
    makes the `.sum()` chain work on array samples.
    """
    from numpy import log as _log  # local import: section top level only has math.log
    return -_log(pdf_model(sample, p)).sum()
# Inline EM fit (notebook-style): estimate the mixture parameters of the
# sample `s` generated above.
max_iter=100
# Initial guess of parameters and initializations
p0 = array([-0.2,0.2,0.8,0.2,0.5])  # [mu1, sig1, mu2, sig2, pi1]
mu1, sig1, mu2, sig2, pi_1 = p0
mu = array([mu1, mu2])
sig = array([sig1, sig2])
pi_ = array([pi_1, 1-pi_1])
gamma = zeros((2, s.size))  # responsibilities, one row per component
N_ = zeros(2)               # effective sample count per component
p_new = p0
# EM loop
counter = 0
converged = False
while not converged:
    # Compute the responsibility func. and new parameters
    for k in [0,1]:
        # E step for component k, then immediate M-step updates.
        gamma[k,:] = pi_[k]*norm.pdf(s, mu[k], sig[k])/pdf_model(s, p_new)
        N_[k] = 1.*gamma[k].sum()
        mu[k] = sum(gamma[k]*s)/N_[k]
        sig[k] = sqrt( sum(gamma[k]*(s-mu[k])**2)/N_[k] )
        pi_[k] = N_[k]/s.size
    p_new = [mu[0], sig[0], mu[1], sig[1], pi_[0]]
    # Sanity checks: responsibilities account for every point, weights sum to 1.
    assert abs(N_.sum() - N)/float(N) < 1e-6
    assert abs(pi_.sum() - 1) < 1e-6
    # Convergence check
    counter += 1
    # NOTE(review): "converged" here only means max_iter was reached — there
    # is no tolerance test in this notebook version.
    converged = counter >= max_iter
print "Means:   %6.3f %6.3f" % (p_new[0], p_new[2])
print "Std dev: %6.3f %6.3f" % (p_new[1], p_new[3])
print "Mix (1): %6.3f " % p_new[4]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment