43

I'm doing a Python project in which I'd like to use the Viterbi Algorithm. Does anyone know of a complete Python implementation of the Viterbi algorithm? The correctness of the one on Wikipedia seems to be in question on the talk page. Does anyone have a pointer?

Jeffrey
  • 1,681
  • 3
  • 13
  • 23

6 Answers6

35

Here's mine. Its paraphrased directly from the psuedocode implemenation from wikipedia. It uses numpy for conveince of their ndarray but is otherwise a pure python3 implementation.

import numpy as np

def viterbi(y, A, B, Pi=None):
    """
    Return the MAP estimate of state trajectory of Hidden Markov Model.

    Parameters
    ----------
    y : array (T,)
        Observation state sequence. int dtype.
    A : array (K, K)
        State transition matrix. See HiddenMarkovModel.state_transition  for
        details.
    B : array (K, M)
        Emission matrix. See HiddenMarkovModel.emission for details.
    Pi: optional, (K,)
        Initial state probabilities: Pi[i] is the probability x[0] == i. If
        None, uniform initial distribution is assumed (Pi[:] == 1/K).

    Returns
    -------
    x : array (T,)
        Maximum a posteriori probability estimate of hidden state trajectory,
        conditioned on observation sequence y under the model parameters A, B,
        Pi.
    T1: array (K, T)
        the probability of the most likely path so far
    T2: array (K, T)
        the x_j-1 of the most likely path so far
    """
    # Cardinality of the state space
    K = A.shape[0]
    # Initialize the priors with default (uniform dist) if not given by caller
    Pi = Pi if Pi is not None else np.full(K, 1 / K)
    T = len(y)
    T1 = np.empty((K, T), 'd')
    T2 = np.empty((K, T), 'B')

    # Initilaize the tracking tables from first observation
    T1[:, 0] = Pi * B[:, y[0]]
    T2[:, 0] = 0

    # Iterate throught the observations updating the tracking tables
    for i in range(1, T):
        T1[:, i] = np.max(T1[:, i - 1] * A.T * B[np.newaxis, :, y[i]].T, 1)
        T2[:, i] = np.argmax(T1[:, i - 1] * A.T, 1)

    # Build the output, optimal model trajectory
    x = np.empty(T, 'B')
    x[-1] = np.argmax(T1[:, T - 1])
    for i in reversed(range(1, T)):
        x[i - 1] = T2[x[i], i]

    return x, T1, T2
RBF06
  • 2,013
  • 2
  • 21
  • 20
  • I get errors when running this algorithm on a np.array of 12 values containing 4 possible categories – John Stud Dec 21 '20 at 00:25
  • What errors do you get? and how are you trying to call the function? – RBF06 Jan 02 '21 at 18:06
  • What python packages are these functions HiddenMarkovModel.state_transition? I can't find the package "HiddenMarkovModel" in pip – Chris Feb 17 '23 at 15:16
15

I found the following code in the example repository of Artificial Intelligence: A Modern Approach. Is something like this what you're looking for?

def viterbi_segment(text, P):
    """Find the best segmentation of the string of characters, given the
    UnigramTextModel P."""
    # best[i] = best probability for text[0:i]
    # words[i] = best word ending at position i
    n = len(text)
    words = [''] + list(text)
    best = [1.0] + [0.0] * n
    ## Fill in the vectors best, words via dynamic programming
    for i in range(n+1):
        for j in range(0, i):
            w = text[j:i]
            if P[w] * best[i - len(w)] >= best[i]:
                best[i] = P[w] * best[i - len(w)]
                words[i] = w
    ## Now recover the sequence of best words
    sequence = []; i = len(words)-1
    while i > 0:
        sequence[0:0] = [words[i]]
        i = i - len(words[i])
    ## Return sequence of best words and overall probability
    return sequence, best[-1]
Will
  • 2,014
  • 2
  • 19
  • 42
11

Hmm I can post mine. Its not pretty though, please let me know if you need clarification. I wrote this relatively recently for specifically part of speech tagging.

