In [1]:
import math
import numpy

class HMM(object):
''' Simple Hidden Markov Model implementation.  User provides
transition, emission and initial probabilities in dictionaries
mapping 2-character codes onto floating-point probabilities
for those table entries.  States and emissions are represented
with single characters.  Emission symbols comes from a finite.  '''

def __init__(self, A, E, I):
''' Initialize the HMM given transition, emission and initial
probability tables. '''

# put state labels to the set self.Q
self.Q, self.S = set(), set() # states and symbols
for a, prob in A.items():
# add all the symbols to the set self.S
for e, prob in E.items():
eq, es = e[0], e[1]

self.Q = sorted(list(self.Q))
self.S = sorted(list(self.S))

# create maps from state labels / emission symbols to integers
# that function as unique IDs
qmap, smap = {}, {}
for i in range(len(self.Q)): qmap[self.Q[i]] = i
for i in range(len(self.S)): smap[self.S[i]] = i
lenq = len(self.Q)

# create and populate transition probability matrix
self.A = numpy.zeros(shape=(lenq, lenq), dtype=float)
for a, prob in A.items():
# make A stochastic (i.e. make rows add to 1)
self.A /= self.A.sum(axis=1)[:, numpy.newaxis]

# create and populate emission probability matrix
self.E = numpy.zeros(shape=(lenq, len(self.S)), dtype=float)
for e, prob in E.items():
eq, es = e[0], e[1]
self.E[qmap[eq], smap[es]] = prob
# make E stochastic (i.e. make rows add to 1)
self.E /= self.E.sum(axis=1)[:, numpy.newaxis]

# initial probabilities
self.I = [ 0.0 ] * len(self.Q)
for a, prob in I.items():
self.I[qmap[a]] = prob
# make I stochastic (i.e. adds to 1)
self.I = numpy.divide(self.I, sum(self.I))

self.qmap, self.smap = qmap, smap

# Make log-base-2 versions for log-space functions
self.Alog = numpy.log2(self.A)
self.Elog = numpy.log2(self.E)
self.Ilog = numpy.log2(self.I)

def jointProb(self, p, x):
''' Return joint probability of path p and emission string x '''
p = list(map(self.qmap.get, p)) # turn state characters into ids
x = list(map(self.smap.get, x)) # turn emission characters into ids
for i in range(1, len(p)):
tot *= self.A[p[i-1], p[i]] # transition probability
for i in range(0, len(p)):
tot *= self.E[p[i], x[i]] # emission probability

def jointProbL(self, p, x):
''' Return log2 of joint probability of path p and emission
string x.  Just like self.jointProb(...) but log2 domain. '''
p = list(map(self.qmap.get, p)) # turn state characters into ids
x = list(map(self.smap.get, x)) # turn emission characters into ids
for i in range(1, len(p)):
tot += self.Alog[p[i-1], p[i]] # transition probability
for i in range(0, len(p)):
tot += self.Elog[p[i], x[i]] # emission probability

def viterbi(self, x):
''' Given sequence of emissions, return the most probable path
along with its probability. '''
x = list(map(self.smap.get, x)) # turn emission characters into ids
nrow, ncol = len(self.Q), len(x)
mat   = numpy.zeros(shape=(nrow, ncol), dtype=float) # prob
matTb = numpy.zeros(shape=(nrow, ncol), dtype=int)   # backtrace
# Fill in first column
for i in range(0, nrow):
mat[i, 0] = self.E[i, x[0]] * self.I[i]
# Fill in rest of prob and Tb tables
for j in range(1, ncol):
for i in range(0, nrow):
ep = self.E[i, x[j]]
mx, mxi = mat[0, j-1] * self.A[0, i] * ep, 0
for i2 in range(1, nrow):
pr = mat[i2, j-1] * self.A[i2, i] * ep
if pr > mx:
mx, mxi = pr, i2
mat[i, j], matTb[i, j] = mx, mxi
# Find final state with maximal probability
omx, omxi = mat[0, ncol-1], 0
for i in range(1, nrow):
if mat[i, ncol-1] > omx:
omx, omxi = mat[i, ncol-1], i
# Backtrace
i, p = omxi, [omxi]
for j in range(ncol-1, 0, -1):
i = matTb[i, j]
p.append(i)
p = ''.join(map(lambda x: self.Q[x], p[::-1]))
return omx, p # Return probability and path

def viterbiL(self, x):
''' Given sequence of emissions, return the most probable path
along with log2 of its probability.  Just like viterbi(...)
but in log2 domain. '''
x = list(map(self.smap.get, x)) # turn emission characters into ids
nrow, ncol = len(self.Q), len(x)
mat   = numpy.zeros(shape=(nrow, ncol), dtype=float) # prob
matTb = numpy.zeros(shape=(nrow, ncol), dtype=int)   # backtrace
# Fill in first column
for i in range(0, nrow):
mat[i, 0] = self.Elog[i, x[0]] + self.Ilog[i]
# Fill in rest of log prob and Tb tables
for j in range(1, ncol):
for i in range(0, nrow):
ep = self.Elog[i, x[j]]
mx, mxi = mat[0, j-1] + self.Alog[0, i] + ep, 0
for i2 in range(1, nrow):
pr = mat[i2, j-1] + self.Alog[i2, i] + ep
if pr > mx:
mx, mxi = pr, i2
mat[i, j], matTb[i, j] = mx, mxi
# Find final state with maximal log probability
omx, omxi = mat[0, ncol-1], 0
for i in range(1, nrow):
if mat[i, ncol-1] > omx:
omx, omxi = mat[i, ncol-1], i
# Backtrace
i, p = omxi, [omxi]
for j in range(ncol-1, 0, -1):
i = matTb[i, j]
p.append(i)
p = ''.join(map(lambda x: self.Q[x], p[::-1]))
return omx, p # Return log probability and path

