1
2
3
4
5
6
7
8
9
10
11
12
13
14 """
15 Hidden Markov Models (HMMs) largely used to assign the correct label sequence
16 to sequential data or assess the probability of a given label and data
17 sequence. These models are finite state machines characterised by a number of
18 states, transitions between these states, and output symbols emitted while in
19 each state. The HMM is an extension to the Markov chain, where each state
20 corresponds deterministically to a given event. In the HMM the observation is
21 a probabilistic function of the state. HMMs share the Markov chain's
22 assumption, being that the probability of transition from one state to another
23 only depends on the current state - i.e. the series of states that led to the
24 current state are not used. They are also time invariant.
25
26 The HMM is a directed graph, with probability weighted edges (representing the
27 probability of a transition between the source and sink states) where each
28 vertex emits an output symbol when entered. The symbol (or observation) is
29 non-deterministically generated. For this reason, knowing that a sequence of
30 output observations was generated by a given HMM does not mean that the
31 corresponding sequence of states (and what the current state is) is known.
32 This is the 'hidden' in the hidden markov model.
33
34 Formally, a HMM can be characterised by:
35 - the output observation alphabet. This is the set of symbols which may be
36 observed as output of the system.
37 - the set of states.
38 - the transition probabilities M{a_{ij} = P(s_t = j | s_{t-1} = i)}. These
39 represent the probability of transition to each state from a given
40 state.
41 - the output probability matrix M{b_i(k) = P(X_t = o_k | s_t = i)}. These
42 represent the probability of observing each symbol in a given state.
43 - the initial state distribution. This gives the probability of starting
44 in each state.
45
46 To ground this discussion, take a common NLP application, part-of-speech (POS)
47 tagging. An HMM is desirable for this task as the highest probability tag
48 sequence can be calculated for a given sequence of word forms. This differs
49 from other tagging techniques which often tag each word individually, seeking
50 to optimise each individual tagging greedily without regard to the optimal
51 combination of tags for a larger unit, such as a sentence. The HMM does this
52 with the Viterbi algorithm, which efficiently computes the optimal path
53 through the graph given the sequence of words forms.
54
55 In POS tagging the states usually have a 1:1 correspondence with the tag
56 alphabet - i.e. each state represents a single tag. The output observation
57 alphabet is the set of word forms (the lexicon), and the remaining three
58 parameters are derived by a training regime. With this information the
59 probability of a given sentence can be easily derived, by simply summing the
60 probability of each distinct path through the model. Similarly, the highest
61 probability tagging sequence can be derived with the Viterbi algorithm,
62 yielding a state sequence which can be mapped into a tag sequence.
63
64 This discussion assumes that the HMM has been trained. This is probably the
65 most difficult task with the model, and requires either MLE estimates of the
66 parameters or unsupervised learning using the Baum-Welch algorithm, a variant
67 of EM.
68 """
69
70 import re
71 import types
72
73 from numpy import *
74
75 from nltk import FreqDist, ConditionalFreqDist, ConditionalProbDist, \
76 DictionaryProbDist, DictionaryConditionalProbDist, LidstoneProbDist, \
77 MutableProbDist, MLEProbDist
78 from nltk.internals import deprecated
79 from nltk.evaluate import accuracy as accuracy_
80 from nltk.util import LazyMap, LazyConcatenation, LazyZip
81
82 from api import *
83
84
85 _NINF = float('-1e300')
86
87 _TEXT = 0
88 _TAG = 1
89
91 """
92 Hidden Markov model class, a generative model for labelling sequence data.
93 These models define the joint probability of a sequence of symbols and
94 their labels (state transitions) as the product of the starting state
95 probability, the probability of each state transition, and the probability
96 of each observation being generated from each state. This is described in
97 more detail in the module documentation.
98
99 This implementation is based on the HMM description in Chapter 8, Huang,
100 Acero and Hon, Spoken Language Processing and includes an extension for
101 training shallow HMM parsers or specializaed HMMs as in Molina et.
102 al, 2002. A specialized HMM modifies training data by applying a
103 specialization function to create a new training set that is more
104 appropriate for sequential tagging with an HMM. A typical use case is
105 chunking.
106 """
107 - def __init__(self, symbols, states, transitions, outputs, priors, **kwargs):
108 """
109 Creates a hidden markov model parametised by the the states,
110 transition probabilities, output probabilities and priors.
111
112 @param symbols: the set of output symbols (alphabet)
113 @type symbols: seq of any
114 @param states: a set of states representing state space
115 @type states: seq of any
116 @param transitions: transition probabilities; Pr(s_i | s_j) is the
117 probability of transition from state i given the model is in
118 state_j
119 @type transitions: C{ConditionalProbDistI}
120 @param outputs: output probabilities; Pr(o_k | s_i) is the probability
121 of emitting symbol k when entering state i
122 @type outputs: C{ConditionalProbDistI}
123 @param priors: initial state distribution; Pr(s_i) is the probability
124 of starting in state i
125 @type priors: C{ProbDistI}
126 @kwparam transform: an optional function for transforming training
127 instances, defaults to the identity function.
128 @type transform: C{function} or C{HiddenMarkovModelTaggerTransform}
129 """
130 self._states = states
131 self._transitions = transitions
132 self._symbols = symbols
133 self._outputs = outputs
134 self._priors = priors
135 self._cache = None
136
137 self._transform = kwargs.get('transform', IdentityTransform())
138 if isinstance(self._transform, types.FunctionType):
139 self._transform = LambdaTransform(self._transform)
140 elif not isinstance(self._transform,
141 HiddenMarkovModelTaggerTransformI):
142 raise
143
144 @classmethod
145 - def _train(cls, labeled_sequence, test_sequence=None,
146 unlabeled_sequence=None, **kwargs):
147 transform = kwargs.get('transform', IdentityTransform())
148 if isinstance(transform, types.FunctionType):
149 transform = LambdaTransform(transform)
150 elif \
151 not isinstance(transform, HiddenMarkovModelTaggerTransformI):
152 raise
153
154 estimator = kwargs.get('estimator', lambda fd, bins: \
155 LidstoneProbDist(fd, 0.1, bins))
156
157 labeled_sequence = LazyMap(transform.transform, labeled_sequence)
158 symbols = list(set(word for sent in labeled_sequence
159 for word, tag in sent))
160 tag_set = list(set(tag for sent in labeled_sequence
161 for word, tag in sent))
162
163 trainer = HiddenMarkovModelTrainer(tag_set, symbols)
164 hmm = trainer.train_supervised(labeled_sequence, estimator=estimator)
165 hmm = cls(hmm._symbols, hmm._states, hmm._transitions, hmm._outputs,
166 hmm._priors, transform=transform)
167
168 if test_sequence:
169 hmm.test(test_sequence, verbose=kwargs.get('verbose', False))
170
171 if unlabeled_sequence:
172 max_iterations = kwargs.get('max_iterations', 5)
173 hmm = trainer.train_unsupervised(unlabeled_sequence, model=hmm,
174 max_iterations=max_iterations)
175 if test_sequence:
176 hmm.test(test_sequence, verbose=kwargs.get('verbose', False))
177
178 return hmm
179
180 @classmethod
181 - def train(cls, labeled_sequence, test_sequence=None,
182 unlabeled_sequence=None, **kwargs):
183 """
184 Train a new C{HiddenMarkovModelTagger} using the given labeled and
185 unlabeled training instances. Testing will be performed if test
186 instances are provided.
187
188 @return: a hidden markov model tagger
189 @rtype: C{HiddenMarkovModelTagger}
190 @param labeled_sequence: a sequence of labeled training instances,
191 i.e. a list of sentences represented as tuples
192 @type labeled_sequence: C{list} of C{list}
193 @param test_sequence: a sequence of labeled test instances
194 @type test_sequence: C{list} of C{list}
195 @param unlabeled_sequence: a sequence of unlabeled training instances,
196 i.e. a list of sentences represented as words
197 @type unlabeled_sequence: C{list} of C{list}
198 @kwparam transform: an optional function for transforming training
199 instances, defaults to the identity function, see L{transform()}
200 @type transform: C{function}
201 @kwparam estimator: an optional function or class that maps a
202 condition's frequency distribution to its probability
203 distribution, defaults to a Lidstone distribution with gamma = 0.1
204 @type estimator: C{class} or C{function}
205 @kwparam verbose: boolean flag indicating whether training should be
206 verbose or include printed output
207 @type verbose: C{bool}
208 @kwparam max_iterations: number of Baum-Welch interations to perform
209 @type max_iterations: C{int}
210 """
211 return cls._train(labeled_sequence, test_sequence,
212 unlabeled_sequence, **kwargs)
213
215 """
216 Returns the probability of the given symbol sequence. If the sequence
217 is labelled, then returns the joint probability of the symbol, state
218 sequence. Otherwise, uses the forward algorithm to find the
219 probability over all label sequences.
220
221 @return: the probability of the sequence
222 @rtype: float
223 @param sequence: the sequence of symbols which must contain the TEXT
224 property, and optionally the TAG property
225 @type sequence: Token
226 """
227 return 2**(self.log_probability(self._transform.transform(sequence)))
228
230 """
231 Returns the log-probability of the given symbol sequence. If the
232 sequence is labelled, then returns the joint log-probability of the
233 symbol, state sequence. Otherwise, uses the forward algorithm to find
234 the log-probability over all label sequences.
235
236 @return: the log-probability of the sequence
237 @rtype: float
238 @param sequence: the sequence of symbols which must contain the TEXT
239 property, and optionally the TAG property
240 @type sequence: Token
241 """
242 sequence = self._transform.transform(sequence)
243
244 T = len(sequence)
245 N = len(self._states)
246
247 if T > 0 and sequence[0][_TAG]:
248 last_state = sequence[0][_TAG]
249 p = self._priors.logprob(last_state) + \
250 self._outputs[last_state].logprob(sequence[0][_TEXT])
251 for t in range(1, T):
252 state = sequence[t][_TAG]
253 p += self._transitions[last_state].logprob(state) + \
254 self._outputs[state].logprob(sequence[t][_TEXT])
255 last_state = state
256 return p
257 else:
258 alpha = self._forward_probability(sequence)
259 p = _log_add(*alpha[T-1, :])
260 return p
261
262 - def tag(self, unlabeled_sequence):
263 """
264 Tags the sequence with the highest probability state sequence. This
265 uses the best_path method to find the Viterbi path.
266
267 @return: a labelled sequence of symbols
268 @rtype: list
269 @param unlabeled_sequence: the sequence of unlabeled symbols
270 @type unlabeled_sequence: list
271 """
272 unlabeled_sequence = self._transform.transform(unlabeled_sequence)
273 return self._tag(unlabeled_sequence)
274
275 - def _tag(self, unlabeled_sequence):
278
280 """
281 @return: the log probability of the symbol being observed in the given
282 state
283 @rtype: float
284 """
285 return self._outputs[state].logprob(symbol)
286
288 """
289 The cache is a tuple (P, O, X, S) where:
290
291 - S maps symbols to integers. I.e., it is the inverse
292 mapping from self._symbols; for each symbol s in
293 self._symbols, the following is true::
294
295 self._symbols[S[s]] == s
296
297 - O is the log output probabilities::
298
299 O[i,k] = log( P(token[t]=sym[k]|tag[t]=state[i]) )
300
301 - X is the log transition probabilities::
302
303 X[i,j] = log( P(tag[t]=state[j]|tag[t-1]=state[i]) )
304
305 - P is the log prior probabilities::
306
307 P[i] = log( P(tag[0]=state[i]) )
308 """
309 if not self._cache:
310 N = len(self._states)
311 M = len(self._symbols)
312 P = zeros(N, float32)
313 X = zeros((N, N), float32)
314 O = zeros((N, M), float32)
315 for i in range(N):
316 si = self._states[i]
317 P[i] = self._priors.logprob(si)
318 for j in range(N):
319 X[i, j] = self._transitions[si].logprob(self._states[j])
320 for k in range(M):
321 O[i, k] = self._outputs[si].logprob(self._symbols[k])
322 S = {}
323 for k in range(M):
324 S[self._symbols[k]] = k
325 self._cache = (P, O, X, S)
326
328
329
330 if symbols:
331 self._create_cache()
332 P, O, X, S = self._cache
333 for symbol in symbols:
334 if symbol not in self._symbols:
335 self._cache = None
336 self._symbols.append(symbol)
337
338 if not self._cache:
339 N = len(self._states)
340 M = len(self._symbols)
341 Q = O.shape[1]
342
343
344 O = hstack([O, zeros((N, M - Q), float32)])
345 for i in range(N):
346 si = self._states[i]
347
348 for k in range(Q, M):
349 O[i, k] = self._outputs[si].logprob(self._symbols[k])
350
351 for k in range(Q, M):
352 S[self._symbols[k]] = k
353 self._cache = (P, O, X, S)
354
356 """
357 Returns the state sequence of the optimal (most probable) path through
358 the HMM. Uses the Viterbi algorithm to calculate this part by dynamic
359 programming.
360
361 @return: the state sequence
362 @rtype: sequence of any
363 @param unlabeled_sequence: the sequence of unlabeled symbols
364 @type unlabeled_sequence: list
365 """
366 unlabeled_sequence = self._transform.transform(unlabeled_sequence)
367 return self._best_path(unlabeled_sequence)
368
370 T = len(unlabeled_sequence)
371 N = len(self._states)
372 self._create_cache()
373 self._update_cache(unlabeled_sequence)
374 P, O, X, S = self._cache
375
376 V = zeros((T, N), float32)
377 B = ones((T, N), int) * -1
378
379 V[0] = P + O[:, S[unlabeled_sequence[0]]]
380 for t in range(1, T):
381 for j in range(N):
382 vs = V[t-1, :] + X[:, j]
383 best = argmax(vs)
384 V[t, j] = vs[best] + O[j, S[unlabeled_sequence[t]]]
385 B[t, j] = best
386
387 current = argmax(V[T-1,:])
388 sequence = [current]
389 for t in range(T-1, 0, -1):
390 last = B[t, current]
391 sequence.append(last)
392 current = last
393
394 sequence.reverse()
395 return map(self._states.__getitem__, sequence)
396
398 """
399 Returns the state sequence of the optimal (most probable) path through
400 the HMM. Uses the Viterbi algorithm to calculate this part by dynamic
401 programming. This uses a simple, direct method, and is included for
402 teaching purposes.
403
404 @return: the state sequence
405 @rtype: sequence of any
406 @param unlabeled_sequence: the sequence of unlabeled symbols
407 @type unlabeled_sequence: list
408 """
409 unlabeled_sequence = self._transform.transform(unlabeled_sequence)
410 return self._best_path_simple(unlabeled_sequence)
411
413 T = len(unlabeled_sequence)
414 N = len(self._states)
415 V = zeros((T, N), float64)
416 B = {}
417
418
419 symbol = unlabeled_sequence[0]
420 for i, state in enumerate(self._states):
421 V[0, i] = self._priors.logprob(state) + \
422 self._output_logprob(state, symbol)
423 B[0, state] = None
424
425
426 for t in range(1, T):
427 symbol = unlabeled_sequence[t]
428 for j in range(N):
429 sj = self._states[j]
430 best = None
431 for i in range(N):
432 si = self._states[i]
433 va = V[t-1, i] + self._transitions[si].logprob(sj)
434 if not best or va > best[0]:
435 best = (va, si)
436 V[t, j] = best[0] + self._output_logprob(sj, symbol)
437 B[t, sj] = best[1]
438
439
440 best = None
441 for i in range(N):
442 val = V[T-1, i]
443 if not best or val > best[0]:
444 best = (val, self._states[i])
445
446
447 current = best[1]
448 sequence = [current]
449 for t in range(T-1, 0, -1):
450 last = B[t, current]
451 sequence.append(last)
452 current = last
453
454 sequence.reverse()
455 return sequence
456
458 """
459 Randomly sample the HMM to generate a sentence of a given length. This
460 samples the prior distribution then the observation distribution and
461 transition distribution for each subsequent observation and state.
462 This will mostly generate unintelligible garbage, but can provide some
463 amusement.
464
465 @return: the randomly created state/observation sequence,
466 generated according to the HMM's probability
467 distributions. The SUBTOKENS have TEXT and TAG
468 properties containing the observation and state
469 respectively.
470 @rtype: list
471 @param rng: random number generator
472 @type rng: Random (or any object with a random() method)
473 @param length: desired output length
474 @type length: int
475 """
476
477
478 tokens = []
479 state = self._sample_probdist(self._priors, rng.random(), self._states)
480 symbol = self._sample_probdist(self._outputs[state],
481 rng.random(), self._symbols)
482 tokens.append((symbol, state))
483
484 for i in range(1, length):
485
486 state = self._sample_probdist(self._transitions[state],
487 rng.random(), self._states)
488 symbol = self._sample_probdist(self._outputs[state],
489 rng.random(), self._symbols)
490 tokens.append((symbol, state))
491
492 return tokens
493
495 cum_p = 0
496 for sample in samples:
497 add_p = probdist.prob(sample)
498 if cum_p <= p <= cum_p + add_p:
499 return sample
500 cum_p += add_p
501 raise Exception('Invalid probability distribution - '
502 'does not sum to one')
503
504 - def entropy(self, unlabeled_sequence):
505 """
506 Returns the entropy over labellings of the given sequence. This is
507 given by::
508
509 H(O) = - sum_S Pr(S | O) log Pr(S | O)
510
511 where the summation ranges over all state sequences, S. Let M{Z =
512 Pr(O) = sum_S Pr(S, O)} where the summation ranges over all state
513 sequences and O is the observation sequence. As such the entropy can
514 be re-expressed as::
515
516 H = - sum_S Pr(S | O) log [ Pr(S, O) / Z ]
517 = log Z - sum_S Pr(S | O) log Pr(S, 0)
518 = log Z - sum_S Pr(S | O) [ log Pr(S_0) + sum_t Pr(S_t | S_{t-1})
519 + sum_t Pr(O_t | S_t) ]
520
521 The order of summation for the log terms can be flipped, allowing
522 dynamic programming to be used to calculate the entropy. Specifically,
523 we use the forward and backward probabilities (alpha, beta) giving::
524
525 H = log Z - sum_s0 alpha_0(s0) beta_0(s0) / Z * log Pr(s0)
526 + sum_t,si,sj alpha_t(si) Pr(sj | si) Pr(O_t+1 | sj) beta_t(sj)
527 / Z * log Pr(sj | si)
528 + sum_t,st alpha_t(st) beta_t(st) / Z * log Pr(O_t | st)
529
530 This simply uses alpha and beta to find the probabilities of partial
531 sequences, constrained to include the given state(s) at some point in
532 time.
533 """
534 unlabeled_sequence = self._transform.transform(unlabeled_sequence)
535
536 T = len(unlabeled_sequence)
537 N = len(self._states)
538
539 alpha = self._forward_probability(unlabeled_sequence)
540 beta = self._backward_probability(unlabeled_sequence)
541 normalisation = _log_add(*alpha[T-1, :])
542
543 entropy = normalisation
544
545
546 for i, state in enumerate(self._states):
547 p = 2**(alpha[0, i] + beta[0, i] - normalisation)
548 entropy -= p * self._priors.logprob(state)
549
550
551
552 for t0 in range(T - 1):
553 t1 = t0 + 1
554 for i0, s0 in enumerate(self._states):
555 for i1, s1 in enumerate(self._states):
556 p = 2**(alpha[t0, i0] + self._transitions[s0].logprob(s1) +
557 self._outputs[s1].logprob(
558 unlabeled_sequence[t1][_TEXT]) +
559 beta[t1, i1] - normalisation)
560 entropy -= p * self._transitions[s0].logprob(s1)
561
562
563
564 for t in range(T):
565 for i, state in enumerate(self._states):
566 p = 2**(alpha[t, i] + beta[t, i] - normalisation)
567 entropy -= p * self._outputs[state].logprob(
568 unlabeled_sequence[t][_TEXT])
569
570
571 return entropy
572
574 """
575 Returns the pointwise entropy over the possible states at each
576 position in the chain, given the observation sequence.
577 """
578 unlabeled_sequence = self._transform.transform(unlabeled_sequence)
579
580 T = len(unlabeled_sequence)
581 N = len(self._states)
582
583 alpha = self._forward_probability(unlabeled_sequence)
584 beta = self._backward_probability(unlabeled_sequence)
585 normalisation = _log_add(*alpha[T-1, :])
586
587 entropies = zeros(T, float64)
588 probs = zeros(N, float64)
589 for t in range(T):
590 for s in range(N):
591 probs[s] = alpha[t, s] + beta[t, s] - normalisation
592
593 for s in range(N):
594 entropies[t] -= 2**(probs[s]) * probs[s]
595
596 return entropies
597
599 unlabeled_sequence = self._transform.transform(unlabeled_sequence)
600
601 T = len(unlabeled_sequence)
602 N = len(self._states)
603
604 labellings = [[state] for state in self._states]
605 for t in range(T - 1):
606 current = labellings
607 labellings = []
608 for labelling in current:
609 for state in self._states:
610 labellings.append(labelling + [state])
611
612 log_probs = []
613 for labelling in labellings:
614 labelled_sequence = unlabeled_sequence[:]
615 for t, label in enumerate(labelling):
616 labelled_sequence[t] = (labelled_sequence[t][_TEXT], label)
617 lp = self.log_probability(labelled_sequence)
618 log_probs.append(lp)
619 normalisation = _log_add(*log_probs)
620
621
622
623
624
625
626
627
628
629
630 entropy = 0
631 for lp in log_probs:
632 lp -= normalisation
633 entropy -= 2**(lp) * lp
634
635 return entropy
636
638 unlabeled_sequence = self._transform.transform(unlabeled_sequence)
639
640 T = len(unlabeled_sequence)
641 N = len(self._states)
642
643 labellings = [[state] for state in self._states]
644 for t in range(T - 1):
645 current = labellings
646 labellings = []
647 for labelling in current:
648 for state in self._states:
649 labellings.append(labelling + [state])
650
651 log_probs = []
652 for labelling in labellings:
653 labelled_sequence = unlabeled_sequence[:]
654 for t, label in enumerate(labelling):
655 labelled_sequence[t] = (labelled_sequence[t][_TEXT], label)
656 lp = self.log_probability(labelled_sequence)
657 log_probs.append(lp)
658
659 normalisation = _log_add(*log_probs)
660
661 probabilities = zeros((T, N), float64)
662 probabilities[:] = _NINF
663 for labelling, lp in zip(labellings, log_probs):
664 lp -= normalisation
665 for t, label in enumerate(labelling):
666 index = self._states.index(label)
667 probabilities[t, index] = _log_add(probabilities[t, index], lp)
668
669 entropies = zeros(T, float64)
670 for t in range(T):
671 for s in range(N):
672 entropies[t] -= 2**(probabilities[t, s]) * probabilities[t, s]
673
674 return entropies
675
677 """
678 Return the forward probability matrix, a T by N array of
679 log-probabilities, where T is the length of the sequence and N is the
680 number of states. Each entry (t, s) gives the probability of being in
681 state s at time t after observing the partial symbol sequence up to
682 and including t.
683
684 @param unlabeled_sequence: the sequence of unlabeled symbols
685 @type unlabeled_sequence: list
686 @return: the forward log probability matrix
687 @rtype: array
688 """
689 T = len(unlabeled_sequence)
690 N = len(self._states)
691 alpha = zeros((T, N), float64)
692
693 symbol = unlabeled_sequence[0][_TEXT]
694 for i, state in enumerate(self._states):
695 alpha[0, i] = self._priors.logprob(state) + \
696 self._outputs[state].logprob(symbol)
697 for t in range(1, T):
698 symbol = unlabeled_sequence[t][_TEXT]
699 for i, si in enumerate(self._states):
700 alpha[t, i] = _NINF
701 for j, sj in enumerate(self._states):
702 alpha[t, i] = _log_add(alpha[t, i], alpha[t-1, j] +
703 self._transitions[sj].logprob(si))
704 alpha[t, i] += self._outputs[si].logprob(symbol)
705
706 return alpha
707
709 """
710 Return the backward probability matrix, a T by N array of
711 log-probabilities, where T is the length of the sequence and N is the
712 number of states. Each entry (t, s) gives the probability of being in
713 state s at time t after observing the partial symbol sequence from t
714 .. T.
715
716 @return: the backward log probability matrix
717 @rtype: array
718 @param unlabeled_sequence: the sequence of unlabeled symbols
719 @type unlabeled_sequence: list
720 """
721 T = len(unlabeled_sequence)
722 N = len(self._states)
723 beta = zeros((T, N), float64)
724
725
726 beta[T-1, :] = log2(1)
727
728
729 for t in range(T-2, -1, -1):
730 symbol = unlabeled_sequence[t+1][_TEXT]
731 for i, si in enumerate(self._states):
732 beta[t, i] = _NINF
733 for j, sj in enumerate(self._states):
734 beta[t, i] = _log_add(beta[t, i],
735 self._transitions[si].logprob(sj) +
736 self._outputs[sj].logprob(symbol) +
737 beta[t + 1, j])
738
739 return beta
740
741 - def test(self, test_sequence, **kwargs):
742 """
743 Tests the C{HiddenMarkovModelTagger} instance.
744
745 @param test_sequence: a sequence of labeled test instances
746 @type test_sequence: C{list} of C{list}
747 @kwparam verbose: boolean flag indicating whether training should be
748 verbose or include printed output
749 @type verbose: C{bool}
750 """
751
752 def words(sent):
753 return [word for (word, tag) in sent]
754
755 def tags(sent):
756 return [tag for (word, tag) in sent]
757
758 test_sequence = LazyMap(self._transform.transform, test_sequence)
759 predicted_sequence = LazyMap(self._tag, LazyMap(words, test_sequence))
760
761 if kwargs.get('verbose', False):
762
763
764 test_sequence = list(test_sequence)
765 predicted_sequence = list(predicted_sequence)
766
767 for sent in predicted_sequence:
768 print 'Test:', \
769 ' '.join(['%s/%s' % (str(token), str(tag))
770 for (token, tag) in sent])
771 print
772 print 'Untagged:', \
773 ' '.join([str(token) for (token, tag) in sent])
774 print
775 print 'HMM-tagged:', \
776 ' '.join(['%s/%s' % (str(token), str(tag))
777 for (token, tag) in sent])
778 print
779 print 'Entropy:', \
780 self.entropy([(token, None) for (token, tag) in sent])
781 print
782 print '-' * 60
783
784 test_tags = LazyConcatenation(LazyMap(tags, test_sequence))
785 predicted_tags = LazyConcatenation(LazyMap(tags, predicted_sequence))
786
787 acc = accuracy_(test_tags, predicted_tags)
788
789 count = sum([len(sent) for sent in test_sequence])
790
791 print 'accuracy over %d tokens: %.2f' % (count, acc * 100)
792
794 return ('<HiddenMarkovModelTagger %d states and %d output symbols>'
795 % (len(self._states), len(self._symbols)))
796
797
799 """
800 Algorithms for learning HMM parameters from training data. These include
801 both supervised learning (MLE) and unsupervised learning (Baum-Welch).
802 """
803 - def __init__(self, states=None, symbols=None):
804 """
805 Creates an HMM trainer to induce an HMM with the given states and
806 output symbol alphabet. A supervised and unsupervised training
807 method may be used. If either of the states or symbols are not given,
808 these may be derived from supervised training.
809
810 @param states: the set of state labels
811 @type states: sequence of any
812 @param symbols: the set of observation symbols
813 @type symbols: sequence of any
814 """
815 if states:
816 self._states = states
817 else:
818 self._states = []
819 if symbols:
820 self._symbols = symbols
821 else:
822 self._symbols = []
823
824 - def train(self, labelled_sequences=None, unlabeled_sequences=None,
825 **kwargs):
826 """
827 Trains the HMM using both (or either of) supervised and unsupervised
828 techniques.
829
830 @return: the trained model
831 @rtype: HiddenMarkovModelTagger
832 @param labelled_sequences: the supervised training data, a set of
833 labelled sequences of observations
834 @type labelled_sequences: list
835 @param unlabeled_sequences: the unsupervised training data, a set of
836 sequences of observations
837 @type unlabeled_sequences: list
838 @param kwargs: additional arguments to pass to the training methods
839 """
840 assert labelled_sequences or unlabeled_sequences
841 model = None
842 if labelled_sequences:
843 model = self.train_supervised(labelled_sequences, **kwargs)
844 if unlabeled_sequences:
845 if model: kwargs['model'] = model
846 model = self.train_unsupervised(unlabeled_sequences, **kwargs)
847 return model
848
850 """
851 Trains the HMM using the Baum-Welch algorithm to maximise the
852 probability of the data sequence. This is a variant of the EM
853 algorithm, and is unsupervised in that it doesn't need the state
854 sequences for the symbols. The code is based on 'A Tutorial on Hidden
855 Markov Models and Selected Applications in Speech Recognition',
856 Lawrence Rabiner, IEEE, 1989.
857
858 @return: the trained model
859 @rtype: HiddenMarkovModelTagger
860 @param unlabeled_sequences: the training data, a set of
861 sequences of observations
862 @type unlabeled_sequences: list
863 @param kwargs: may include the following parameters::
864 model - a HiddenMarkovModelTagger instance used to begin
865 the Baum-Welch algorithm
866 max_iterations - the maximum number of EM iterations
867 convergence_logprob - the maximum change in log probability to
868 allow convergence
869 """
870
871 N = len(self._states)
872 M = len(self._symbols)
873 symbol_dict = dict((self._symbols[i], i) for i in range(M))
874
875
876
877 model = kwargs.get('model')
878 if not model:
879 priors = UniformProbDist(self._states)
880 transitions = DictionaryConditionalProbDist(
881 dict((state, UniformProbDist(self._states))
882 for state in self._states))
883 output = DictionaryConditionalProbDist(
884 dict((state, UniformProbDist(self._symbols))
885 for state in self._states))
886 model = HiddenMarkovModelTagger(self._symbols, self._states,
887 transitions, output, priors)
888
889
890 model._priors = MutableProbDist(model._priors, self._states)
891 model._transitions = DictionaryConditionalProbDist(
892 dict((s, MutableProbDist(model._transitions[s], self._states))
893 for s in self._states))
894 model._outputs = DictionaryConditionalProbDist(
895 dict((s, MutableProbDist(model._outputs[s], self._symbols))
896 for s in self._states))
897
898
899 converged = False
900 last_logprob = None
901 iteration = 0
902 max_iterations = kwargs.get('max_iterations', 1000)
903 epsilon = kwargs.get('convergence_logprob', 1e-6)
904 while not converged and iteration < max_iterations:
905 A_numer = ones((N, N), float64) * _NINF
906 B_numer = ones((N, M), float64) * _NINF
907 A_denom = ones(N, float64) * _NINF
908 B_denom = ones(N, float64) * _NINF
909
910 logprob = 0
911 for sequence in unlabeled_sequences:
912 sequence = list(sequence)
913 if not sequence:
914 continue
915
916
917 alpha = model._forward_probability(sequence)
918 beta = model._backward_probability(sequence)
919
920
921 T = len(sequence)
922 lpk = _log_add(*alpha[T-1, :])
923 logprob += lpk
924
925
926
927
928 local_A_numer = ones((N, N), float64) * _NINF
929 local_B_numer = ones((N, M), float64) * _NINF
930 local_A_denom = ones(N, float64) * _NINF
931 local_B_denom = ones(N, float64) * _NINF
932
933
934 for t in range(T):
935 x = sequence[t][_TEXT]
936 if t < T - 1:
937 xnext = sequence[t+1][_TEXT]
938 xi = symbol_dict[x]
939 for i in range(N):
940 si = self._states[i]
941 if t < T - 1:
942 for j in range(N):
943 sj = self._states[j]
944 local_A_numer[i, j] = \
945 _log_add(local_A_numer[i, j],
946 alpha[t, i] +
947 model._transitions[si].logprob(sj) +
948 model._outputs[sj].logprob(xnext) +
949 beta[t+1, j])
950 local_A_denom[i] = _log_add(local_A_denom[i],
951 alpha[t, i] + beta[t, i])
952 else:
953 local_B_denom[i] = _log_add(local_A_denom[i],
954 alpha[t, i] + beta[t, i])
955
956 local_B_numer[i, xi] = _log_add(local_B_numer[i, xi],
957 alpha[t, i] + beta[t, i])
958
959
960 for i in range(N):
961 for j in range(N):
962 A_numer[i, j] = _log_add(A_numer[i, j],
963 local_A_numer[i, j] - lpk)
964 for k in range(M):
965 B_numer[i, k] = _log_add(B_numer[i, k],
966 local_B_numer[i, k] - lpk)
967
968 A_denom[i] = _log_add(A_denom[i], local_A_denom[i] - lpk)
969 B_denom[i] = _log_add(B_denom[i], local_B_denom[i] - lpk)
970
971
972
973 for i in range(N):
974 si = self._states[i]
975 for j in range(N):
976 sj = self._states[j]
977 model._transitions[si].update(sj, A_numer[i,j] -
978 A_denom[i])
979 for k in range(M):
980 ok = self._symbols[k]
981 model._outputs[si].update(ok, B_numer[i,k] - B_denom[i])
982
983
984
985
986 if iteration > 0 and abs(logprob - last_logprob) < epsilon:
987 converged = True
988
989 print 'iteration', iteration, 'logprob', logprob
990 iteration += 1
991 last_logprob = logprob
992
993 return model
994
996 """
997 Supervised training maximising the joint probability of the symbol and
998 state sequences. This is done via collecting frequencies of
999 transitions between states, symbol observations while within each
1000 state and which states start a sentence. These frequency distributions
1001 are then normalised into probability estimates, which can be
1002 smoothed if desired.
1003
1004 @return: the trained model
1005 @rtype: HiddenMarkovModelTagger
1006 @param labelled_sequences: the training data, a set of
1007 labelled sequences of observations
1008 @type labelled_sequences: list
1009 @param kwargs: may include an 'estimator' parameter, a function taking
1010 a C{FreqDist} and a number of bins and returning a C{ProbDistI};
1011 otherwise a MLE estimate is used
1012 """
1013
1014
1015 estimator = kwargs.get('estimator')
1016 if estimator == None:
1017 estimator = lambda fdist, bins: MLEProbDist(fdist)
1018
1019
1020
1021 starting = FreqDist()
1022 transitions = ConditionalFreqDist()
1023 outputs = ConditionalFreqDist()
1024 for sequence in labelled_sequences:
1025 lasts = None
1026 for token in sequence:
1027 state = token[_TAG]
1028 symbol = token[_TEXT]
1029 if lasts == None:
1030 starting.inc(state)
1031 else:
1032 transitions[lasts].inc(state)
1033 outputs[state].inc(symbol)
1034 lasts = state
1035
1036
1037 if state not in self._states:
1038 self._states.append(state)
1039 if symbol not in self._symbols:
1040 self._symbols.append(symbol)
1041
1042
1043 N = len(self._states)
1044 pi = estimator(starting, N)
1045 A = ConditionalProbDist(transitions, estimator, False, N)
1046 B = ConditionalProbDist(outputs, estimator, False, len(self._symbols))
1047
1048 return HiddenMarkovModelTagger(self._symbols, self._states, A, B, pi)
1049
1050
1058
1059
1074
1075
1084
1085
1087 """
1088 Adds the logged values, returning the logarithm of the addition.
1089 """
1090 x = max(values)
1091 if x > _NINF:
1092 sum_diffs = 0
1093 for value in values:
1094 sum_diffs += 2**(value - x)
1095 return x + log2(sum_diffs)
1096 else:
1097 return x
1098
1100
1101
1102 print
1103 print "HMM probability calculation demo"
1104 print
1105
1106
1107 symbols = ['up', 'down', 'unchanged']
1108 states = ['bull', 'bear', 'static']
1109
1110 def pd(values, samples):
1111 d = {}
1112 for value, item in zip(values, samples):
1113 d[item] = value
1114 return DictionaryProbDist(d)
1115
1116 def cpd(array, conditions, samples):
1117 d = {}
1118 for values, condition in zip(array, conditions):
1119 d[condition] = pd(values, samples)
1120 return DictionaryConditionalProbDist(d)
1121
1122 A = array([[0.6, 0.2, 0.2], [0.5, 0.3, 0.2], [0.4, 0.1, 0.5]], float64)
1123 A = cpd(A, states, states)
1124 B = array([[0.7, 0.1, 0.2], [0.1, 0.6, 0.3], [0.3, 0.3, 0.4]], float64)
1125 B = cpd(B, states, symbols)
1126 pi = array([0.5, 0.2, 0.3], float64)
1127 pi = pd(pi, states)
1128
1129 model = HiddenMarkovModelTagger(symbols=symbols, states=states,
1130 transitions=A, outputs=B, priors=pi)
1131
1132 print 'Testing', model
1133
1134 for test in [['up', 'up'], ['up', 'down', 'up'],
1135 ['down'] * 5, ['unchanged'] * 5 + ['up']]:
1136
1137 sequence = [(t, None) for t in test]
1138
1139 print 'Testing with state sequence', test
1140 print 'probability =', model.probability(sequence)
1141 print 'tagging = ', model.tag([word for (word,tag) in sequence])
1142 print 'p(tagged) = ', model.probability(sequence)
1143 print 'H = ', model.entropy(sequence)
1144 print 'H_exh = ', model._exhaustive_entropy(sequence)
1145 print 'H(point) = ', model.point_entropy(sequence)
1146 print 'H_exh(point)=', model._exhaustive_point_entropy(sequence)
1147 print
1148
1171
1172 @deprecated("Use model.test(sentences, **kwargs) instead.")
1173 -def test_pos(model, sentences, display=False):
1175
1191
1197
1199
1200
1201 print
1202 print "Baum-Welch demo for POS tagging"
1203 print
1204
1205 print 'Training HMM (supervised)...'
1206 sentences, tag_set, symbols = load_pos(210)
1207 symbols = set()
1208 for sentence in sentences:
1209 for token in sentence:
1210 symbols.add(token[_TEXT])
1211
1212 trainer = HiddenMarkovModelTrainer(tag_set, list(symbols))
1213 hmm = trainer.train_supervised(sentences[10:200],
1214 estimator=lambda fd, bins: LidstoneProbDist(fd, 0.1, bins))
1215 print 'Training (unsupervised)...'
1216
1217 unlabeled = _untag(sentences[200:210])
1218 hmm = trainer.train_unsupervised(unlabeled, model=hmm, max_iterations=5)
1219 hmm.test(sentences[:10], verbose=True)
1220
1222
1223
1224
1225 print
1226 print "Baum-Welch demo for market example"
1227 print
1228
1229
1230 symbols = ['up', 'down', 'unchanged']
1231 states = ['bull', 'bear', 'static']
1232
1233 def pd(values, samples):
1234 d = {}
1235 for value, item in zip(values, samples):
1236 d[item] = value
1237 return DictionaryProbDist(d)
1238
1239 def cpd(array, conditions, samples):
1240 d = {}
1241 for values, condition in zip(array, conditions):
1242 d[condition] = pd(values, samples)
1243 return DictionaryConditionalProbDist(d)
1244
1245 A = array([[0.6, 0.2, 0.2], [0.5, 0.3, 0.2], [0.4, 0.1, 0.5]], float64)
1246 A = cpd(A, states, states)
1247 B = array([[0.7, 0.1, 0.2], [0.1, 0.6, 0.3], [0.3, 0.3, 0.4]], float64)
1248 B = cpd(B, states, symbols)
1249 pi = array([0.5, 0.2, 0.3], float64)
1250 pi = pd(pi, states)
1251
1252 model = HiddenMarkovModelTagger(symbols=symbols, states=states,
1253 transitions=A, outputs=B, priors=pi)
1254
1255
1256 training = []
1257 import random
1258 rng = random.Random()
1259 for i in range(10):
1260 item = model.random_sample(rng, 5)
1261 training.append((i[0], None) for i in item)
1262
1263
1264 trainer = HiddenMarkovModelTrainer(states, symbols)
1265 hmm = trainer.train_unsupervised(training, model=model,
1266 max_iterations=1000)
1267
1268 if __name__ == '__main__':
1269 demo()
1270 demo_pos()
1271 demo_pos_bw()
1272
1273