class Trellis:
    trell = []
    def __init__(self, hmm, words):
        self.trell = []
        temp = {}
        for label in hmm.labels:
           temp[label] = [0,None]
        for word in words:
            self.trell.append([word,copy.deepcopy(temp)])
        self.fill_in(hmm)

    def fill_in(self,hmm):
        for i in range(len(self.trell)):
            for token in self.trell[i][1]:
                word = self.trell[i][0]
                if i == 0:
                    self.trell[i][1][token][0] = hmm.e(token,word)
                else:
                    max = None
                    guess = None
                    c = None
                    for k in self.trell[i-1][1]:
                        c = self.trell[i-1][1][k][0] + hmm.t(k,token)
                        if max == None or c > max:
                            max = c
                            guess = k
                    max += hmm.e(token,word)
                    self.trell[i][1][token][0] = max
                    self.trell[i][1][token][1] = guess

    def return_max(self):
        tokens = []
        token = None
        for i in range(len(self.trell)-1,-1,-1):
            if token == None:
                max = None
                guess = None
                for k in self.trell[i][1]:
                    if max == None or self.trell[i][1][k][0] > max:
                        max = self.trell[i][1][k][0]
                        token = self.trell[i][1][k][1]
                        guess = k
                tokens.append(guess)
            else:
                tokens.append(token)
                token = self.trell[i][1][token][1]
        tokens.reverse()
        return tokens
placeybordeaux
  • 2,138
  • 1
  • 20
  • 42
  • 1
    I am a bit confused why this is higher than the NLTK post, is their implementation incorrect? OP did you find my completely undocumented code satisfactory? – placeybordeaux Feb 19 '14 at 00:36
  • 4
    Probably the reason it is more easy to hack around to get adapted to one's needs than the NLTK code. – chiffa Nov 04 '14 at 17:34
  • @placeybordeaux What does this function `hmm.t(k,token)` do? I tried to replicate the code but I could not figure out what `hmm.t(k,token)` do. Can you provide an example for it? – Mohammed May 12 '17 at 14:37
  • 1
    @Mohammed hmm going back pretty far here, but I am pretty sure that `hmm.t(k, token)` is the probability of transitioning to token from state k and `hmm.e(token, word)` is the probability of emitting word given token. Looking at the NLTK code may be helpful as well. Honestly my post is not particularly pretty or readable. – placeybordeaux Jun 19 '17 at 18:17
7

I have just corrected the pseudo implementation of Viterbi in Wikipedia. From the initial (incorrect) version, it took me a while to figure out where I was going wrong but I finally managed it, thanks partly to Kevin Murphy's implementation of the viterbi_path.m in the MatLab HMM toolbox.

In the context of an HMM object with variables as shown:

hmm = HMM()
hmm.priors = np.array([0.5, 0.5]) # pi = prior probs
hmm.transition = np.array([[0.75, 0.25], # A = transition probs. / 2 states
                           [0.32, 0.68]])
hmm.emission = np.array([[0.8, 0.1, 0.1], # B = emission (observation) probs. / 3 obs modes
                         [0.1, 0.2, 0.7]])

The Python function to run Viterbi (best-path) algorithm is below:

def viterbi (self,observations):
    """Return the best path, given an HMM model and a sequence of observations"""
    # A - initialise stuff
    nSamples = len(observations[0])
    nStates = self.transition.shape[0] # number of states
    c = np.zeros(nSamples) #scale factors (necessary to prevent underflow)
    viterbi = np.zeros((nStates,nSamples)) # initialise viterbi table
    psi = np.zeros((nStates,nSamples)) # initialise the best path table
    best_path = np.zeros(nSamples); # this will be your output

    # B- appoint initial values for viterbi and best path (bp) tables - Eq (32a-32b)
    viterbi[:,0] = self.priors.T * self.emission[:,observations(0)]
    c[0] = 1.0/np.sum(viterbi[:,0])
    viterbi[:,0] = c[0] * viterbi[:,0] # apply the scaling factor
    psi[0] = 0;

    # C- Do the iterations for viterbi and psi for time>0 until T
    for t in range(1,nSamples): # loop through time
        for s in range (0,nStates): # loop through the states @(t-1)
            trans_p = viterbi[:,t-1] * self.transition[:,s]
            psi[s,t], viterbi[s,t] = max(enumerate(trans_p), key=operator.itemgetter(1))
            viterbi[s,t] = viterbi[s,t]*self.emission[s,observations(t)]

        c[t] = 1.0/np.sum(viterbi[:,t]) # scaling factor
        viterbi[:,t] = c[t] * viterbi[:,t]

    # D - Back-tracking
    best_path[nSamples-1] =  viterbi[:,nSamples-1].argmax() # last state
    for t in range(nSamples-1,0,-1): # states of (last-1)th to 0th time step
        best_path[t-1] = psi[best_path[t],t]

    return best_path