In [2]:
# We experiment with joint probabilities first

hmm = HMM({"FF":0.9, "FL":0.1, "LF":0.1, "LL":0.9}, # transition matrix A
{"FH":0.5, "FT":0.5, "LH":0.75, "LT":0.25}, # emission matrix E
{"F":0.5, "L":0.5}) # initial probabilities I
jprob1 = hmm.jointProb("FFFLLLFFFFF", "THTHHHTHTTH")
myprob1 = (0.5 ** 9) * (0.75 ** 3) * (0.9 ** 8) * (0.1 ** 2)
jprob1, myprob1
# these should be about equal

Out[2]:
(3.5469405120849628e-06, 3.5469405120849624e-06)
In [3]:
# confirming that log version of jointProb works as expected
jprobL1 = hmm.jointProbL("FFFLLLFFFFF", "THTHHHTHTTH")
math.log(jprob1, 2), jprobL1

Out[3]:
(-18.104993435171657, -18.104993435171654)
In [4]:
# Trying another path
jprob2 = hmm.jointProb("FFFFFFFFFFF", "THTHHHTHTTH")
myprob2 = (0.5 ** 12) * (0.9 ** 10)
jprob2, myprob2
# these should be about equal

Out[4]:
(8.51265722900391e-05, 8.512657229003909e-05)
In [5]:
# Note that jprob2 is greater than jprob1

# Now we experiment with viterbi decoding
jprobOpt, path = hmm.viterbi("THTHHHTHTTH")
path

Out[5]:
'FFFFFFFFFFF'
In [6]:
# maximum likelihood path is same path (all fair) as the second one
# we tried above, so jprobOpt should equal jprob2
jprobOpt, jprob2

Out[6]:
(8.51265722900391e-05, 8.51265722900391e-05)
In [7]:
# confirming that log version of viterbi works as expected
jprobLOpt, _ = hmm.viterbiL("THTHHHTHTTH")
math.log(jprobOpt, 2), jprobLOpt

Out[7]:
(-13.520030934450498, -13.520030934450496)
In [8]:
# Now let's make a new HMM with the same states but where jumps
# between fair (F) and loaded (L) are much more probable
hmm = HMM({"FF":0.6, "FL":0.4, "LF":0.4, "LL":0.6}, # transition matrix A
{"FH":0.5, "FT":0.5, "LH":0.8, "LT":0.2}, # emission matrix E
{"F":0.5, "L":0.5}) # initial probabilities I
hmm.viterbi("THTHHHTHTTH")

Out[8]:
(2.8665446400000001e-06, 'FFFLLLFFFFL')
In [9]:
# Here's an example of underflow.  Note that probability returned
# is 0.0 and the state string becomes all Fs after a while.
hmm.viterbi("THTHHHTHTTH" * 100)

Out[9]:
(0.0,
'FFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF')
In [10]:
# Moving to log2 domain fixes underflow
hmm.viterbiL("THTHHHTHTTH" * 100)

Out[10]:
(-1824.4030071946879,
'FFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFL')
In [11]:
cpgHmm = HMM({'IO':0.20, 'OI':0.20, 'II':0.80, 'OO':0.80},
{'IA':0.10, 'IC':0.40, 'IG':0.40, 'IT':0.10,
'OA':0.25, 'OC':0.25, 'OG':0.25, 'OT':0.25},
{'I' :0.50, 'O' :0.50})
x = 'ATATATACGCGCGCGCGCGCGATATATATATATA'
logp, path = cpgHmm.viterbiL(x)
print(x)
print(path) # finds the CpG island fine

ATATATACGCGCGCGCGCGCGATATATATATATA
OOOOOOOIIIIIIIIIIIIIIOOOOOOOOOOOOO

In [12]:
x = 'ATATCGCGCGCGATATATCGCGCGCGATATATAT'
logp, path = cpgHmm.viterbiL(x)
print(x)
print(path) # finds two CpG islands fine

ATATCGCGCGCGATATATCGCGCGCGATATATAT
OOOOIIIIIIIIOOOOOOIIIIIIIIOOOOOOOO

In [13]:
x = 'ATATATACCCCCCCCCCCCCCATATATATATATA'
logp, path = cpgHmm.viterbiL(x)
print(x)
print(path) # oops! - this is just a bunch of Cs.  What went wrong?

ATATATACCCCCCCCCCCCCCATATATATATATA
OOOOOOOIIIIIIIIIIIIIIOOOOOOOOOOOOO