import matplotlib.pyplot as plt
import numpy as np
import pandas
import scipy.stats
data = np.loadtxt('data/Urn_data.txt', dtype=int)
data.shape
(50, 21)
data
array([[1, 2, 1, ..., 4, 0, 0], [1, 3, 1, ..., 0, 0, 4], [1, 2, 4, ..., 0, 4, 0], ..., [2, 2, 1, ..., 0, 0, 4], [2, 2, 0, ..., 3, 1, 0], [2, 1, 1, ..., 4, 0, 4]])
class BaseProposalGenerator(object):
"""Base MCMC proposal generator class."""
# Sublcasses should set this to a function that takes parameters
# and returns a measure of how well it fits the data (e.g. log
# likelihood).
_fit_metric_func = None
def __init__(self, data):
"""Construct a proposal generator.
Parameters
----------
data : array-like
The data to fit.
"""
self.data = data
def __call__(self, params_current):
"""Called to propose an MCMC step.
Given current parameters, returns `(params_proposed, pi_ratio, q_ratio)`.
"""
raise NotImplementedError
def plot_fit_metric(self, ax, chain, **kwargs):
"""Plot the fit metric along the chain."""
defaults = (('alpha', 0.5), ('s', 5), ('edgecolors', 'none'),
('c', np.linspace(0, 1, len(chain))))
for key, val in defaults:
if key not in kwargs:
kwargs[key] = val
x = np.arange(0, len(chain))
metric = np.array([self._fit_metric_func(params) for params in chain])
cax = ax.scatter(x, metric, **kwargs)
ax.set_xlim(0, len(chain))
return cax
class MCMC(object):
"""Implementation of Markov Chain Monte Carlo.
Attributes
----------
burned_in_index : int
Index at which the chain is considered to be burned in. Affects
the chain returned by the `get_chain` method. Defaults to 0 and
can be manually set to something else.
"""
def __init__(self, proposal_generator, params_start, param_names=None, initial_chain_length=10000):
"""Initialize an MCMC object.
Parameters
----------
proposal_generator : callable
A function that, when called with the current parameters, proposes the
next step by returning a tuple `(params_proposed, pi_ratio, q_ratio)`.
params_start : array-like
Initial parameters.
param_names : array-like, optional
Names for each of the parameters. If specified, this allows methods
that take an index for a parameter to accept its name instead of its
index.
initial_chain_length: int, optional
The initial length of the chain. The chain is automatically extended if
`run` is called with a number of steps that would extend past the end
of the chain.
"""
self._proposal_generator = proposal_generator
self._chain = np.zeros((initial_chain_length, len(params_start)), dtype=float)
self._chain[0, :] = params_start
self._steps_completed = 1
self.burned_in_index = 0
self._param_name_to_idx = None
self.num_params = len(params_start)
self.param_names = param_names
if param_names is not None:
assert len(param_names) == len(params_start)
self._param_name_to_idx = dict(zip(param_names, range(len(param_names))))
def run(self, num_steps=None):
"""Run the chain for multiple steps.
Parameters
----------
num_steps : int, optional
The number of steps to run the chain. If not specified, the chain is
run up to the size of the currently allocated chain matrix (initially
set in the constructor).
Returns
-------
MCMC
Returns `self`.
"""
remaining_chain_length = self._chain.shape[0] - self._steps_completed
if num_steps is None:
num_steps = remaining_chain_length
else:
shape = self._chain.shape
add = num_steps - remaining_chain_length
new_chain = np.zeros((shape[0] + add, shape[1]), dtype=self._chain.dtype)
new_chain[:shape[0], :] = self._chain
self._chain = new_chain
while num_steps > 0:
self._chain[self._steps_completed, :] = self._step(self._chain[self._steps_completed - 1, :])
self._steps_completed += 1
num_steps -= 1
return self
def get_chain(self, from_beginning=False):
"""Get a view of the chain.
Returns a view of the chain starting at `self.burned_in_index` by default
and ending at the last step that has been completed.
Parameters
----------
from_beginning : bool, optional
If True, returns the chain from the beginning (ignoring `self.burned_in_index`).
"""
return self._chain[self.burned_in_index:self._steps_completed, :]
def hist(self, ax, param, **kwargs):
"""Histogram of a single parameter."""
defaults = (('bins', 100),)
for key, val in defaults:
if key not in kwargs:
kwargs[key] = val
param = self._params_to_idx(param)
chain = self.get_chain()
return ax.hist(chain[:, param], **kwargs)
def scatter(self, ax, param_1, param_2, **kwargs):
"""Add a scatter plot to the axis provided.
Gets the chain from `self.get_chain` and plots a scatter plot of two params.
Parameters
----------
ax : Matplotlib axis
The axis to plot to.
param_1 : string, int
The name or index of the first parameter.
param_2 : string, int
The name or index of the second parameter.
"""
param_1, param_2 = self._params_to_idx(param_1, param_2)
chain = self.get_chain()
defaults = (('alpha', 0.5), ('s', 5), ('marker', 'o'), ('edgecolors', 'none'),
('c', np.linspace(0, 1, len(chain))))
for key, val in defaults:
if key not in kwargs:
kwargs[key] = val
cax = ax.scatter(chain[:, param_1], chain[:, param_2], **kwargs)
cmap = cax.get_cmap()
ax.scatter(chain[0, param_1], chain[0, param_2], label='Start',
s=400, color=cmap(0.0), marker='*', edgecolors='black')
ax.scatter(chain[-1, param_1], chain[-1, param_2],
label='Finish', s=400, color=cmap(1.0), marker='*',
edgecolors='black')
return cax
def scatter_matrix(self, params=None, **kwargs):
"""Scatter plot matrix of the parameter values."""
if params is None:
params = self.param_names
if params is None:
params = range(self.num_params)
fig, axs = plt.subplots(len(params), len(params), figsize=(16, 16))
fig.subplots_adjust(hspace=0.01, wspace=0.01)
for i, p1 in enumerate(params):
for j, p2 in enumerate(params):
ax = axs[i][j]
ax.xaxis.set_visible(False)
ax.yaxis.set_visible(False)
if i == j:
ax.annotate(params[i], (0.5, 0.5), xycoords='axes fraction',
ha='center', va='center')
elif i != j:
cax = self.scatter(ax, p1, p2, **kwargs)
cbar = fig.colorbar(cax, ax=axs[0][-1], ticks=(0, 1))
cbar.set_ticklabels((0, len(self.get_chain())))
return fig
def _step(self, params_current):
# Takes a single step.
params_proposed, pi_ratio, q_ratio = self._proposal_generator(params_current)
alpha = np.min([1.0, pi_ratio * q_ratio])
if np.random.random() < alpha:
return params_proposed
return params_current
def _params_to_idx(self, *params):
# Converts params (specified as strings or indices) to indices.
idxs = []
for param in params:
if isinstance(param, basestring):
# Assume it's a parameter's name; map it to its index.
idxs.append(self._param_name_to_idx[param])
else:
idxs.append(param)
return tuple(idxs)
def param_hists(mcmc, colors=None, **kwargs):
"""Grid of histograms, one for each parameter."""
if mcmc.param_names is not None:
param_names = mcmc.param_names
else:
param_names = ['Param {0}'.format(i) for i in xrange(mcmc.num_params)]
num_params = len(param_names)
chain = mcmc.get_chain()
nrows = int(np.ceil(num_params / 3.0))
ncols = 3
fig, ax = plt.subplots(nrows, ncols, figsize=(16, 4 * nrows))
for i, param in enumerate(param_names):
row = i // ncols
col = i % ncols
if colors is None:
color = 'blue'
else:
color = colors[i]
mcmc.hist(ax[row][col], param, color=color, normed=True, **kwargs)
mean, sd = np.mean(chain[:, i]), np.std(chain[:, i])
title = '{0}: {1:.4f} +/- {2:.4f}'.format(param, mean, sd)
ax[row][col].set_title(title)
return fig
BALL_COUNTS = (2, 4, 7, 3, 5)
def draw_probability(balls_drawn, ball_weights):
"""Probability of drawing the balls in `balls_drawn` in exactly that order."""
balls_drawn = np.array(balls_drawn, dtype=int)
probs = []
# Start with all the balls in the urn.
ball_counts = np.array(BALL_COUNTS)
# Go through each of the draws and compute the probability
# of the ball we observed, then multiply these probabilities
# together to get the probability of seeing exactly this
# sequence of draws.
for draw in balls_drawn:
draw_probs = ball_weights * ball_counts
draw_probs = draw_probs / draw_probs.sum()
prob = draw_probs[draw]
probs.append(prob)
if prob == 0.0:
return 0.0
ball_counts[draw] -= 1
return np.product(probs)
def log_likelihood(data, ball_weights):
"""Compute the log likelihood of all the experiments given the ball weights."""
return np.sum(np.log([draw_probability(balls_drawn, ball_weights) for balls_drawn in data]))
# Sanity check.
log_likelihood(data, [0.2]*5)
-1320.2262657246752
class ProposalGenerator(BaseProposalGenerator):
def __init__(self, data):
self.data = data
self._fit_metric_func = self.log_likelihood
self._ll_cache = {}
def __call__(self, params_current):
# Generate a new proposal.
lognorm = np.random.lognormal(mean=[0]*5, sigma=0.01)
params_proposed = params_current * lognorm
params_proposed = params_proposed / params_proposed.sum()
pi_ratio = np.exp(log_likelihood(data, params_proposed) - log_likelihood(data, params_current))
q_ratio = np.product(params_proposed / params_current)
return params_proposed, pi_ratio, q_ratio
def log_likelihood(self, params):
# Retrieve from cache if we're already computed it.
hash_val = hash('/'.join(str(v) for v in params))
cached = self._ll_cache.get(hash_val, None)
if cached is not None:
return cached
ll = log_likelihood(self.data, params)
self._ll_cache[hash_val] = ll
return ll
Run 50,000 MCMC steps.
proposer = ProposalGenerator(data)
ball_colors = ('red', 'blue', 'orange', 'purple', 'green')
np.random.seed(42)
chain_start = [.2] * 5
mcmc = MCMC(proposer, chain_start, param_names=ball_colors, initial_chain_length=50000)
%time mcmc.run()
CPU times: user 41min 33s, sys: 300 ms, total: 41min 33s Wall time: 41min 35s
<__main__.MCMC at 0xba4474c>
Plot the log likelihood along the chain:
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))
proposer.plot_fit_metric(ax1, mcmc.get_chain()[:5000])
ax1.set_title('Log likelihood (first 5,000 steps)')
proposer.plot_fit_metric(ax2, mcmc.get_chain())
ax2.set_title('Log likelihood (full chain)')
<matplotlib.text.Text at 0xc42890c>
Plot the parameter values along the chain:
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))
chain = mcmc.get_chain()
for i, color in enumerate(ball_colors):
ax1.plot(chain[:5000, i], color=color)
ax1.set_title('Parameters (first 5,000 steps)')
for i, color in enumerate(ball_colors):
ax2.plot(chain[:, i], color=color)
ax2.set_title('Parameters (full chain)')
<matplotlib.text.Text at 0x1096974c>
The chain seems to be well mixed by step 5000, so we throw away the first 5000 steps and look at the remaining 45,000.
Here are the distributions of each ball weight shown with their means and standard deviations:
mcmc.burned_in_index = 5000
chain = mcmc.get_chain()
_ = param_hists(mcmc, ball_colors, bins=50)
Looking at the parameter values at that step of the chain with the highest log likelihood:
best_idx = np.argmax([proposer.log_likelihood(params) for params in chain])
print 'Highest log likelihood at step', best_idx
best_params = chain[best_idx]
print 'Best parameters:'
zip(ball_colors, best_params)
Highest log likelihood at step 10779 Best parameters:
[('red', 0.078313429295680748), ('blue', 0.25249125858070098), ('orange', 0.34019285745954869), ('purple', 0.20144517259116607), ('green', 0.1275572820729034)]
Plotting the distribution of all of the ball weights on the same axis:
fig, ax = plt.subplots()
for color in ball_colors:
mcmc.hist(ax, color, histtype='step', color=color, bins=25, normed=True)
ax.set_title('Ball Weights')
<matplotlib.text.Text at 0x10d1476c>
Finally, even though it's not very informative on this problem, here is a scatter plot matrix showing the marginal distributions for each pair of variables. Each point is colored by the number of the step on the chain. Each pair of parameters looks a bit negatively correlated which makes sense because the ball weights must sum to one in our implementation (when one goes up, another one must go down).
_ = mcmc.scatter_matrix(alpha=0.3)