Zhubarb
  • 11,432
  • 18
  • 75
  • 114
  • 1
    Comment by [jahrulesoverall](http://stackoverflow.com/users/6925587/jahrulesoverall) posted incorrectly in answer, *observations(0) is wrong right? should be observations[0] and observations[t]?* – Petter Friberg Oct 05 '16 at 10:24
  • 1
    I don't understand how you don't get an error when doing `psi[best_path[t],t]` since `best_path` is type float and you can only index with ints? – Mike Vella Nov 13 '17 at 13:16
  • 1
    @MikeVella I added : `bp = np.zeros(nSamples).astype(int)` – Ant Nov 20 '17 at 00:22
5

This is an old question, but none of the other answers were quite what I needed because my application doesn't have specific observed states.

Taking after @Rhubarb, I've also re-implemented Kevin Murphey's Matlab implementation (see viterbi_path.m), but I've kept it closer to the original. I've included a simple test case as well.

import numpy as np


def viterbi_path(prior, transmat, obslik, scaled=True, ret_loglik=False):
    '''Finds the most-probable (Viterbi) path through the HMM state trellis
    Notation:
        Z[t] := Observation at time t
        Q[t] := Hidden state at time t
    Inputs:
        prior: np.array(num_hid)
            prior[i] := Pr(Q[0] == i)
        transmat: np.ndarray((num_hid,num_hid))
            transmat[i,j] := Pr(Q[t+1] == j | Q[t] == i)
        obslik: np.ndarray((num_hid,num_obs))
            obslik[i,t] := Pr(Z[t] | Q[t] == i)
        scaled: bool
            whether or not to normalize the probability trellis along the way
            doing so prevents underflow by repeated multiplications of probabilities
        ret_loglik: bool
            whether or not to return the log-likelihood of the best path
    Outputs:
        path: np.array(num_obs)
            path[t] := Q[t]
    '''
    num_hid = obslik.shape[0] # number of hidden states
    num_obs = obslik.shape[1] # number of observations (not observation *states*)

    # trellis_prob[i,t] := Pr((best sequence of length t-1 goes to state i), Z[1:(t+1)])
    trellis_prob = np.zeros((num_hid,num_obs))
    # trellis_state[i,t] := best predecessor state given that we ended up in state i at t
    trellis_state = np.zeros((num_hid,num_obs), dtype=int) # int because its elements will be used as indicies
    path = np.zeros(num_obs, dtype=int) # int because its elements will be used as indicies

    trellis_prob[:,0] = prior * obslik[:,0] # element-wise mult
    if scaled:
        scale = np.ones(num_obs) # only instantiated if necessary to save memory
        scale[0] = 1.0 / np.sum(trellis_prob[:,0])
        trellis_prob[:,0] *= scale[0]

    trellis_state[:,0] = 0 # arbitrary value since t == 0 has no predecessor
    for t in xrange(1, num_obs):
        for j in xrange(num_hid):
            trans_probs = trellis_prob[:,t-1] * transmat[:,j] # element-wise mult
            trellis_state[j,t] = trans_probs.argmax()
            trellis_prob[j,t] = trans_probs[trellis_state[j,t]] # max of trans_probs
            trellis_prob[j,t] *= obslik[j,t]
        if scaled:
            scale[t] = 1.0 / np.sum(trellis_prob[:,t])
            trellis_prob[:,t] *= scale[t]

    path[-1] = trellis_prob[:,-1].argmax()
    for t in range(num_obs-2, -1, -1):
        path[t] = trellis_state[(path[t+1]), t+1]

    if not ret_loglik:
        return path
    else:
        if scaled:
            loglik = -np.sum(np.log(scale))
        else:
            p = trellis_prob[path[-1],-1]
            loglik = np.log(p)
        return path, loglik


if __name__=='__main__':
    # Assume there are 3 observation states, 2 hidden states, and 5 observations
    priors = np.array([0.5, 0.5])
    transmat = np.array([
        [0.75, 0.25],
        [0.32, 0.68]])
    emmat = np.array([
        [0.8, 0.1, 0.1],
        [0.1, 0.2, 0.7]])
    observations = np.array([0, 1, 2, 1, 0], dtype=int)
    obslik = np.array([emmat[:,z] for z in observations]).T
    print viterbi_path(priors, transmat, obslik)                                #=> [0 1 1 1 0]
    print viterbi_path(priors, transmat, obslik, scaled=False)                  #=> [0 1 1 1 0]
    print viterbi_path(priors, transmat, obslik, ret_loglik=True)               #=> (array([0, 1, 1, 1, 0]), -7.776472586614755)
    print viterbi_path(priors, transmat, obslik, scaled=False, ret_loglik=True) #=> (array([0, 1, 1, 1, 0]), -8.0120386579275227)

Note that this implementation does not use emission probabilities directly but uses a variable obslik. Generally, emissions[i,j] := Pr(observed_state == j | hidden_state == i) for a particular observed state i, making emissions.shape == (num_hidden_states, num_obs_states).

However, given a sequence observations[t] := observation at time t, all the Viterbi Algorithm requires is the likelihood of that observation for each hidden state. Hence, obslik[i,t] := Pr(observations[t] | hidden_state == i). The actual value the of the observed state isn't necessary.

hhquark
  • 379
  • 3
  • 10
  • 1
    I know that this thread is pretty old, bt can you explain what exactly the obslik variable does. Does it tell me the probability distribution of the states for each timestep? If so, in your example , obslik = np.array([emmat[:,z] for z in observations]).T gives : [[0.8 0.1 0.1 0.1 0.8] [0.1 0.2 0.7 0.2 0.1]] Does that mean that for time step 2 I have a probability of 0.1 to be hidden state 1 and a probability of 0.2 to be hidden state 2 ? If so, shouldn't the values in each column add up to 1 ? Thanks! – teoML Jun 24 '22 at 10:20
  • Good question! No, `obslik` does not work like that (hence `lik` for "likelihood" instead of `prob` for "probability"). Instead it works like this: if `observations[t] == j`, then `obslik[t, :] == emmat[:, j]`. It's taking column slices of `emmat`. But `emmat` sums to 1 across its *rows*, not its columns. So the columns of `obslik` don't sum to 1 because the columns of `emmat` don't sum to 1. – hhquark Jun 25 '22 at 20:37
  • And how can I compute obslik ? In my particular case I have for each observation a probability distribution over the hidden states. – teoML Jun 27 '22 at 10:01
2

I have modified @Rhubarb's answer for the condition where the marginal probabilities are already known (e.g by computing the Forward Backward algorithm).

def viterbi (transition_probabilities, conditional_probabilities):
    # Initialise everything
    num_samples = conditional_probabilities.shape[1]
    num_states = transition_probabilities.shape[0] # number of states

    c = np.zeros(num_samples) #scale factors (necessary to prevent underflow)
    viterbi = np.zeros((num_states,num_samples)) # initialise viterbi table
    best_path_table = np.zeros((num_states,num_samples)) # initialise the best path table
    best_path = np.zeros(num_samples).astype(np.int32) # this will be your output

    # B- appoint initial values for viterbi and best path (bp) tables - Eq (32a-32b)
    viterbi[:,0] = conditional_probabilities[:,0]
    c[0] = 1.0/np.sum(viterbi[:,0])
    viterbi[:,0] = c[0] * viterbi[:,0] # apply the scaling factor

    # C- Do the iterations for viterbi and psi for time>0 until T
    for t in range(1, num_samples): # loop through time
        for s in range (0,num_states): # loop through the states @(t-1)
            trans_p = viterbi[:, t-1] * transition_probabilities[:,s] # transition probs of each state transitioning
            best_path_table[s,t], viterbi[s,t] = max(enumerate(trans_p), key=operator.itemgetter(1))
            viterbi[s,t] = viterbi[s,t] * conditional_probabilities[s][t]

        c[t] = 1.0/np.sum(viterbi[:,t]) # scaling factor
        viterbi[:,t] = c[t] * viterbi[:,t]

    ## D - Back-tracking
    best_path[num_samples-1] =  viterbi[:,num_samples-1].argmax() # last state
    for t in range(num_samples-1,0,-1): # states of (last-1)th to 0th time step
        best_path[t-1] = best_path_table[best_path[t],t]
    return best_path
Mike Vella
  • 10,187
  • 14
  • 59
  • 86