%pylab inline from numpy import random from scipy.optimize import minimize, show_options N = 1000 a = 0.3 s1 = normal(0, 0.08, size=N*a) s2 = normal(0.6,0.12, size=N*(1-a)) s = concatenate([s1,s2]) hist(s, bins=20); def pdf_model(x, p): mu1, sig1, mu2, sig2, pi_1 = p return pi_1*normpdf(x, mu1, sig1) + (1-pi_1)*normpdf(x, mu2, sig2) def log_likelihood_two_1d_gauss(p, sample): return -log(pdf_model(sample, p)).sum() # Initial guess p0 = array([-0.2,0.2,0.8,0.2,0.5]) # Minimization 1 res = minimize(log_likelihood_two_1d_gauss, x0=p0, args=(s,), method='BFGS') #res # NOT CONVERGED # Minimization 2 res = minimize(log_likelihood_two_1d_gauss, x0=p0, args=(s,), method='powell', options=dict(maxiter=10e3, maxfev=2e4)) #res # NOT CONVERGED # Minimization 3 res = minimize(log_likelihood_two_1d_gauss, x0=p0, args=(s,), method='Nelder-Mead', options=dict(maxiter=10e3, maxfev=2e4)) res res.x # Minimization 4 res = minimize(log_likelihood_two_1d_gauss, x0=p0, args=(s,), method='L-BFGS-B', bounds=[(-0.5,2),(0.01,0.5),(-0.5,2),(0.01,0.5),(0.01,0.99)]) res # Finds additional options for the different solvers: #show_options('minimize', 'powell') max_iter = 100 # Initial guess of parameters and initializations p0 = array([-0.2,0.2,0.8,0.2,0.5]) mu1, sig1, mu2, sig2, pi_1 = p0 mu = array([mu1, mu2]) sig = array([sig1, sig2]) pi_ = array([pi_1, 1-pi_1]) gamma = zeros((2, s.size)) N_ = zeros(2) p_new = p0 # EM loop counter = 0 converged = False while not converged: # Compute the responsibility func. and new parameters for k in [0,1]: gamma[k,:] = pi_[k]*normpdf(s, mu[k], sig[k])/pdf_model(s, p_new) N_[k] = 1.*gamma[k].sum() mu[k] = sum(gamma[k]*s)/N_[k] sig[k] = sqrt( sum(gamma[k]*(s-mu[k])**2)/N_[k] ) pi_[k] = N_[k]/s.size p_new = [mu[0], sig[0], mu[1], sig[1], pi_[0]] assert abs(N_.sum() - N)/float(N) < 1e-6 assert abs(pi_.sum() - 1) < 1e-6 # Convergence check counter += 1 converged = counter >= max_iter print "Means: %6.3f %6.3f" % (p_new[0], p_new[2]) print "Std dev: %6.3f %6.3f" % (p_new[1], p_new[3]) print "Mix (1): %6.3f " % p_new[4] print pi_.sum(), N_.sum() res.x from scipy.stats import expon def sim_single_population(mu, N=1000, max_sigma=0.5, mean_sigma=0.08): """Extract samples from a normal distribution with variance distributed as an exponetial distribution """ exp_min_size = 1./max_sigma**2 exp_mean_size = 1./mean_sigma**2 sigma = 1/sqrt(expon.rvs(loc=exp_min_size, scale=exp_mean_size, size=N)) return normal(mu, scale=sigma, size=N), sigma N = 1000 a = 0.3 s1, sig1 = sim_single_population(0, N=N*a) s2, sig2 = sim_single_population(0.5, N=N*(1-a)) s = concatenate([s1, s2]) sigma_tot = concatenate([sig1, sig2]) hist(s, bins=r_[-1:2:0.025], alpha=0.3, color='g', histtype='stepfilled'); ax = twinx(); ax.grid(False) ax.plot(s, 0.1/sigma_tot, 'o', mew=0, ms=2, alpha=0.6, color='b') xlim(-0.5, 1.5); title('Simulated sample (to be fitted)') print "Means: %6.3f %6.3f" % (s1.mean(), s2.mean()) print "Std dev: %6.3f %6.3f" % (sqrt((sig1**2).mean()), sqrt((sig2**2).mean())) print "Mix (1): %6.3f " % a max_iter = 300 weights = 1./sigma_tot**2 # Renormalizing the weights so they sum to N weights *= 1.*weights.size/weights.sum() # No weights case #weights = ones(s.size) # Initial guess of parameters and initializations p0 = array([-0.05,0.1,0.6,0.1,0.5]) mu1, sig1, mu2, sig2, pi_1 = p0 mu = array([mu1, mu2]) sig = array([sig1, sig2]) pi_ = array([pi_1, 1-pi_1]) gamma = zeros((2, s.size)) N_ = zeros(2) p_new = p0 # EM loop counter = 0 converged = False while not converged: # Compute the responsibility func. and new parameters for k in [0,1]: gamma[k,:] = weights*pi_[k]*normpdf(s, mu[k], sig[k])/pdf_model(s, p_new) # SCHEME1 #gamma[k,:] = pi_[k]*normpdf(s, mu[k], sig[k])/pdf_model(s, p_new) # SCHEME2 N_[k] = gamma[k,:].sum() mu[k] = sum(gamma[k]*s)/N_[k] # SCHEME1 #mu[k] = sum(weights*gamma[k]*s)/sum(weights*gamma[k]) # SCHEME2 sig[k] = sqrt( sum(gamma[k]*(s-mu[k])**2)/N_[k] ) pi_[k] = 1.*N_[k]/N p_new = [mu[0], sig[0], mu[1], sig[1], pi_[0]] assert abs(N_.sum() - N)/float(N) < 1e-6 assert abs(pi_.sum() - 1) < 1e-6 # Convergence check counter += 1 converged = counter >= max_iter print ">> NO WEIGHTS" print "Means: %6.3f %6.3f" % (p_new[0], p_new[2]) print "Std dev: %6.3f %6.3f" % (p_new[1], p_new[3]) print "Mix (1): %6.3f " % p_new[4] print ">> WEIGHTED SCHEME1" print "Means: %7.4f %7.4f" % (p_new[0], p_new[2]) print "Std dev: %7.4f %7.4f" % (p_new[1], p_new[3]) print "Mix (1): %7.4f " % p_new[4] print ">> WEIGHTED SCHEME2" print "Means: %6.3f %6.3f" % (p_new[0], p_new[2]) print "Std dev: %6.3f %6.3f" % (p_new[1], p_new[3]) print "Mix (1): %6.3f " % p_new[4] title('NO WEIGHTS') #hist(s, bins=r_[-1:2:0.05], normed=True); x = r_[-1:2:0.01] plot(x, pdf_model(x, p_new), color='k', lw=2); grid(True) plot(s, 0.1/sigma_tot, 'o', mew=0, ms=2, alpha=0.5); title('WEIGHTED SCHEME1') #hist(s, bins=r_[-1:2:0.05], normed=True); x = r_[-1:2:0.01] plot(x, pdf_model(x, p_new), color='k', lw=2); grid(True) plot(s, 0.1/sigma_tot, 'o', mew=0, ms=2, alpha=0.5); title('WEIGHTED SCHEME2') #hist(s, bins=r_[-1:2:0.05], normed=True); x = r_[-1:2:0.01] plot(x, pdf_model(x, p_new), color='k', lw=2); grid(True) plot(s, 0.1/sigma_tot, 'o', mew=0, ms=2, alpha=0.5); %pylab inline from scipy.stats import poisson, expon, binom from scipy.optimize import minimize, leastsq from scipy.special import erf def fit_two_peaks_EM(sample, sigma, weights=False, p0=array([0.1,0.2,0.6,0.2,0.5]), max_iter=300, tollerance=1e-3): if not weights: w = ones(sample.size) else: w = 1./(sigma**2) w *= 1.*w.size/w.sum() # renormalization so they sum to N # Initial guess of parameters and initializations mu = array([p0[0], p0[2]]) sig = array([p0[1], p0[3]]) pi_ = array([p0[4], 1-p0[4]]) gamma, N_ = zeros((2, sample.size)), zeros(2) p_new = array(p0) N = sample.size # EM loop counter = 0 converged, stop_iteration = False, False while not stop_iteration: p_old = p_new # Compute the responsibility func. and new parameters for k in [0,1]: gamma[k,:] = w*pi_[k]*normpdf(sample, mu[k], sig[k])/pdf_model(sample, p_new) # SCHEME1 #gamma[k,:] = pi_[k]*normpdf(sample, mu[k], sig[k])/pdf_model(sample, p_new) # SCHEME2 N_[k] = gamma[k,:].sum() mu[k] = sum(gamma[k]*sample)/N_[k] # SCHEME1 #mu[k] = sum(w*gamma[k]*sample)/sum(w*gamma[k]) # SCHEME2 sig[k] = sqrt( sum(gamma[k]*(sample-mu[k])**2)/N_[k] ) pi_[k] = 1.*N_[k]/N p_new = array([mu[0], sig[0], mu[1], sig[1], pi_[0]]) assert abs(N_.sum() - N)/float(N) < 1e-6 assert abs(pi_.sum() - 1) < 1e-6 # Convergence check counter += 1 max_variation = max((p_new-p_old)/p_old) converged = True if max_variation < tollerance else False stop_iteration = converged or (counter >= max_iter) #print "Iterations:", counter if not converged: print "WARNING: Not converged" return p_new def fit_two_gauss_mix_hist(s, bins=r_[-0.5:1.5:0.001], weights=None, p0=[0,0.1,0.5,0.1,0.5]): """Fit the (optionally weighted) histogram with a 2-Gaussian mix PDF. """ H = histogram(s, bins=bins, weights=weights, normed=True) x = .5*(H[1][1:]+H[1][:-1]) y = H[0] assert x.size == y.size err_func = lambda p, x_, y_: (y_ - pdf_model(x_, p)) res = leastsq(err_func, x0=p0, args=(x,y), full_output=1) #print res return array(res[0]) def fit_two_gauss_mix_cdf(s, p0=[0.2,1,0.8,1,0.3], weights=None): """Fit the sample s with two gaussians. """ ## Empirical CDF ecdf = [sort(s), arange(0.5,s.size+0.5)*1./s.size] ## CDF for a Gaussian distribution, and for a 2-Gaussian mix gauss_cdf = lambda x, mu, sigma: 0.5*(1+erf((x-mu)/(sqrt(2)*sigma))) two_gauss_cdf = lambda x, m1, s1, m2, s2, a:\ a*gauss_cdf(x,m1,s1)+(1-a)*gauss_cdf(x,m2,s2) ## Fitting the empirical CDF fit_func = lambda p, x: two_gauss_cdf(x, *p) err_func = lambda p, x, y: fit_func(p, x) - y p,v = leastsq(err_func, x0=p0, args=(ecdf[0],ecdf[1])) return array(p) def sim_two_gauss_mix(p, N=1000): s1 = normal(p[0], p[1], size=N*p[4]) s2 = normal(p[2], p[3], size=N*(1-p[4])) s = concatenate([s1,s2]) return s def test_accuracy(N_test=200, **kwargs): fit_kwargs = dict() if 'p0' in kwargs: fit_kwargs.update(p0=kwargs.pop('p0')) if 'max_iter' in kwargs: fit_kwargs.update(max_iter=kwargs.pop('max_iter')) pop_kwargs = dict() pop_kwargs.update(kwargs) P_em = zeros((N_test, 5)) P_h = zeros((N_test, 5)) P_cdf = zeros((N_test, 5)) for i_test in xrange(N_test): s = sim_two_gauss_mix(**pop_kwargs) P_em[i_test,:] = fit_two_peaks_EM(s, sigma=None, weights=False, **fit_kwargs) if 'max_iter' in fit_kwargs: fit_kwargs.pop('max_iter') P_h[i_test,:] = fit_two_gauss_mix_hist(s, bins=r_[-0.5:1.5:0.01], weights=None, **fit_kwargs) P_cdf[i_test,:] = fit_two_gauss_mix_cdf(s) return P_em, P_h, P_cdf def plot_accuracy(P_LIST, labels, name="Fit comparison", ip=0, bins=(r_[0.2:0.5:0.004]+0.002)): print "METHOD\t MEAN STD.DEV." for P, label in zip(P_LIST, labels): hist(P[:,ip], bins=bins, histtype='stepfilled', alpha=0.5, label=label); print "%s:\t %6.2f %6.2f" % (label, P[:,ip].mean()*100, P[:,ip].std()*100) legend(); grid(True); title(name); s = sim_two_gauss_mix(N=1000, p=[0,0.15,0.5,0.15,0.3]) hist(s, bins=r_[-0.4:1:0.05]); P_em, P_h, P_cdf = test_accuracy(N_test=2000, N=200, p=[0,0.15,0.5,0.15,0.3], p0=[0,0.1,0.6,0.1,0.5]) plot_accuracy([P_em, P_h, P_cdf], labels=['EM', 'hist', 'CDF'], ip=0, bins=(r_[-.2:.2:0.01])) plot_accuracy([P_em, P_h, P_cdf], labels=['EM', 'hist', 'CDF'], ip=2, bins=(r_[.4:.6:0.005])) sample_parameters = dict(N=200, p=[0,0.15,0.8,0.15,0.7]) s = sim_two_gauss_mix(**sample_parameters) hist(s, bins=r_[-0.4:1.4:0.05]); P_em, P_h, P_cdf = test_accuracy(N_test=2000, p0=[0,0.1,0.6,0.1,0.5], **sample_parameters) plot_accuracy([P_em, P_h, P_cdf], labels=['EM', 'hist', 'CDF'], ip=0, bins=(r_[-.1:.1:0.005])) plot_accuracy([P_em, P_h, P_cdf], labels=['EM', 'hist', 'CDF'], ip=2, bins=(r_[.7:.9:0.005])) from IPython.core.display import HTML def css_styling(): styles = open("./styles/custom.css", "r").read() return HTML(styles) css_styling()