Package nltk :: Package tag :: Module hmm
[hide private]
[frames] | no frames]

Source Code for Module nltk.tag.hmm

   1  # Natural Language Toolkit: Hidden Markov Model 
   2  # 
   3  # Copyright (C) 2001-2008 NLTK Project 
   4  # Author: Trevor Cohn <[email protected]> 
   5  #         Philip Blunsom <[email protected]> 
   6  #         Tiago Tresoldi <[email protected]> (fixes) 
   7  #         Steven Bird <[email protected]> (fixes) 
   8  #         Joseph Frazee <[email protected]> (fixes) 
   9  # URL: <http://nltk.org> 
  10  # For license information, see LICENSE.TXT 
  11  # 
  12  # $Id: hmm.py 6438 2008-08-15 00:00:16Z jpf3 $ 
  13   
  14  """ 
  15  Hidden Markov Models (HMMs) largely used to assign the correct label sequence 
  16  to sequential data or assess the probability of a given label and data 
  17  sequence. These models are finite state machines characterised by a number of 
  18  states, transitions between these states, and output symbols emitted while in 
  19  each state. The HMM is an extension to the Markov chain, where each state 
  20  corresponds deterministically to a given event. In the HMM the observation is 
  21  a probabilistic function of the state. HMMs share the Markov chain's 
  22  assumption, being that the probability of transition from one state to another 
  23  only depends on the current state - i.e. the series of states that led to the 
  24  current state are not used. They are also time invariant. 
  25   
  26  The HMM is a directed graph, with probability weighted edges (representing the 
  27  probability of a transition between the source and sink states) where each 
  28  vertex emits an output symbol when entered. The symbol (or observation) is 
  29  non-deterministically generated. For this reason, knowing that a sequence of 
  30  output observations was generated by a given HMM does not mean that the 
  31  corresponding sequence of states (and what the current state is) is known. 
  32  This is the 'hidden' in the hidden markov model. 
  33   
  34  Formally, a HMM can be characterised by: 
  35      - the output observation alphabet. This is the set of symbols which may be 
  36        observed as output of the system.  
  37      - the set of states.  
  38      - the transition probabilities M{a_{ij} = P(s_t = j | s_{t-1} = i)}. These 
  39        represent the probability of transition to each state from a given 
  40        state.  
  41      - the output probability matrix M{b_i(k) = P(X_t = o_k | s_t = i)}. These 
  42        represent the probability of observing each symbol in a given state. 
  43      - the initial state distribution. This gives the probability of starting 
  44        in each state. 
  45   
  46  To ground this discussion, take a common NLP application, part-of-speech (POS) 
  47  tagging. An HMM is desirable for this task as the highest probability tag 
  48  sequence can be calculated for a given sequence of word forms. This differs 
  49  from other tagging techniques which often tag each word individually, seeking 
  50  to optimise each individual tagging greedily without regard to the optimal 
  51  combination of tags for a larger unit, such as a sentence. The HMM does this 
  52  with the Viterbi algorithm, which efficiently computes the optimal path 
  53  through the graph given the sequence of words forms. 
  54   
  55  In POS tagging the states usually have a 1:1 correspondence with the tag 
  56  alphabet - i.e. each state represents a single tag. The output observation 
  57  alphabet is the set of word forms (the lexicon), and the remaining three 
  58  parameters are derived by a training regime. With this information the 
  59  probability of a given sentence can be easily derived, by simply summing the 
  60  probability of each distinct path through the model. Similarly, the highest 
  61  probability tagging sequence can be derived with the Viterbi algorithm, 
  62  yielding a state sequence which can be mapped into a tag sequence. 
  63   
  64  This discussion assumes that the HMM has been trained. This is probably the 
  65  most difficult task with the model, and requires either MLE estimates of the 
  66  parameters or unsupervised learning using the Baum-Welch algorithm, a variant 
  67  of EM. 
  68  """ 
  69   
  70  import re 
  71  import types 
  72   
  73  from numpy import * 
  74   
  75  from nltk import FreqDist, ConditionalFreqDist, ConditionalProbDist, \ 
  76       DictionaryProbDist, DictionaryConditionalProbDist, LidstoneProbDist, \ 
  77       MutableProbDist, MLEProbDist 
  78  from nltk.internals import deprecated 
  79  from nltk.evaluate import accuracy as accuracy_ 
  80  from nltk.util import LazyMap, LazyConcatenation, LazyZip 
  81   
  82  from api import * 
  83   
  84  # _NINF = float('-inf')  # won't work on Windows 
  85  _NINF = float('-1e300') 
  86   
  87  _TEXT = 0  # index of text in a tuple 
  88  _TAG = 1   # index of tag in a tuple 
  89       
