import numpy as np
import math
from matplotlib import pyplot as plt
import scipy.integrate as sciInt
import scipy.optimize as sciOpt

#select truth data values, these will NOT be known to the training algorithm, they
#are only used in generating the sample data
tu1 = -2.0
tu2 = 2.0
tSigma = 2.0
tGamma = 0.5

#the class conditional probability p(x|Ck); sigma here denotes the variance
def p_xCk(x, mu, sigma):
    denom = math.sqrt(2.0 * math.pi * sigma)
    arg = -0.5 * (x - mu) * (x - mu) / sigma
    return math.exp(arg) / denom

#the marginal probability p(x)
def p_x(x, mu1, mu2, sigma, gamma):
    return gamma * p_xCk(x, mu1, sigma) + (1.0 - gamma) * p_xCk(x, mu2, sigma)

#posterior class probability vector (p(C_1|x), p(C_2|x))
def p_Ckx(x, mu1, mu2, sigma, gamma):
    a = math.log(p_xCk(x, mu1, sigma) * gamma / (p_xCk(x, mu2, sigma) * (1 - gamma)))
    pc1 = 1.0 / (1.0 + math.exp(-a))
    return (pc1, 1.0 - pc1)

domain = range(0, 100, 1)
domain = [(-5.0 + x / 10.0) for x in domain]

f, axarr = plt.subplots(2, 2)
f.subplots_adjust(right=1.5)
f.subplots_adjust(top=1.5)

#plot p(x|Ck)
pxC1 = [p_xCk(x, tu1, tSigma) for x in domain]
pxC2 = [p_xCk(x, tu2, tSigma) for x in domain]
ax1 = axarr[0, 0]
ax1.plot(domain, pxC1)
ax1.plot(domain, pxC2)
ax1.set_ylabel('p(x|C_k)')

#plot the marginal distribution p(x)
px = [p_x(x, tu1, tu2, tSigma, tGamma) for x in domain]
ax2 = axarr[0, 1]
ax2.plot(domain, px)
ax2.set_ylabel('p(x)')

#plot the posterior distributions p(C_1|x) & p(C_2|x)
pc1x = []
pc2x = []
for x in domain:
    pck = p_Ckx(x, tu1, tu2, tSigma, tGamma)
    pc1x.append(pck[0])
    pc2x.append(pck[1])
ax3 = axarr[1, 0]
ax3.plot(domain, pc1x)
ax3.plot(domain, pc2x)
ax3.set_ylabel('p(C_k|x)')

#the cumulative distribution function P(X <= x), obtained by numerically integrating the marginal p(x)
def P_x(x, mu1, mu2, sigma, gamma):
    return sciInt.quad(p_x, -np.inf, x, args=(mu1, mu2, sigma, gamma))[0]

#generate the synthetic training data: each x is drawn from the marginal p(x) by inverse transform
#sampling (a uniform draw is inverted through the CDF with a root finder), and its label is then
#sampled from the posterior by comparing a second uniform draw (the assignment factor) to p(C_2|x)
num_samples = 100
x = np.zeros(num_samples)
t = np.zeros(num_samples)
n1 = 0   #number of samples assigned to class 1
nae = 0  #number of samples the assignment factor placed in the less probable class
for i in range(num_samples):
    u = np.random.rand()
    x[i] = sciOpt.brentq(lambda X: P_x(X, tu1, tu2, tSigma, tGamma) - u, -20.0, 20.0)
    pcx1, pcx2 = p_Ckx(x[i], tu1, tu2, tSigma, tGamma)
    if np.random.rand() > pcx2:
        t[i] = 1
        n1 = n1 + 1
    else:
        t[i] = 0
    if (t[i] > 0) != (pcx1 > pcx2):
        nae = nae + 1

#plot the simulated data; with gamma=0.5 and symmetric means the true class boundary is at x=0
ax4 = axarr[1, 1]
ax4.scatter(x, t)
ax4.axvline(0)
ax4.set_ylabel('class')
plt.show()

#compute the parameter estimates using the synthetic training data (maximum likelihood:
#the class fraction, the per-class sample means, and the pooled variance)
n2 = num_samples - n1
eGamma = n1 / float(num_samples)
zipped = zip(x, t)
eu1 = reduce(lambda accum, Z: accum + Z[0] * Z[1], zipped, 0) / float(n1)
eu2 = reduce(lambda accum, Z: accum + Z[0] * (1 - Z[1]), zipped, 0) / float(n2)

def Sn(Z, ck, mu=(eu1, eu2)):
    idx = 1 - int(ck)
    return (Z[0] - mu[idx]) * (Z[0] - mu[idx])

eSigma = reduce(lambda accum, Z: accum + Sn(Z, Z[1]), zipped, 0) / float(num_samples)

print 'The number of inputs in C1 is {0}'.format(n1)
print 'The number of inputs that were assigned based on the assignment factor is {0}'.format(nae)
print 'The true model parameters are gamma={0}, u1={1}, u2={2}, Sigma={3}'.format(tGamma, tu1, tu2, tSigma)
print 'The estimated model parameters are gamma={0}, u1={1}, u2={2}, Sigma={3}'.format(eGamma, eu1, eu2, eSigma)
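#A minimal follow-up sketch (not part of the fit above): because both class conditionals share the
#variance, the posterior p(C_1|x) crosses 0.5 at the single point
#x* = (u1 + u2)/2 - Sigma*ln(gamma/(1 - gamma))/(u1 - u2),
#so comparing the boundary implied by the estimated parameters with the true boundary (x* = 0 for
#the truth values chosen above) gives a quick sanity check. The helper boundary() is introduced
#here only for this illustration.
def boundary(u1, u2, sigma, gamma):
    return 0.5 * (u1 + u2) - sigma * math.log(gamma / (1.0 - gamma)) / (u1 - u2)

print 'The true class boundary is at x={0}'.format(boundary(tu1, tu2, tSigma, tGamma))
print 'The estimated class boundary is at x={0}'.format(boundary(eu1, eu2, eSigma, eGamma))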
import numpy as np
import math
from matplotlib import pyplot as plt

seed = 123456789
np.random.seed(seed)

N = 100               #number of data points, SEE WHAT HAPPENS AS YOU INCREASE THIS TO SAY 200
D = 2                 #dimension of input vector
t = np.zeros(N)       #training set classifications
X = np.zeros((N, D))  #training data in input space
sigma = .25
mu0 = 0.0
mu1 = 1.0

#func for generating 2D class scatter plot
def createScatter(X, t, ax):
    C1x = []
    C1y = []
    C2x = []
    C2y = []
    for i in range(len(t)):
        if t[i] > 0:
            C1x.append(X[i, 0])
            C1y.append(X[i, 1])
        else:
            C2x.append(X[i, 0])
            C2y.append(X[i, 1])
    ax.scatter(C1x, C1y)
    ax.scatter(C2x, C2y, color='r')

#Generate training data. (NOTE THIS IS NOT BASED ON A GENERATIVE APPROACH. I AM PICKING SOMETHING
#THAT I KNOW (OK, THINK) WILL WORK. IN PRACTICE THIS DATA WOULD BE OBTAINED VIA OBSERVATION)
#Pick a value from a uniform distribution [0,1). If it is less than 0.5, assign class 1 and pick
#x1,x2 from N(mu0,sigma), otherwise assign class 2 and pick x1,x2 from N(mu1,sigma). The random
#sign flip leaves class 1 centered on the origin but splits class 2 between (1,1) and (-1,-1),
#so the classes are not linearly separable in the input space.
for i in range(N):
    #choose class to sample for
    fac = 1
    if np.random.rand() <= 0.5:
        thismu = mu0
        t[i] = 1
    else:
        thismu = mu1
        t[i] = 0
    if np.random.rand() < 0.5:
        fac = -1
    X[i, 0] = fac * np.random.normal(thismu, sigma)
    X[i, 1] = fac * np.random.normal(thismu, sigma)

f, axarr = plt.subplots(1, 3)
f.subplots_adjust(right=2.5)
f.text(0.75, 0.975, 'Fixed Input Transformation Yields Linear Class Boundary',
       horizontalalignment='center', verticalalignment='top')
ax1 = axarr[0]
ax1.set_xlabel('x1')
ax1.set_ylabel('x2')
createScatter(X, t, ax1)

#The training data does not have a linear boundary in the original input space, so let's apply a
#transformation phi to try to make it linearly separable; here phi evaluates a Gaussian basis
#function centered at a chosen mean.
#NOTE: This transformation is not the only one that works. For example, try switching the values
#of MU1 and MU2. The result will be a different mapping that is still linearly separable.
def phi(x, mu, sigma):
    detSigma = np.linalg.det(sigma)
    fac = math.pow(2 * math.pi, len(mu) / 2.0) * math.sqrt(detSigma)
    arg = -0.5 * np.dot((x - mu).T, np.dot(np.linalg.inv(sigma), x - mu))
    return math.exp(arg) / fac

phiX = np.zeros((N, D))
MU1 = np.ones(D) * mu0
MU2 = np.ones(D) * mu1
SIGMA = np.diag(np.ones(D)) * sigma
for i in range(N):
    phiX[i, 0] = phi(x=X[i, :], mu=MU2, sigma=SIGMA)
    phiX[i, 1] = phi(x=X[i, :], mu=MU1, sigma=SIGMA)

ax2 = axarr[1]
ax2.set_xlabel('phi1(x)')
ax2.set_ylabel('phi2(x)')
createScatter(phiX, t, ax2)

#Now let's apply machine learning to determine the boundary. We will assume M = 3, i.e. that there
#are 3 free parameters, that is w = [w0, w1, w2]^T and phi_n = [1, phiX[n,0], phiX[n,1]]^T
M = 3
Phi = np.ones((N, M))
Phi[:, 1] = phiX[:, 0]
Phi[:, 2] = phiX[:, 1]
w = np.zeros(M)
R = np.zeros((N, N))
y = np.zeros(N)

def sigmoid(a):
    return 1.0 / (1.0 + math.exp(-a))

#the cross-entropy error -sum_n [t_n ln y_n + (1 - t_n) ln(1 - y_n)]
def totalErr(y, t):
    e = 0.0
    for i in range(len(y)):
        if t[i] > 0:
            e += math.log(y[i])
        else:
            e += math.log(1.0 - y[i])
    return -e

#start Newton-Raphson (iteratively reweighted least squares). Each pass recomputes
#y_n = sigmoid(w^T phi_n) and the weighting matrix R = diag(y_n(1 - y_n)), then updates
#w = (Phi^T R Phi)^-1 Phi^T R z with z = Phi w_old - R^-1 (y - t).
#As a stopping criterion we use a tolerance on the change in the error function and a
#max number of iterations.
max_its = 100
tol = 1e-2
w0 = [w[0]]
w1 = [w[1]]
w2 = [w[2]]
err = []
error_delta = 1 + tol
current_error = 0
idx = 0
while math.fabs(error_delta) > tol and idx < max_its:
    #update y & R
    for i in range(N):
        zipped = zip(w, Phi[i, :])
        y[i] = sigmoid(reduce(lambda accum, Z: accum + Z[0] * Z[1], zipped, 0))
        R[i, i] = y[i] - y[i] * y[i]
    #update w
    z = np.dot(Phi, w) - np.dot(np.linalg.pinv(R), y - t)
    temp = np.linalg.pinv(np.dot(np.dot(Phi.T, R), Phi))
    temp2 = np.dot(np.dot(temp, Phi.T), R)
    w = np.dot(temp2, z)
    w0.append(w[0])
    w1.append(w[1])
    w2.append(w[2])
    idx += 1
    temp = totalErr(y, t)
    error_delta = current_error - temp
    current_error = temp
    err.append(error_delta)

print 'The total number of iterations was {0}'.format(idx)
print 'The total error was {0}'.format(current_error)
print 'The final change in error was {0}'.format(error_delta)
print 'The final parameters were {0}'.format(w)

#our decision boundary is the line where sigmoid(a) = 0.5, i.e. where a = w^T phi = 0,
#which for this example is the line phi2 = -(w0 + w1*phi1)/w2
bdryx = (0, 1)
bdryy = (-(w[0] + w[1] * bdryx[0]) / w[2], -(w[0] + w[1] * bdryx[1]) / w[2])
ax2.plot(bdryx, bdryy)

ax3 = axarr[2]
ax3.plot(w0, color='blue')
ax3.plot(w1, color='red')
ax3.plot(w2, color='green')
ax3.plot(err, color='black')
ax3.legend(("w0", "w1", "w2", "error delta"), loc='upper left')
ax3.set_xlabel('Newton-Raphson Iteration')
plt.show()
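#A minimal follow-up sketch: to classify a new input with the fitted model, push it through the same
#fixed basis functions and evaluate the posterior sigmoid(w^T phi). The probe point x_new and the
#helper phi_vec below are arbitrary choices made only for this illustration.
def phi_vec(x):
    return np.array([1.0, phi(x=x, mu=MU2, sigma=SIGMA), phi(x=x, mu=MU1, sigma=SIGMA)])

x_new = np.array([0.1, -0.1])
p_c1 = sigmoid(np.dot(w, phi_vec(x_new)))
print 'p(C_1|x={0}) = {1}, so x is assigned to class {2}'.format(x_new, p_c1, 1 if p_c1 > 0.5 else 2)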