import matplotlib.pyplot as plt
import numpy as np
import scipy.stats
# Experiment: `NUM_SETS` times, pick one of the `NUM_COINS` coins and
# flip it `TOSSES_PER_SET` times.
NUM_SETS = 5
TOSSES_PER_SET = 10
NUM_COINS = 2
# Data collected from experiment: Number of heads/tails in each set.
# Observed head count for each of the NUM_SETS sets of tosses.
X_HEAD_COUNTS = np.array([5, 9, 8, 4, 7])
# Sanity check: exactly one observation per set.
assert len(X_HEAD_COUNTS) == NUM_SETS
# Tail counts follow directly, since every set has exactly TOSSES_PER_SET flips.
X_TAIL_COUNTS = TOSSES_PER_SET - X_HEAD_COUNTS
def log_likelihood(head_counts, assignments, theta, tosses_per_set=None):
    """Log likelihood of coin flips given coin assignments and theta.

    Args:
        head_counts: array of observed head counts, one per set of tosses.
        assignments: array of coin indices, one per set, identifying which
            coin produced that set.
        theta: array of per-coin probabilities of heads.
        tosses_per_set: number of flips in each set; defaults to the
            module-level TOSSES_PER_SET for backward compatibility.

    Returns:
        Total log likelihood of the observed head counts.
    """
    if tosses_per_set is None:
        tosses_per_set = TOSSES_PER_SET
    set_probs = np.asarray(theta)[np.asarray(assignments)]
    # Use logpmf directly: np.log(binom.pmf(...)) underflows to -inf for
    # very small probabilities, while logpmf stays finite and accurate.
    return scipy.stats.binom.logpmf(head_counts, tosses_per_set, set_probs).sum()
# Define a function that performs a single EM run from an initial parameter
# guess, then call it with many random initial guesses and keep the run that
# maximizes the log likelihood.
def _coin_flip_em(theta_guess, hard_assignments=False):
    """Run EM to convergence from an initial guess of the parameters.

    Args:
        theta_guess: array of initial per-coin probabilities of heads.
        hard_assignments: if True, the E-step assigns each set wholly to
            its most likely coin instead of using soft posteriors.

    Returns:
        (theta, log_likelihoods): fitted per-coin head probabilities and
        the log likelihood recorded after each EM iteration.
    """
    # Copy as float: np.asarray would alias the caller's array (so the
    # M-step would mutate the caller's guess), and an int-dtype guess
    # would silently truncate on in-place assignment.
    theta = np.array(theta_guess, dtype=float)
    log_likelihoods = []
    while True:
        # E-step: unnormalized P(coin | set) for every (set, coin) pair.
        coin_probs = np.zeros((NUM_SETS, NUM_COINS), dtype=float)
        for set_idx in range(NUM_SETS):
            num_heads = X_HEAD_COUNTS[set_idx]
            num_tails = TOSSES_PER_SET - num_heads
            for coin_idx in range(NUM_COINS):
                p_heads = theta[coin_idx]
                p_tails = 1.0 - p_heads
                # Binomial likelihood up to the n-choose-k factor, which is
                # coin-independent and cancels in the row normalization.
                coin_probs[set_idx, coin_idx] = (p_heads ** num_heads) * (p_tails ** num_tails)
        # Normalize the probabilities in each row.
        if hard_assignments:
            # Winner-take-all: put probability 1 on each row's argmax coin.
            hard_probs = np.zeros_like(coin_probs)
            hard_probs[coin_probs.argmax(axis=1) == 1, 1] = 1
            hard_probs[:, 0] = 1 - hard_probs[:, 1]
            coin_probs = hard_probs
        else:
            coin_probs = coin_probs / coin_probs.sum(axis=1, keepdims=True)
        # M-step: expected head/tail counts attributed to each coin
        # (broadcasting replaces the np.tile(...).T construction).
        expected_head_counts = (coin_probs * X_HEAD_COUNTS[:, np.newaxis]).sum(axis=0)
        expected_tail_counts = (coin_probs * X_TAIL_COUNTS[:, np.newaxis]).sum(axis=0)
        # Re-estimate the probability of heads for each coin.
        for coin_idx in range(len(theta)):
            denom = expected_head_counts[coin_idx] + expected_tail_counts[coin_idx]
            if denom != 0:  # guard against a coin with zero responsibility
                theta[coin_idx] = expected_head_counts[coin_idx] / denom
        # Quit once the log likelihood stops changing (tolerance 1e-9,
        # previously written as the misleading 10e-10).
        log_likelihoods.append(log_likelihood(X_HEAD_COUNTS, coin_probs.argmax(axis=1), theta))
        if len(log_likelihoods) >= 2 and abs(log_likelihoods[-1] - log_likelihoods[-2]) < 1e-9:
            break
    return theta, log_likelihoods
def coin_flip_em(hard_assignments=False, num_trials=100):
    """Run `num_trials` random-restart EM trials and keep the best one.

    Args:
        hard_assignments: passed through to `_coin_flip_em`.
        num_trials: number of random initial guesses to try.

    Returns:
        (best_theta, best_ll, all_lls): the parameters and final log
        likelihood of the best trial, plus every trial's final log
        likelihood.
    """
    best_theta, best_ll, all_lls = None, None, []
    for _ in range(num_trials):  # range: xrange is Python-2-only
        theta_guess = np.random.random(NUM_COINS)
        theta, lls = _coin_flip_em(theta_guess, hard_assignments=hard_assignments)
        all_lls.append(lls[-1])
        # Keep the trial whose converged log likelihood is highest.
        if best_ll is None or lls[-1] > best_ll:
            best_theta = theta
            best_ll = lls[-1]
    return best_theta, best_ll, all_lls
# Report the best parameters found under each assignment scheme.
# (print() calls: Python 2 print statements are a syntax error on Python 3.)
print('Best theta from hard assignments:', coin_flip_em(hard_assignments=True)[0])
print('Best theta from soft assignments:', coin_flip_em(hard_assignments=False)[0])
# Expected output:
#   Best theta from hard assignments: [ 0.45 0.8 ]
#   Best theta from soft assignments: [ 0.51958312 0.79678907]