import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas
import scipy
import scipy.stats
np.random.seed(13)
class BaseProposalGenerator(object):
"""Base MCMC proposal generator class."""
# Sublcasses should set this to a function that takes parameters
# and returns a measure of how well it fits the data (e.g. log
# likelihood).
_fit_metric_func = None
def __init__(self, data):
"""Construct a proposal generator.
Parameters
----------
data : array-like
The data to fit.
"""
self.data = data
def __call__(self, params_current):
"""Called to propose an MCMC step.
Given current parameters, returns `(params_proposed, pi_ratio, q_ratio)`.
"""
raise NotImplementedError
def plot_fit(self, ax, params):
"""Plot the fit given by the params."""
raise NotImplementedError
def plot_fit_metric(self, ax, chain, **kwargs):
"""Plot the fit metric along the chain."""
defaults = (('alpha', 0.5), ('s', 5), ('edgecolors', 'none'),
('c', np.linspace(0, 1, len(chain))))
for key, val in defaults:
if key not in kwargs:
kwargs[key] = val
x = np.arange(0, len(chain))
metric = np.array([self._fit_metric_func(params) for params in chain])
cax = ax.scatter(x, metric, **kwargs)
ax.set_xlim(0, len(chain))
return cax
class MCMC(object):
"""Implementation of Markov Chain Monte Carlo.
Attributes
----------
burned_in_index : int
Index at which the chain is considered to be burned in. Affects
the chain returned by the `get_chain` method. Defaults to 0 and
can be manually set to something else.
"""
def __init__(self, proposal_generator, params_start, param_names=None, initial_chain_length=10000):
"""Initialize an MCMC object.
Parameters
----------
proposal_generator : callable
A function that, when called with the current parameters, proposes the
next step by returning a tuple `(params_proposed, pi_ratio, q_ratio)`.
params_start : array-like
Initial parameters.
param_names : array-like, optional
Names for each of the parameters. If specified, this allows methods
that take an index for a parameter to accept its name instead of its
index.
initial_chain_length: int, optional
The initial length of the chain. The chain is automatically extended if
`run` is called with a number of steps that would extend past the end
of the chain.
"""
self._proposal_generator = proposal_generator
self._chain = np.zeros((initial_chain_length, len(params_start)), dtype=float)
self._chain[0, :] = params_start
self._steps_completed = 1
self.burned_in_index = 0
self._param_name_to_idx = None
self.num_params = len(params_start)
self.param_names = param_names
if param_names is not None:
assert len(param_names) == len(params_start)
self._param_name_to_idx = dict(zip(param_names, range(len(param_names))))
def run(self, num_steps=None):
"""Run the chain for multiple steps.
Parameters
----------
num_steps : int, optional
The number of steps to run the chain. If not specified, the chain is
run up to the size of the currently allocated chain matrix (initially
set in the constructor).
Returns
-------
MCMC
Returns `self`.
"""
remaining_chain_length = self._chain.shape[0] - self._steps_completed
if num_steps is None:
num_steps = remaining_chain_length
else:
shape = self._chain.shape
add = num_steps - remaining_chain_length
new_chain = np.zeros((shape[0] + add, shape[1]), dtype=self._chain.dtype)
new_chain[:shape[0], :] = self._chain
self._chain = new_chain
while num_steps > 0:
self._chain[self._steps_completed, :] = self._step(self._chain[self._steps_completed - 1, :])
self._steps_completed += 1
num_steps -= 1
return self
def get_chain(self, from_beginning=False):
"""Get a view of the chain.
Returns a view of the chain starting at `self.burned_in_index` by default
and ending at the last step that has been completed.
Parameters
----------
from_beginning : bool, optional
If True, returns the chain from the beginning (ignoring `self.burned_in_index`).
"""
return self._chain[self.burned_in_index:self._steps_completed, :]
def _step(self, params_current):
# Takes a single step.
params_proposed, pi_ratio, q_ratio = self._proposal_generator(params_current)
alpha = np.min([1.0, pi_ratio * q_ratio])
if np.random.random() < alpha:
return params_proposed
return params_current
def _params_to_idx(self, *params):
# Converts params (specified as strings or indices) to indices.
idxs = []
for param in params:
if isinstance(param, basestring):
# Assume it's a parameter's name; map it to its index.
idxs.append(self._param_name_to_idx[param])
else:
idxs.append(param)
return tuple(idxs)
class ColoredBallsProposalGenerator(BaseProposalGenerator):
PARAMS = ('ball1', 'ball2', 'ball3', 'ball4', 'ball5', 'ball6')
BALL_TYPES = ('large-orange', 'medium-purple', 'small-green')
BALL_COUNTS = (7, 3, 5)
BALL_PROPORTIONS = (6, 4, 3)
def __init__(self):
# Deduce the ball weights from the initial counts and draw proportions.
self._fit_metric_func = self.draw_probability
draw1_probs = np.array(self.BALL_PROPORTIONS, dtype=np.double) / np.sum(self.BALL_PROPORTIONS)
weights = draw1_probs / np.sum(self.BALL_COUNTS)
self.ball_weights = weights / np.sum(weights)
# Index of the next parameter to update.
self._next_param = 0
def __call__(self, params_current):
draws = params_current.copy()
# Draw a new value for the parameter `self._next_param`.
# i.e. sample a new ball to pick from the urn at this step.
probs = np.zeros(len(self.BALL_TYPES), dtype=np.double)
for ball_type in xrange(len(self.BALL_TYPES)):
draws[self._next_param] = ball_type
p = self.draw_probability(draws)
probs[ball_type] = self.draw_probability(draws)
probs = probs / probs.sum()
sample = np.nonzero(np.random.multinomial(1, probs))[0][0]
draws[self._next_param] = sample
self._next_param = (self._next_param + 1) % len(self.PARAMS)
# Return 1.0 for pi_ratio and q_ratio since the proposal should
# always be accepted when doing Gibbs sampling.
return draws, 1.0, 1.0
def draw_probability(self, balls_drawn):
"""Probability of drawing the 6 balls in `balls_drawn` in exactly that order."""
balls_drawn = np.array(balls_drawn, dtype=int)
probs = []
# Start with all the balls in the urn.
ball_counts = np.array(self.BALL_COUNTS)
# Go through each of the 6 draws and compute the probability
# of the ball we observed, then multiply these probabilities
# together to get the probability of seeing exactly this
# sequence of draws.
for draw in balls_drawn:
draw_probs = self.ball_weights * ball_counts
draw_probs = draw_probs / draw_probs.sum()
prob = draw_probs[draw]
probs.append(prob)
if prob == 0.0:
return 0.0
ball_counts[draw] -= 1
return np.product(probs)
proposer = ColoredBallsProposalGenerator()
mcmc = MCMC(proposer, [0, 0, 1, 1, 2, 2], param_names=ColoredBallsProposalGenerator.PARAMS, initial_chain_length=100000)
%time mcmc.run()
CPU times: user 1min 41s, sys: 4 ms, total: 1min 41s Wall time: 1min 41s
<__main__.MCMC at 0xba7238c>
Each step on the chain gives a specific sequences of 6 balls types drawn from the urn. Here I plot the probability of the specific sequence at each sten on the chain:
fig, ax = plt.subplots()
proposer.plot_fit_metric(ax, mcmc.get_chain(), alpha=0.3)
ax.set_xlabel('Iteration')
ax.set_ylabel('Probability of draws')
<matplotlib.text.Text at 0xba841ac>
As we see above, the chain seems to mix very quickly and explore the same space over and over again. To be safe, I discard the first 20,000 iterations and compute metrics over the remaining results.
mcmc.burned_in_index = 20000
chain = np.array(mcmc.get_chain(), dtype=np.int)
(a) How often do you get 4 orange plus 2 of the same (non-orange) color?
orange_count = np.sum(chain == 0, axis=1)
purple_count = np.sum(chain == 1, axis=1)
green_count = np.sum(chain == 2, axis=1)
orange_4 = (orange_count == 4)
other_2 = (purple_count == 2) | (green_count == 2)
np.mean(orange_4 & other_2)
0.15208749999999999
(b) What is the expectation (mean) of the product of the number of purple and number of green balls drawn?
np.mean(purple_count * green_count)
1.3495625