90 -class HiddenMarkovModelTagger(TaggerI):
91 """ 92 Hidden Markov model class, a generative model for labelling sequence data. 93 These models define the joint probability of a sequence of symbols and 94 their labels (state transitions) as the product of the starting state 95 probability, the probability of each state transition, and the probability 96 of each observation being generated from each state. This is described in 97 more detail in the module documentation. 98 99 This implementation is based on the HMM description in Chapter 8, Huang, 100 Acero and Hon, Spoken Language Processing and includes an extension for 101 training shallow HMM parsers or specializaed HMMs as in Molina et. 102 al, 2002. A specialized HMM modifies training data by applying a 103 specialization function to create a new training set that is more 104 appropriate for sequential tagging with an HMM. A typical use case is 105 chunking. 106 """
107 - def __init__(self, symbols, states, transitions, outputs, priors, **kwargs):
108 """ 109 Creates a hidden markov model parametised by the the states, 110 transition probabilities, output probabilities and priors. 111 112 @param symbols: the set of output symbols (alphabet) 113 @type symbols: seq of any 114 @param states: a set of states representing state space 115 @type states: seq of any 116 @param transitions: transition probabilities; Pr(s_i | s_j) is the 117 probability of transition from state i given the model is in 118 state_j 119 @type transitions: C{ConditionalProbDistI} 120 @param outputs: output probabilities; Pr(o_k | s_i) is the probability 121 of emitting symbol k when entering state i 122 @type outputs: C{ConditionalProbDistI} 123 @param priors: initial state distribution; Pr(s_i) is the probability 124 of starting in state i 125 @type priors: C{ProbDistI} 126 @kwparam transform: an optional function for transforming training 127 instances, defaults to the identity function. 128 @type transform: C{function} or C{HiddenMarkovModelTaggerTransform} 129 """ 130 self._states = states 131 self._transitions = transitions 132 self._symbols = symbols 133 self._outputs = outputs 134 self._priors = priors 135 self._cache = None 136 137 self._transform = kwargs.get('transform', IdentityTransform()) 138 if isinstance(self._transform, types.FunctionType): 139 self._transform = LambdaTransform(self._transform) 140 elif not isinstance(self._transform, 141 HiddenMarkovModelTaggerTransformI): 142 raise
143 144 @classmethod
145 - def _train(cls, labeled_sequence, test_sequence=None, 146 unlabeled_sequence=None, **kwargs):
147 transform = kwargs.get('transform', IdentityTransform()) 148 if isinstance(transform, types.FunctionType): 149 transform = LambdaTransform(transform) 150 elif \ 151 not isinstance(transform, HiddenMarkovModelTaggerTransformI): 152 raise 153 154 estimator = kwargs.get('estimator', lambda fd, bins: \ 155 LidstoneProbDist(fd, 0.1, bins)) 156 157 labeled_sequence = LazyMap(transform.transform, labeled_sequence) 158 symbols = list(set(word for sent in labeled_sequence 159 for word, tag in sent)) 160 tag_set = list(set(tag for sent in labeled_sequence 161 for word, tag in sent)) 162 163 trainer = HiddenMarkovModelTrainer(tag_set, symbols) 164 hmm = trainer.train_supervised(labeled_sequence, estimator=estimator) 165 hmm = cls(hmm._symbols, hmm._states, hmm._transitions, hmm._outputs, 166 hmm._priors, transform=transform) 167 168 if test_sequence: 169 hmm.test(test_sequence, verbose=kwargs.get('verbose', False)) 170 171 if unlabeled_sequence: 172 max_iterations = kwargs.get('max_iterations', 5) 173 hmm = trainer.train_unsupervised(unlabeled_sequence, model=hmm, 174 max_iterations=max_iterations) 175 if test_sequence: 176 hmm.test(test_sequence, verbose=kwargs.get('verbose', False)) 177 178 return hmm
179 180 @classmethod
181 - def train(cls, labeled_sequence, test_sequence=None, 182 unlabeled_sequence=None, **kwargs):
183 """ 184 Train a new C{HiddenMarkovModelTagger} using the given labeled and 185 unlabeled training instances. Testing will be performed if test 186 instances are provided. 187 188 @return: a hidden markov model tagger 189 @rtype: C{HiddenMarkovModelTagger} 190 @param labeled_sequence: a sequence of labeled training instances, 191 i.e. a list of sentences represented as tuples 192 @type labeled_sequence: C{list} of C{list} 193 @param test_sequence: a sequence of labeled test instances 194 @type test_sequence: C{list} of C{list} 195 @param unlabeled_sequence: a sequence of unlabeled training instances, 196 i.e. a list of sentences represented as words 197 @type unlabeled_sequence: C{list} of C{list} 198 @kwparam transform: an optional function for transforming training 199 instances, defaults to the identity function, see L{transform()} 200 @type transform: C{function} 201 @kwparam estimator: an optional function or class that maps a 202 condition's frequency distribution to its probability 203 distribution, defaults to a Lidstone distribution with gamma = 0.1 204 @type estimator: C{class} or C{function} 205 @kwparam verbose: boolean flag indicating whether training should be 206 verbose or include printed output 207 @type verbose: C{bool} 208 @kwparam max_iterations: number of Baum-Welch interations to perform 209 @type max_iterations: C{int} 210 """ 211 return cls._train(labeled_sequence, test_sequence, 212 unlabeled_sequence, **kwargs)
213
214 - def probability(self, sequence):
215 """ 216 Returns the probability of the given symbol sequence. If the sequence 217 is labelled, then returns the joint probability of the symbol, state 218 sequence. Otherwise, uses the forward algorithm to find the 219 probability over all label sequences. 220 221 @return: the probability of the sequence 222 @rtype: float 223 @param sequence: the sequence of symbols which must contain the TEXT 224 property, and optionally the TAG property 225 @type sequence: Token 226 """ 227 return 2**(self.log_probability(self._transform.transform(sequence)))
228
229 - def log_probability(self, sequence):
230 """ 231 Returns the log-probability of the given symbol sequence. If the 232 sequence is labelled, then returns the joint log-probability of the 233 symbol, state sequence. Otherwise, uses the forward algorithm to find 234 the log-probability over all label sequences. 235 236 @return: the log-probability of the sequence 237 @rtype: float 238 @param sequence: the sequence of symbols which must contain the TEXT 239 property, and optionally the TAG property 240 @type sequence: Token 241 """ 242 sequence = self._transform.transform(sequence) 243 244 T = len(sequence) 245 N = len(self._states) 246 247 if T > 0 and sequence[0][_TAG]: 248 last_state = sequence[0][_TAG] 249 p = self._priors.logprob(last_state) + \ 250 self._outputs[last_state].logprob(sequence[0][_TEXT]) 251 for t in range(1, T): 252 state = sequence[t][_TAG] 253 p += self._transitions[last_state].logprob(state) + \ 254 self._outputs[state].logprob(sequence[t][_TEXT]) 255 last_state = state 256 return p 257 else: 258 alpha = self._forward_probability(sequence) 259 p = _log_add(*alpha[T-1, :]) 260 return p
261
262 - def tag(self, unlabeled_sequence):
263 """ 264 Tags the sequence with the highest probability state sequence. This 265 uses the best_path method to find the Viterbi path. 266 267 @return: a labelled sequence of symbols 268 @rtype: list 269 @param unlabeled_sequence: the sequence of unlabeled symbols 270 @type unlabeled_sequence: list 271 """ 272 unlabeled_sequence = self._transform.transform(unlabeled_sequence) 273 return self._tag(unlabeled_sequence)
274
275 - def _tag(self, unlabeled_sequence):
276 path = self._best_path(unlabeled_sequence) 277 return zip(unlabeled_sequence, path)
278
279 - def _output_logprob(self, state, symbol):
280 """ 281 @return: the log probability of the symbol being observed in the given 282 state 283 @rtype: float 284 """ 285 return self._outputs[state].logprob(symbol)
286
287 - def _create_cache(self):
288 """ 289 The cache is a tuple (P, O, X, S) where: 290 291 - S maps symbols to integers. I.e., it is the inverse 292 mapping from self._symbols; for each symbol s in 293 self._symbols, the following is true:: 294 295 self._symbols[S[s]] == s 296 297 - O is the log output probabilities:: 298 299 O[i,k] = log( P(token[t]=sym[k]|tag[t]=state[i]) ) 300 301 - X is the log transition probabilities:: 302 303 X[i,j] = log( P(tag[t]=state[j]|tag[t-1]=state[i]) ) 304 305 - P is the log prior probabilities:: 306 307 P[i] = log( P(tag[0]=state[i]) ) 308 """ 309 if not self._cache: 310 N = len(self._states) 311 M = len(self._symbols) 312 P = zeros(N, float32) 313 X = zeros((N, N), float32) 314 O = zeros((N, M), float32) 315 for i in range(N): 316 si = self._states[i] 317 P[i] = self._priors.logprob(si) 318 for j in range(N): 319 X[i, j] = self._transitions[si].logprob(self._states[j]) 320 for k in range(M): 321 O[i, k] = self._outputs[si].logprob(self._symbols[k]) 322 S = {} 323 for k in range(M): 324 S[self._symbols[k]] = k 325 self._cache = (P, O, X, S)
326
327 - def _update_cache(self, symbols):
328 # add new symbols to the symbol table and repopulate the output 329 # probabilities and symbol table mapping 330 if symbols: 331 self._create_cache() 332 P, O, X, S = self._cache 333 for symbol in symbols: 334 if symbol not in self._symbols: 335 self._cache = None 336 self._symbols.append(symbol) 337 # don't bother with the work if there aren't any new symbols 338 if not self._cache: 339 N = len(self._states) 340 M = len(self._symbols) 341 Q = O.shape[1] 342 # add new columns to the output probability table without 343 # destroying the old probabilities 344 O = hstack([O, zeros((N, M - Q), float32)]) 345 for i in range(N): 346 si = self._states[i] 347 # only calculate probabilities for new symbols 348 for k in range(Q, M): 349 O[i, k] = self._outputs[si].logprob(self._symbols[k]) 350 # only create symbol mappings for new symbols 351 for k in range(Q, M): 352 S[self._symbols[k]] = k 353 self._cache = (P, O, X, S)
354
355 - def best_path(self, unlabeled_sequence):
356 """ 357 Returns the state sequence of the optimal (most probable) path through 358 the HMM. Uses the Viterbi algorithm to calculate this part by dynamic 359 programming. 360 361 @return: the state sequence 362 @rtype: sequence of any 363 @param unlabeled_sequence: the sequence of unlabeled symbols 364 @type unlabeled_sequence: list 365 """ 366 unlabeled_sequence = self._transform.transform(unlabeled_sequence) 367 return self._best_path(unlabeled_sequence)
368
369 - def _best_path(self, unlabeled_sequence):
370 T = len(unlabeled_sequence) 371 N = len(self._states) 372 self._create_cache() 373 self._update_cache(unlabeled_sequence) 374 P, O, X, S = self._cache 375 376 V = zeros((T, N), float32) 377 B = ones((T, N), int) * -1 378 379 V[0] = P + O[:, S[unlabeled_sequence[0]]] 380 for t in range(1, T): 381 for j in range(N): 382 vs = V[t-1, :] + X[:, j] 383 best = argmax(vs) 384 V[t, j] = vs[best] + O[j, S[unlabeled_sequence[t]]] 385 B[t, j] = best 386 387 current = argmax(V[T-1,:]) 388 sequence = [current] 389 for t in range(T-1, 0, -1): 390 last = B[t, current] 391 sequence.append(last) 392 current = last 393 394 sequence.reverse() 395 return map(self._states.__getitem__, sequence)
396
397 - def best_path_simple(self, unlabeled_sequence):
398 """ 399 Returns the state sequence of the optimal (most probable) path through 400 the HMM. Uses the Viterbi algorithm to calculate this part by dynamic 401 programming. This uses a simple, direct method, and is included for 402 teaching purposes. 403 404 @return: the state sequence 405 @rtype: sequence of any 406 @param unlabeled_sequence: the sequence of unlabeled symbols 407 @type unlabeled_sequence: list 408 """ 409 unlabeled_sequence = self._transform.transform(unlabeled_sequence) 410 return self._best_path_simple(unlabeled_sequence)
411
412 - def _best_path_simple(self, unlabeled_sequence):
413 T = len(unlabeled_sequence) 414 N = len(self._states) 415 V = zeros((T, N), float64) 416 B = {} 417 418 # find the starting log probabilities for each state 419 symbol = unlabeled_sequence[0] 420 for i, state in enumerate(self._states): 421 V[0, i] = self._priors.logprob(state) + \ 422 self._output_logprob(state, symbol) 423 B[0, state] = None 424 425 # find the maximum log probabilities for reaching each state at time t 426 for t in range(1, T): 427 symbol = unlabeled_sequence[t] 428 for j in range(N): 429 sj = self._states[j] 430 best = None 431 for i in range(N): 432 si = self._states[i] 433 va = V[t-1, i] + self._transitions[si].logprob(sj) 434 if not best or va > best[0]: 435 best = (va, si) 436 V[t, j] = best[0] + self._output_logprob(sj, symbol) 437 B[t, sj] = best[1] 438 439 # find the highest probability final state 440 best = None 441 for i in range(N): 442 val = V[T-1, i] 443 if not best or val > best[0]: 444 best = (val, self._states[i]) 445 446 # traverse the back-pointers B to find the state sequence 447 current = best[1] 448 sequence = [current] 449 for t in range(T-1, 0, -1): 450 last = B[t, current] 451 sequence.append(last) 452 current = last 453 454 sequence.reverse() 455 return sequence
456
457 - def random_sample(self, rng, length):
458 """ 459 Randomly sample the HMM to generate a sentence of a given length. This 460 samples the prior distribution then the observation distribution and 461 transition distribution for each subsequent observation and state. 462 This will mostly generate unintelligible garbage, but can provide some 463 amusement. 464 465 @return: the randomly created state/observation sequence, 466 generated according to the HMM's probability 467 distributions. The SUBTOKENS have TEXT and TAG 468 properties containing the observation and state 469 respectively. 470 @rtype: list 471 @param rng: random number generator 472 @type rng: Random (or any object with a random() method) 473 @param length: desired output length 474 @type length: int 475 """ 476 477 # sample the starting state and symbol prob dists 478 tokens = [] 479 state = self._sample_probdist(self._priors, rng.random(), self._states) 480 symbol = self._sample_probdist(self._outputs[state], 481 rng.random(), self._symbols) 482 tokens.append((symbol, state)) 483 484 for i in range(1, length): 485 # sample the state transition and symbol prob dists 486 state = self._sample_probdist(self._transitions[state], 487 rng.random(), self._states) 488 symbol = self._sample_probdist(self._outputs[state], 489 rng.random(), self._symbols) 490 tokens.append((symbol, state)) 491 492 return tokens
493
494 - def _sample_probdist(self, probdist, p, samples):
495 cum_p = 0 496 for sample in samples: 497 add_p = probdist.prob(sample) 498 if cum_p <= p <= cum_p + add_p: 499 return sample 500 cum_p += add_p 501 raise Exception('Invalid probability distribution - ' 502 'does not sum to one')
503
504 - def entropy(self, unlabeled_sequence):
505 """ 506 Returns the entropy over labellings of the given sequence. This is 507 given by:: 508 509 H(O) = - sum_S Pr(S | O) log Pr(S | O) 510 511 where the summation ranges over all state sequences, S. Let M{Z = 512 Pr(O) = sum_S Pr(S, O)} where the summation ranges over all state 513 sequences and O is the observation sequence. As such the entropy can 514 be re-expressed as:: 515 516 H = - sum_S Pr(S | O) log [ Pr(S, O) / Z ] 517 = log Z - sum_S Pr(S | O) log Pr(S, 0) 518 = log Z - sum_S Pr(S | O) [ log Pr(S_0) + sum_t Pr(S_t | S_{t-1}) 519 + sum_t Pr(O_t | S_t) ] 520 521 The order of summation for the log terms can be flipped, allowing 522 dynamic programming to be used to calculate the entropy. Specifically, 523 we use the forward and backward probabilities (alpha, beta) giving:: 524 525 H = log Z - sum_s0 alpha_0(s0) beta_0(s0) / Z * log Pr(s0) 526 + sum_t,si,sj alpha_t(si) Pr(sj | si) Pr(O_t+1 | sj) beta_t(sj) 527 / Z * log Pr(sj | si) 528 + sum_t,st alpha_t(st) beta_t(st) / Z * log Pr(O_t | st) 529 530 This simply uses alpha and beta to find the probabilities of partial 531 sequences, constrained to include the given state(s) at some point in 532 time. 533 """ 534 unlabeled_sequence = self._transform.transform(unlabeled_sequence) 535 536 T = len(unlabeled_sequence) 537 N = len(self._states) 538 539 alpha = self._forward_probability(unlabeled_sequence) 540 beta = self._backward_probability(unlabeled_sequence) 541 normalisation = _log_add(*alpha[T-1, :]) 542 543 entropy = normalisation 544 545 # starting state, t = 0 546 for i, state in enumerate(self._states): 547 p = 2**(alpha[0, i] + beta[0, i] - normalisation) 548 entropy -= p * self._priors.logprob(state) 549 #print 'p(s_0 = %s) =' % state, p 550 551 # state transitions 552 for t0 in range(T - 1): 553 t1 = t0 + 1 554 for i0, s0 in enumerate(self._states): 555 for i1, s1 in enumerate(self._states): 556 p = 2**(alpha[t0, i0] + self._transitions[s0].logprob(s1) + 557 self._outputs[s1].logprob( 558 unlabeled_sequence[t1][_TEXT]) + 559 beta[t1, i1] - normalisation) 560 entropy -= p * self._transitions[s0].logprob(s1) 561 #print 'p(s_%d = %s, s_%d = %s) =' % (t0, s0, t1, s1), p 562 563 # symbol emissions 564 for t in range(T): 565 for i, state in enumerate(self._states): 566 p = 2**(alpha[t, i] + beta[t, i] - normalisation) 567 entropy -= p * self._outputs[state].logprob( 568 unlabeled_sequence[t][_TEXT]) 569 #print 'p(s_%d = %s) =' % (t, state), p 570 571 return entropy
572
573 - def point_entropy(self, unlabeled_sequence):
574 """ 575 Returns the pointwise entropy over the possible states at each 576 position in the chain, given the observation sequence. 577 """ 578 unlabeled_sequence = self._transform.transform(unlabeled_sequence) 579 580 T = len(unlabeled_sequence) 581 N = len(self._states) 582 583 alpha = self._forward_probability(unlabeled_sequence) 584 beta = self._backward_probability(unlabeled_sequence) 585 normalisation = _log_add(*alpha[T-1, :]) 586 587 entropies = zeros(T, float64) 588 probs = zeros(N, float64) 589 for t in range(T): 590 for s in range(N): 591 probs[s] = alpha[t, s] + beta[t, s] - normalisation 592 593 for s in range(N): 594 entropies[t] -= 2**(probs[s]) * probs[s] 595 596 return entropies
597
598 - def _exhaustive_entropy(self, unlabeled_sequence):
599 unlabeled_sequence = self._transform.transform(unlabeled_sequence) 600 601 T = len(unlabeled_sequence) 602 N = len(self._states) 603 604 labellings = [[state] for state in self._states] 605 for t in range(T - 1): 606 current = labellings 607 labellings = [] 608 for labelling in current: 609 for state in self._states: 610 labellings.append(labelling + [state]) 611 612 log_probs = [] 613 for labelling in labellings: 614 labelled_sequence = unlabeled_sequence[:] 615 for t, label in enumerate(labelling): 616 labelled_sequence[t] = (labelled_sequence[t][_TEXT], label) 617 lp = self.log_probability(labelled_sequence) 618 log_probs.append(lp) 619 normalisation = _log_add(*log_probs) 620 621 #ps = zeros((T, N), float64) 622 #for labelling, lp in zip(labellings, log_probs): 623 #for t in range(T): 624 #ps[t, self._states.index(labelling[t])] += \ 625 # 2**(lp - normalisation) 626 627 #for t in range(T): 628 #print 'prob[%d] =' % t, ps[t] 629 630 entropy = 0 631 for lp in log_probs: 632 lp -= normalisation 633 entropy -= 2**(lp) * lp 634 635 return entropy
636
637 - def _exhaustive_point_entropy(self, unlabeled_sequence):
638 unlabeled_sequence = self._transform.transform(unlabeled_sequence) 639 640 T = len(unlabeled_sequence) 641 N = len(self._states) 642 643 labellings = [[state] for state in self._states] 644 for t in range(T - 1): 645 current = labellings 646 labellings = [] 647 for labelling in current: 648 for state in self._states: 649 labellings.append(labelling + [state]) 650 651 log_probs = [] 652 for labelling in labellings: 653 labelled_sequence = unlabeled_sequence[:] 654 for t, label in enumerate(labelling): 655 labelled_sequence[t] = (labelled_sequence[t][_TEXT], label) 656 lp = self.log_probability(labelled_sequence) 657 log_probs.append(lp) 658 659 normalisation = _log_add(*log_probs) 660 661 probabilities = zeros((T, N), float64) 662 probabilities[:] = _NINF 663 for labelling, lp in zip(labellings, log_probs): 664 lp -= normalisation 665 for t, label in enumerate(labelling): 666 index = self._states.index(label) 667 probabilities[t, index] = _log_add(probabilities[t, index], lp) 668 669 entropies = zeros(T, float64) 670 for t in range(T): 671 for s in range(N): 672 entropies[t] -= 2**(probabilities[t, s]) * probabilities[t, s] 673 674 return entropies
675
676 - def _forward_probability(self, unlabeled_sequence):
677 """ 678 Return the forward probability matrix, a T by N array of 679 log-probabilities, where T is the length of the sequence and N is the 680 number of states. Each entry (t, s) gives the probability of being in 681 state s at time t after observing the partial symbol sequence up to 682 and including t. 683 684 @param unlabeled_sequence: the sequence of unlabeled symbols 685 @type unlabeled_sequence: list 686 @return: the forward log probability matrix 687 @rtype: array 688 """ 689 T = len(unlabeled_sequence) 690 N = len(self._states) 691 alpha = zeros((T, N), float64) 692 693 symbol = unlabeled_sequence[0][_TEXT] 694 for i, state in enumerate(self._states): 695 alpha[0, i] = self._priors.logprob(state) + \ 696 self._outputs[state].logprob(symbol) 697 for t in range(1, T): 698 symbol = unlabeled_sequence[t][_TEXT] 699 for i, si in enumerate(self._states): 700 alpha[t, i] = _NINF 701 for j, sj in enumerate(self._states): 702 alpha[t, i] = _log_add(alpha[t, i], alpha[t-1, j] + 703 self._transitions[sj].logprob(si)) 704 alpha[t, i] += self._outputs[si].logprob(symbol) 705 706 return alpha
707
708 - def _backward_probability(self, unlabeled_sequence):
709 """ 710 Return the backward probability matrix, a T by N array of 711 log-probabilities, where T is the length of the sequence and N is the 712 number of states. Each entry (t, s) gives the probability of being in 713 state s at time t after observing the partial symbol sequence from t 714 .. T. 715 716 @return: the backward log probability matrix 717 @rtype: array 718 @param unlabeled_sequence: the sequence of unlabeled symbols 719 @type unlabeled_sequence: list 720 """ 721 T = len(unlabeled_sequence) 722 N = len(self._states) 723 beta = zeros((T, N), float64) 724 725 # initialise the backward values 726 beta[T-1, :] = log2(1) 727 728 # inductively calculate remaining backward values 729 for t in range(T-2, -1, -1): 730 symbol = unlabeled_sequence[t+1][_TEXT] 731 for i, si in enumerate(self._states): 732 beta[t, i] = _NINF 733 for j, sj in enumerate(self._states): 734 beta[t, i] = _log_add(beta[t, i], 735 self._transitions[si].logprob(sj) + 736 self._outputs[sj].logprob(symbol) + 737 beta[t + 1, j]) 738 739 return beta
740
741 - def test(self, test_sequence, **kwargs):
742 """ 743 Tests the C{HiddenMarkovModelTagger} instance. 744 745 @param test_sequence: a sequence of labeled test instances 746 @type test_sequence: C{list} of C{list} 747 @kwparam verbose: boolean flag indicating whether training should be 748 verbose or include printed output 749 @type verbose: C{bool} 750 """ 751 752 def words(sent): 753 return [word for (word, tag) in sent]
754 755 def tags(sent): 756 return [tag for (word, tag) in sent]
757 758 test_sequence = LazyMap(self._transform.transform, test_sequence) 759 predicted_sequence = LazyMap(self._tag, LazyMap(words, test_sequence)) 760 761 if kwargs.get('verbose', False): 762 # This will be used again later for accuracy so there's no sense 763 # in tagging it twice. 764 test_sequence = list(test_sequence) 765 predicted_sequence = list(predicted_sequence) 766 767 for sent in predicted_sequence: 768 print 'Test:', \ 769 ' '.join(['%s/%s' % (str(token), str(tag)) 770 for (token, tag) in sent]) 771 print 772 print 'Untagged:', \ 773 ' '.join([str(token) for (token, tag) in sent]) 774 print 775 print 'HMM-tagged:', \ 776 ' '.join(['%s/%s' % (str(token), str(tag)) 777 for (token, tag) in sent]) 778 print 779 print 'Entropy:', \ 780 self.entropy([(token, None) for (token, tag) in sent]) 781 print 782 print '-' * 60 783 784 test_tags = LazyConcatenation(LazyMap(tags, test_sequence)) 785 predicted_tags = LazyConcatenation(LazyMap(tags, predicted_sequence)) 786 787 acc = accuracy_(test_tags, predicted_tags) 788 789 count = sum([len(sent) for sent in test_sequence]) 790 791 print 'accuracy over %d tokens: %.2f' % (count, acc * 100) 792
793 - def __repr__(self):
794 return ('<HiddenMarkovModelTagger %d states and %d output symbols>' 795 % (len(self._states), len(self._symbols)))
796 797
798 -class HiddenMarkovModelTrainer(object):
799 """ 800 Algorithms for learning HMM parameters from training data. These include 801 both supervised learning (MLE) and unsupervised learning (Baum-Welch). 802 """
803 - def __init__(self, states=None, symbols=None):
804 """ 805 Creates an HMM trainer to induce an HMM with the given states and 806 output symbol alphabet. A supervised and unsupervised training 807 method may be used. If either of the states or symbols are not given, 808 these may be derived from supervised training. 809 810 @param states: the set of state labels 811 @type states: sequence of any 812 @param symbols: the set of observation symbols 813 @type symbols: sequence of any 814 """ 815 if states: 816 self._states = states 817 else: 818 self._states = [] 819 if symbols: 820 self._symbols = symbols 821 else: 822 self._symbols = []
823
824 - def train(self, labelled_sequences=None, unlabeled_sequences=None, 825 **kwargs):
826 """ 827 Trains the HMM using both (or either of) supervised and unsupervised 828 techniques. 829 830 @return: the trained model 831 @rtype: HiddenMarkovModelTagger 832 @param labelled_sequences: the supervised training data, a set of 833 labelled sequences of observations 834 @type labelled_sequences: list 835 @param unlabeled_sequences: the unsupervised training data, a set of 836 sequences of observations 837 @type unlabeled_sequences: list 838 @param kwargs: additional arguments to pass to the training methods 839 """ 840 assert labelled_sequences or unlabeled_sequences 841 model = None 842 if labelled_sequences: 843 model = self.train_supervised(labelled_sequences, **kwargs) 844 if unlabeled_sequences: 845 if model: kwargs['model'] = model 846 model = self.train_unsupervised(unlabeled_sequences, **kwargs) 847 return model
848
849 - def train_unsupervised(self, unlabeled_sequences, **kwargs):
850 """ 851 Trains the HMM using the Baum-Welch algorithm to maximise the 852 probability of the data sequence. This is a variant of the EM 853 algorithm, and is unsupervised in that it doesn't need the state 854 sequences for the symbols. The code is based on 'A Tutorial on Hidden 855 Markov Models and Selected Applications in Speech Recognition', 856 Lawrence Rabiner, IEEE, 1989. 857 858 @return: the trained model 859 @rtype: HiddenMarkovModelTagger 860 @param unlabeled_sequences: the training data, a set of 861 sequences of observations 862 @type unlabeled_sequences: list 863 @param kwargs: may include the following parameters:: 864 model - a HiddenMarkovModelTagger instance used to begin 865 the Baum-Welch algorithm 866 max_iterations - the maximum number of EM iterations 867 convergence_logprob - the maximum change in log probability to 868 allow convergence 869 """ 870 871 N = len(self._states) 872 M = len(self._symbols) 873 symbol_dict = dict((self._symbols[i], i) for i in range(M)) 874 875 # create a uniform HMM, which will be iteratively refined, unless 876 # given an existing model 877 model = kwargs.get('model') 878 if not model: 879 priors = UniformProbDist(self._states) 880 transitions = DictionaryConditionalProbDist( 881 dict((state, UniformProbDist(self._states)) 882 for state in self._states)) 883 output = DictionaryConditionalProbDist( 884 dict((state, UniformProbDist(self._symbols)) 885 for state in self._states)) 886 model = HiddenMarkovModelTagger(self._symbols, self._states, 887 transitions, output, priors) 888 889 # update model prob dists so that they can be modified 890 model._priors = MutableProbDist(model._priors, self._states) 891 model._transitions = DictionaryConditionalProbDist( 892 dict((s, MutableProbDist(model._transitions[s], self._states)) 893 for s in self._states)) 894 model._outputs = DictionaryConditionalProbDist( 895 dict((s, MutableProbDist(model._outputs[s], self._symbols)) 896 for s in self._states)) 897 898 # iterate until convergence 899 converged = False 900 last_logprob = None 901 iteration = 0 902 max_iterations = kwargs.get('max_iterations', 1000) 903 epsilon = kwargs.get('convergence_logprob', 1e-6) 904 while not converged and iteration < max_iterations: 905 A_numer = ones((N, N), float64) * _NINF 906 B_numer = ones((N, M), float64) * _NINF 907 A_denom = ones(N, float64) * _NINF 908 B_denom = ones(N, float64) * _NINF 909 910 logprob = 0 911 for sequence in unlabeled_sequences: 912 sequence = list(sequence) 913 if not sequence: 914 continue 915 916 # compute forward and backward probabilities 917 alpha = model._forward_probability(sequence) 918 beta = model._backward_probability(sequence) 919 920 # find the log probability of the sequence 921 T = len(sequence) 922 lpk = _log_add(*alpha[T-1, :]) 923 logprob += lpk 924 925 # now update A and B (transition and output probabilities) 926 # using the alpha and beta values. Please refer to Rabiner's 927 # paper for details, it's too hard to explain in comments 928 local_A_numer = ones((N, N), float64) * _NINF 929 local_B_numer = ones((N, M), float64) * _NINF 930 local_A_denom = ones(N, float64) * _NINF 931 local_B_denom = ones(N, float64) * _NINF 932 933 # for each position, accumulate sums for A and B 934 for t in range(T): 935 x = sequence[t][_TEXT] #not found? FIXME 936 if t < T - 1: 937 xnext = sequence[t+1][_TEXT] #not found? FIXME 938 xi = symbol_dict[x] 939 for i in range(N): 940 si = self._states[i] 941 if t < T - 1: 942 for j in range(N): 943 sj = self._states[j] 944 local_A_numer[i, j] = \ 945 _log_add(local_A_numer[i, j], 946 alpha[t, i] + 947 model._transitions[si].logprob(sj) + 948 model._outputs[sj].logprob(xnext) + 949 beta[t+1, j]) 950 local_A_denom[i] = _log_add(local_A_denom[i], 951 alpha[t, i] + beta[t, i]) 952 else: 953 local_B_denom[i] = _log_add(local_A_denom[i], 954 alpha[t, i] + beta[t, i]) 955 956 local_B_numer[i, xi] = _log_add(local_B_numer[i, xi], 957 alpha[t, i] + beta[t, i]) 958 959 # add these sums to the global A and B values 960 for i in range(N): 961 for j in range(N): 962 A_numer[i, j] = _log_add(A_numer[i, j], 963 local_A_numer[i, j] - lpk) 964 for k in range(M): 965 B_numer[i, k] = _log_add(B_numer[i, k], 966 local_B_numer[i, k] - lpk) 967 968 A_denom[i] = _log_add(A_denom[i], local_A_denom[i] - lpk) 969 B_denom[i] = _log_add(B_denom[i], local_B_denom[i] - lpk) 970 971 # use the calculated values to update the transition and output 972 # probability values 973 for i in range(N): 974 si = self._states[i] 975 for j in range(N): 976 sj = self._states[j] 977 model._transitions[si].update(sj, A_numer[i,j] - 978 A_denom[i]) 979 for k in range(M): 980 ok = self._symbols[k] 981 model._outputs[si].update(ok, B_numer[i,k] - B_denom[i]) 982 # Rabiner says the priors don't need to be updated. I don't 983 # believe him. FIXME 984 985 # test for convergence 986 if iteration > 0 and abs(logprob - last_logprob) < epsilon: 987 converged = True 988 989 print 'iteration', iteration, 'logprob', logprob 990 iteration += 1 991 last_logprob = logprob 992 993 return model
994
995 - def train_supervised(self, labelled_sequences, **kwargs):
996 """ 997 Supervised training maximising the joint probability of the symbol and 998 state sequences. This is done via collecting frequencies of 999 transitions between states, symbol observations while within each 1000 state and which states start a sentence. These frequency distributions 1001 are then normalised into probability estimates, which can be 1002 smoothed if desired. 1003 1004 @return: the trained model 1005 @rtype: HiddenMarkovModelTagger 1006 @param labelled_sequences: the training data, a set of 1007 labelled sequences of observations 1008 @type labelled_sequences: list 1009 @param kwargs: may include an 'estimator' parameter, a function taking 1010 a C{FreqDist} and a number of bins and returning a C{ProbDistI}; 1011 otherwise a MLE estimate is used 1012 """ 1013 1014 # default to the MLE estimate 1015 estimator = kwargs.get('estimator') 1016 if estimator == None: 1017 estimator = lambda fdist, bins: MLEProbDist(fdist) 1018 1019 # count occurences of starting states, transitions out of each state 1020 # and output symbols observed in each state 1021 starting = FreqDist() 1022 transitions = ConditionalFreqDist() 1023 outputs = ConditionalFreqDist() 1024 for sequence in labelled_sequences: 1025 lasts = None 1026 for token in sequence: 1027 state = token[_TAG] 1028 symbol = token[_TEXT] 1029 if lasts == None: 1030 starting.inc(state) 1031 else: 1032 transitions[lasts].inc(state) 1033 outputs[state].inc(symbol) 1034 lasts = state 1035 1036 # update the state and symbol lists 1037 if state not in self._states: 1038 self._states.append(state) 1039 if symbol not in self._symbols: 1040 self._symbols.append(symbol) 1041 1042 # create probability distributions (with smoothing) 1043 N = len(self._states) 1044 pi = estimator(starting, N) 1045 A = ConditionalProbDist(transitions, estimator, False, N) 1046 B = ConditionalProbDist(outputs, estimator, False, len(self._symbols)) 1047 1048 return HiddenMarkovModelTagger(self._symbols, self._states, A, B, pi)
1049 1050
1051 -class HiddenMarkovModelTaggerTransform(HiddenMarkovModelTaggerTransformI):
1052 """ 1053 An abstract subclass of C{HiddenMarkovModelTaggerTransformI}. 1054 """
1055 - def __init__(self):
1056 if self.__class__ == HiddenMarkovModelTaggerTransform: 1057 raise AssertionError, "Abstract classes can't be instantiated"
1058 1059
1060 -class LambdaTransform(HiddenMarkovModelTaggerTransform):
1061 """ 1062 A subclass of C{HiddenMarkovModelTaggerTransform} that is backed by an 1063 arbitrary user-defined function, instance method, or lambda function. 1064 """
1065 - def __init__(self, transform):
1066 """ 1067 @param func: a user-defined or lambda transform function 1068 @type func: C{function} 1069 """ 1070 self._transform = transform
1071
1072 - def transform(self, labeled_symbols):
1073 return self._transform(labeled_symbols)
1074 1075
1076 -class IdentityTransform(HiddenMarkovModelTaggerTransform):
1077 """ 1078 A subclass of C{HiddenMarkovModelTaggerTransform} that implements 1079 L{transform()} as the identity function, i.e. symbols passed to 1080 C{transform()} are returned unmodified. 1081 """
1082 - def transform(self, labeled_symbols):
1083 return labeled_symbols
1084 1085
1086 -def _log_add(*values):
1087 """ 1088 Adds the logged values, returning the logarithm of the addition. 1089 """ 1090 x = max(values) 1091 if x > _NINF: 1092 sum_diffs = 0 1093 for value in values: 1094 sum_diffs += 2**(value - x) 1095 return x + log2(sum_diffs) 1096 else: 1097 return x
1098
1099 -def demo():
1100 # demonstrates HMM probability calculation 1101 1102 print 1103 print "HMM probability calculation demo" 1104 print 1105 1106 # example taken from page 381, Huang et al 1107 symbols = ['up', 'down', 'unchanged'] 1108 states = ['bull', 'bear', 'static'] 1109 1110 def pd(values, samples): 1111 d = {} 1112 for value, item in zip(values, samples): 1113 d[item] = value 1114 return DictionaryProbDist(d)
1115 1116 def cpd(array, conditions, samples): 1117 d = {} 1118 for values, condition in zip(array, conditions): 1119 d[condition] = pd(values, samples) 1120 return DictionaryConditionalProbDist(d) 1121 1122 A = array([[0.6, 0.2, 0.2], [0.5, 0.3, 0.2], [0.4, 0.1, 0.5]], float64) 1123 A = cpd(A, states, states) 1124 B = array([[0.7, 0.1, 0.2], [0.1, 0.6, 0.3], [0.3, 0.3, 0.4]], float64) 1125 B = cpd(B, states, symbols) 1126 pi = array([0.5, 0.2, 0.3], float64) 1127 pi = pd(pi, states) 1128 1129 model = HiddenMarkovModelTagger(symbols=symbols, states=states, 1130 transitions=A, outputs=B, priors=pi) 1131 1132 print 'Testing', model 1133 1134 for test in [['up', 'up'], ['up', 'down', 'up'], 1135 ['down'] * 5, ['unchanged'] * 5 + ['up']]: 1136 1137 sequence = [(t, None) for t in test] 1138 1139 print 'Testing with state sequence', test 1140 print 'probability =', model.probability(sequence) 1141 print 'tagging = ', model.tag([word for (word,tag) in sequence]) 1142 print 'p(tagged) = ', model.probability(sequence) 1143 print 'H = ', model.entropy(sequence) 1144 print 'H_exh = ', model._exhaustive_entropy(sequence) 1145 print 'H(point) = ', model.point_entropy(sequence) 1146 print 'H_exh(point)=', model._exhaustive_point_entropy(sequence) 1147 print 1148
1149 -def load_pos(num_sents):
1150 from nltk.corpus import brown 1151 1152 sentences = brown.tagged_sents(categories='a')[:num_sents] 1153 1154 tag_re = re.compile(r'[*]|--|[^+*-]+') 1155 tag_set = set() 1156 symbols = set() 1157 1158 cleaned_sentences = [] 1159 for sentence in sentences: 1160 for i in range(len(sentence)): 1161 word, tag = sentence[i] 1162 word = word.lower() # normalize 1163 symbols.add(word) # log this word 1164 # Clean up the tag. 1165 tag = tag_re.match(tag).group() 1166 tag_set.add(tag) 1167 sentence[i] = (word, tag) # store cleaned-up tagged token 1168 cleaned_sentences += [sentence] 1169 1170 return cleaned_sentences, list(tag_set), list(symbols)
1171 1172 @deprecated("Use model.test(sentences, **kwargs) instead.")
1173 -def test_pos(model, sentences, display=False):
1174 return model.test(sentences, verbose=display)
1175
1176 -def demo_pos():
1177 # demonstrates POS tagging using supervised training 1178 1179 print 1180 print "HMM POS tagging demo" 1181 print 1182 1183 print 'Training HMM...' 1184 labelled_sequences, tag_set, symbols = load_pos(200) 1185 trainer = HiddenMarkovModelTrainer(tag_set, symbols) 1186 hmm = trainer.train_supervised(labelled_sequences[10:], 1187 estimator=lambda fd, bins: LidstoneProbDist(fd, 0.1, bins)) 1188 1189 print 'Testing...' 1190 hmm.test(labelled_sequences[:10], verbose=True)
1191
1192 -def _untag(sentences):
1193 unlabeled = [] 1194 for sentence in sentences: 1195 unlabeled.append((token[_TEXT], None) for token in sentence) 1196 return unlabeled
1197
1198 -def demo_pos_bw():
1199 # demonstrates the Baum-Welch algorithm in POS tagging 1200 1201 print 1202 print "Baum-Welch demo for POS tagging" 1203 print 1204 1205 print 'Training HMM (supervised)...' 1206 sentences, tag_set, symbols = load_pos(210) 1207 symbols = set() 1208 for sentence in sentences: 1209 for token in sentence: 1210 symbols.add(token[_TEXT]) 1211 1212 trainer = HiddenMarkovModelTrainer(tag_set, list(symbols)) 1213 hmm = trainer.train_supervised(sentences[10:200], 1214 estimator=lambda fd, bins: LidstoneProbDist(fd, 0.1, bins)) 1215 print 'Training (unsupervised)...' 1216 # it's rather slow - so only use 10 samples 1217 unlabeled = _untag(sentences[200:210]) 1218 hmm = trainer.train_unsupervised(unlabeled, model=hmm, max_iterations=5) 1219 hmm.test(sentences[:10], verbose=True)
1220
1221 -def demo_bw():
1222 # demo Baum Welch by generating some sequences and then performing 1223 # unsupervised training on them 1224 1225 print 1226 print "Baum-Welch demo for market example" 1227 print 1228 1229 # example taken from page 381, Huang et al 1230 symbols = ['up', 'down', 'unchanged'] 1231 states = ['bull', 'bear', 'static'] 1232 1233 def pd(values, samples): 1234 d = {} 1235 for value, item in zip(values, samples): 1236 d[item] = value 1237 return DictionaryProbDist(d)
1238 1239 def cpd(array, conditions, samples): 1240 d = {} 1241 for values, condition in zip(array, conditions): 1242 d[condition] = pd(values, samples) 1243 return DictionaryConditionalProbDist(d) 1244 1245 A = array([[0.6, 0.2, 0.2], [0.5, 0.3, 0.2], [0.4, 0.1, 0.5]], float64) 1246 A = cpd(A, states, states) 1247 B = array([[0.7, 0.1, 0.2], [0.1, 0.6, 0.3], [0.3, 0.3, 0.4]], float64) 1248 B = cpd(B, states, symbols) 1249 pi = array([0.5, 0.2, 0.3], float64) 1250 pi = pd(pi, states) 1251 1252 model = HiddenMarkovModelTagger(symbols=symbols, states=states, 1253 transitions=A, outputs=B, priors=pi) 1254 1255 # generate some random sequences 1256 training = [] 1257 import random 1258 rng = random.Random() 1259 for i in range(10): 1260 item = model.random_sample(rng, 5) 1261 training.append((i[0], None) for i in item) 1262 1263 # train on those examples, starting with the model that generated them 1264 trainer = HiddenMarkovModelTrainer(states, symbols) 1265 hmm = trainer.train_unsupervised(training, model=model, 1266 max_iterations=1000) 1267 1268 if __name__ == '__main__': 1269 demo() 1270 demo_pos() 1271 demo_pos_bw() 1272 # demo_bw() 1273