1
2
3
4
5
6
7
8
9
10
11
12
13
14 """
15 Hidden Markov Models (HMMs) largely used to assign the correct label sequence
16 to sequential data or assess the probability of a given label and data
17 sequence. These models are finite state machines characterised by a number of
18 states, transitions between these states, and output symbols emitted while in
19 each state. The HMM is an extension to the Markov chain, where each state
20 corresponds deterministically to a given event. In the HMM the observation is
21 a probabilistic function of the state. HMMs share the Markov chain's
22 assumption, being that the probability of transition from one state to another
23 only depends on the current state - i.e. the series of states that led to the
24 current state are not used. They are also time invariant.
25
26 The HMM is a directed graph, with probability weighted edges (representing the
27 probability of a transition between the source and sink states) where each
28 vertex emits an output symbol when entered. The symbol (or observation) is
29 non-deterministically generated. For this reason, knowing that a sequence of
30 output observations was generated by a given HMM does not mean that the
31 corresponding sequence of states (and what the current state is) is known.
32 This is the 'hidden' in the hidden markov model.
33
34 Formally, a HMM can be characterised by:
35 - the output observation alphabet. This is the set of symbols which may be
36 observed as output of the system.
37 - the set of states.
38 - the transition probabilities M{a_{ij} = P(s_t = j | s_{t-1} = i)}. These
39 represent the probability of transition to each state from a given
40 state.
41 - the output probability matrix M{b_i(k) = P(X_t = o_k | s_t = i)}. These
42 represent the probability of observing each symbol in a given state.
43 - the initial state distribution. This gives the probability of starting
44 in each state.
45
46 To ground this discussion, take a common NLP application, part-of-speech (POS)
47 tagging. An HMM is desirable for this task as the highest probability tag
48 sequence can be calculated for a given sequence of word forms. This differs
49 from other tagging techniques which often tag each word individually, seeking
50 to optimise each individual tagging greedily without regard to the optimal
51 combination of tags for a larger unit, such as a sentence. The HMM does this
52 with the Viterbi algorithm, which efficiently computes the optimal path
53 through the graph given the sequence of words forms.
54
55 In POS tagging the states usually have a 1:1 correspondence with the tag
56 alphabet - i.e. each state represents a single tag. The output observation
57 alphabet is the set of word forms (the lexicon), and the remaining three
58 parameters are derived by a training regime. With this information the
59 probability of a given sentence can be easily derived, by simply summing the
60 probability of each distinct path through the model. Similarly, the highest
61 probability tagging sequence can be derived with the Viterbi algorithm,
62 yielding a state sequence which can be mapped into a tag sequence.
63
64 This discussion assumes that the HMM has been trained. This is probably the
65 most difficult task with the model, and requires either MLE estimates of the
66 parameters or unsupervised learning using the Baum-Welch algorithm, a variant
67 of EM.
68 """
69
70 import re
71 import types
72
73 from numpy import *
74 from nltk.probability import FreqDist, ConditionalFreqDist, \
75 ConditionalProbDist, DictionaryProbDist, DictionaryConditionalProbDist, \
76 LidstoneProbDist, MutableProbDist, MLEProbDist
77 from nltk.metrics import accuracy as _accuracy
78 from nltk.util import LazyMap, LazyConcatenation, LazyZip
79
80 from api import *
81
82
83 _NINF = float('-1e300')
84
85 _TEXT = 0
86 _TAG = 1
89 """
90 Hidden Markov model class, a generative model for labelling sequence data.
91 These models define the joint probability of a sequence of symbols and
92 their labels (state transitions) as the product of the starting state
93 probability, the probability of each state transition, and the probability
94 of each observation being generated from each state. This is described in
95 more detail in the module documentation.
96
97 This implementation is based on the HMM description in Chapter 8, Huang,
98 Acero and Hon, Spoken Language Processing and includes an extension for
99 training shallow HMM parsers or specializaed HMMs as in Molina et.
100 al, 2002. A specialized HMM modifies training data by applying a
101 specialization function to create a new training set that is more
102 appropriate for sequential tagging with an HMM. A typical use case is
103 chunking.
104 """
105 - def __init__(self, symbols, states, transitions, outputs, priors, **kwargs):
106 """
107 Creates a hidden markov model parametised by the the states,
108 transition probabilities, output probabilities and priors.
109
110 @param symbols: the set of output symbols (alphabet)
111 @type symbols: seq of any
112 @param states: a set of states representing state space
113 @type states: seq of any
114 @param transitions: transition probabilities; Pr(s_i | s_j) is the
115 probability of transition from state i given the model is in
116 state_j
117 @type transitions: C{ConditionalProbDistI}
118 @param outputs: output probabilities; Pr(o_k | s_i) is the probability
119 of emitting symbol k when entering state i
120 @type outputs: C{ConditionalProbDistI}
121 @param priors: initial state distribution; Pr(s_i) is the probability
122 of starting in state i
123 @type priors: C{ProbDistI}
124 @kwparam transform: an optional function for transforming training
125 instances, defaults to the identity function.
126 @type transform: C{function} or C{HiddenMarkovModelTaggerTransform}
127 """
128 self._states = states
129 self._transitions = transitions
130 self._symbols = symbols
131 self._outputs = outputs
132 self._priors = priors
133 self._cache = None
134
135 self._transform = kwargs.get('transform', IdentityTransform())
136 if isinstance(self._transform, types.FunctionType):
137 self._transform = LambdaTransform(self._transform)
138 elif not isinstance(self._transform,
139 HiddenMarkovModelTaggerTransformI):
140 raise
141
142 @classmethod
143 - def _train(cls, labeled_sequence, test_sequence=None,
144 unlabeled_sequence=None, **kwargs):
145 transform = kwargs.get('transform', IdentityTransform())
146 if isinstance(transform, types.FunctionType):
147 transform = LambdaTransform(transform)
148 elif \
149 not isinstance(transform, HiddenMarkovModelTaggerTransformI):
150 raise
151
152 estimator = kwargs.get('estimator', lambda fd, bins: \
153 LidstoneProbDist(fd, 0.1, bins))
154
155 labeled_sequence = LazyMap(transform.transform, labeled_sequence)
156 symbols = list(set(word for sent in labeled_sequence
157 for word, tag in sent))
158 tag_set = list(set(tag for sent in labeled_sequence
159 for word, tag in sent))
160
161 trainer = HiddenMarkovModelTrainer(tag_set, symbols)
162 hmm = trainer.train_supervised(labeled_sequence, estimator=estimator)
163 hmm = cls(hmm._symbols, hmm._states, hmm._transitions, hmm._outputs,
164 hmm._priors, transform=transform)
165
166 if test_sequence:
167 hmm.test(test_sequence, verbose=kwargs.get('verbose', False))
168
169 if unlabeled_sequence:
170 max_iterations = kwargs.get('max_iterations', 5)
171 hmm = trainer.train_unsupervised(unlabeled_sequence, model=hmm,
172 max_iterations=max_iterations)
173 if test_sequence:
174 hmm.test(test_sequence, verbose=kwargs.get('verbose', False))
175
176 return hmm
177
178 @classmethod
179 - def train(cls, labeled_sequence, test_sequence=None,
180 unlabeled_sequence=None, **kwargs):
181 """
182 Train a new C{HiddenMarkovModelTagger} using the given labeled and
183 unlabeled training instances. Testing will be performed if test
184 instances are provided.
185
186 @return: a hidden markov model tagger
187 @rtype: C{HiddenMarkovModelTagger}
188 @param labeled_sequence: a sequence of labeled training instances,
189 i.e. a list of sentences represented as tuples
190 @type labeled_sequence: C{list} of C{list}
191 @param test_sequence: a sequence of labeled test instances
192 @type test_sequence: C{list} of C{list}
193 @param unlabeled_sequence: a sequence of unlabeled training instances,
194 i.e. a list of sentences represented as words
195 @type unlabeled_sequence: C{list} of C{list}
196 @kwparam transform: an optional function for transforming training
197 instances, defaults to the identity function, see L{transform()}
198 @type transform: C{function}
199 @kwparam estimator: an optional function or class that maps a
200 condition's frequency distribution to its probability
201 distribution, defaults to a Lidstone distribution with gamma = 0.1
202 @type estimator: C{class} or C{function}
203 @kwparam verbose: boolean flag indicating whether training should be
204 verbose or include printed output
205 @type verbose: C{bool}
206 @kwparam max_iterations: number of Baum-Welch interations to perform
207 @type max_iterations: C{int}
208 """
209 return cls._train(labeled_sequence, test_sequence,
210 unlabeled_sequence, **kwargs)
211
213 """
214 Returns the probability of the given symbol sequence. If the sequence
215 is labelled, then returns the joint probability of the symbol, state
216 sequence. Otherwise, uses the forward algorithm to find the
217 probability over all label sequences.
218
219 @return: the probability of the sequence
220 @rtype: float
221 @param sequence: the sequence of symbols which must contain the TEXT
222 property, and optionally the TAG property
223 @type sequence: Token
224 """
225 return 2**(self.log_probability(self._transform.transform(sequence)))
226
228 """
229 Returns the log-probability of the given symbol sequence. If the
230 sequence is labelled, then returns the joint log-probability of the
231 symbol, state sequence. Otherwise, uses the forward algorithm to find
232 the log-probability over all label sequences.
233
234 @return: the log-probability of the sequence
235 @rtype: float
236 @param sequence: the sequence of symbols which must contain the TEXT
237 property, and optionally the TAG property
238 @type sequence: Token
239 """
240 sequence = self._transform.transform(sequence)
241
242 T = len(sequence)
243 N = len(self._states)
244
245 if T > 0 and sequence[0][_TAG]:
246 last_state = sequence[0][_TAG]
247 p = self._priors.logprob(last_state) + \
248 self._outputs[last_state].logprob(sequence[0][_TEXT])
249 for t in range(1, T):
250 state = sequence[t][_TAG]
251 p += self._transitions[last_state].logprob(state) + \
252 self._outputs[state].logprob(sequence[t][_TEXT])
253 last_state = state
254 return p
255 else:
256 alpha = self._forward_probability(sequence)
257 p = _log_add(*alpha[T-1, :])
258 return p
259
260 - def tag(self, unlabeled_sequence):
261 """
262 Tags the sequence with the highest probability state sequence. This
263 uses the best_path method to find the Viterbi path.
264
265 @return: a labelled sequence of symbols
266 @rtype: list
267 @param unlabeled_sequence: the sequence of unlabeled symbols
268 @type unlabeled_sequence: list
269 """
270 unlabeled_sequence = self._transform.transform(unlabeled_sequence)
271 return self._tag(unlabeled_sequence)
272
273 - def _tag(self, unlabeled_sequence):
276
278 """
279 @return: the log probability of the symbol being observed in the given
280 state
281 @rtype: float
282 """
283 return self._outputs[state].logprob(symbol)
284
286 """
287 The cache is a tuple (P, O, X, S) where:
288
289 - S maps symbols to integers. I.e., it is the inverse
290 mapping from self._symbols; for each symbol s in
291 self._symbols, the following is true::
292
293 self._symbols[S[s]] == s
294
295 - O is the log output probabilities::
296
297 O[i,k] = log( P(token[t]=sym[k]|tag[t]=state[i]) )
298
299 - X is the log transition probabilities::
300
301 X[i,j] = log( P(tag[t]=state[j]|tag[t-1]=state[i]) )
302
303 - P is the log prior probabilities::
304
305 P[i] = log( P(tag[0]=state[i]) )
306 """
307 if not self._cache:
308 N = len(self._states)
309 M = len(self._symbols)
310 P = zeros(N, float32)
311 X = zeros((N, N), float32)
312 O = zeros((N, M), float32)
313 for i in range(N):
314 si = self._states[i]
315 P[i] = self._priors.logprob(si)
316 for j in range(N):
317 X[i, j] = self._transitions[si].logprob(self._states[j])
318 for k in range(M):
319 O[i, k] = self._outputs[si].logprob(self._symbols[k])
320 S = {}
321 for k in range(M):
322 S[self._symbols[k]] = k
323 self._cache = (P, O, X, S)
324
326
327
328 if symbols:
329 self._create_cache()
330 P, O, X, S = self._cache
331 for symbol in symbols:
332 if symbol not in self._symbols:
333 self._cache = None
334 self._symbols.append(symbol)
335
336 if not self._cache:
337 N = len(self._states)
338 M = len(self._symbols)
339 Q = O.shape[1]
340
341
342 O = hstack([O, zeros((N, M - Q), float32)])
343 for i in range(N):
344 si = self._states[i]
345
346 for k in range(Q, M):
347 O[i, k] = self._outputs[si].logprob(self._symbols[k])
348
349 for k in range(Q, M):
350 S[self._symbols[k]] = k
351 self._cache = (P, O, X, S)
352
354 """
355 Returns the state sequence of the optimal (most probable) path through
356 the HMM. Uses the Viterbi algorithm to calculate this part by dynamic
357 programming.
358
359 @return: the state sequence
360 @rtype: sequence of any
361 @param unlabeled_sequence: the sequence of unlabeled symbols
362 @type unlabeled_sequence: list
363 """
364 unlabeled_sequence = self._transform.transform(unlabeled_sequence)
365 return self._best_path(unlabeled_sequence)
366
368 T = len(unlabeled_sequence)
369 N = len(self._states)
370 self._create_cache()
371 self._update_cache(unlabeled_sequence)
372 P, O, X, S = self._cache
373
374 V = zeros((T, N), float32)
375 B = ones((T, N), int) * -1
376
377 V[0] = P + O[:, S[unlabeled_sequence[0]]]
378 for t in range(1, T):
379 for j in range(N):
380 vs = V[t-1, :] + X[:, j]
381 best = argmax(vs)
382 V[t, j] = vs[best] + O[j, S[unlabeled_sequence[t]]]
383 B[t, j] = best
384
385 current = argmax(V[T-1,:])
386 sequence = [current]
387 for t in range(T-1, 0, -1):
388 last = B[t, current]
389 sequence.append(last)
390 current = last
391
392 sequence.reverse()
393 return map(self._states.__getitem__, sequence)
394
396 """
397 Returns the state sequence of the optimal (most probable) path through
398 the HMM. Uses the Viterbi algorithm to calculate this part by dynamic
399 programming. This uses a simple, direct method, and is included for
400 teaching purposes.
401
402 @return: the state sequence
403 @rtype: sequence of any
404 @param unlabeled_sequence: the sequence of unlabeled symbols
405 @type unlabeled_sequence: list
406 """
407 unlabeled_sequence = self._transform.transform(unlabeled_sequence)
408 return self._best_path_simple(unlabeled_sequence)
409
411 T = len(unlabeled_sequence)
412 N = len(self._states)
413 V = zeros((T, N), float64)
414 B = {}
415
416
417 symbol = unlabeled_sequence[0]
418 for i, state in enumerate(self._states):
419 V[0, i] = self._priors.logprob(state) + \
420 self._output_logprob(state, symbol)
421 B[0, state] = None
422
423
424 for t in range(1, T):
425 symbol = unlabeled_sequence[t]
426 for j in range(N):
427 sj = self._states[j]
428 best = None
429 for i in range(N):
430 si = self._states[i]
431 va = V[t-1, i] + self._transitions[si].logprob(sj)
432 if not best or va > best[0]:
433 best = (va, si)
434 V[t, j] = best[0] + self._output_logprob(sj, symbol)
435 B[t, sj] = best[1]
436
437
438 best = None
439 for i in range(N):
440 val = V[T-1, i]
441 if not best or val > best[0]:
442 best = (val, self._states[i])
443
444
445 current = best[1]
446 sequence = [current]
447 for t in range(T-1, 0, -1):
448 last = B[t, current]
449 sequence.append(last)
450 current = last
451
452 sequence.reverse()
453 return sequence
454
456 """
457 Randomly sample the HMM to generate a sentence of a given length. This
458 samples the prior distribution then the observation distribution and
459 transition distribution for each subsequent observation and state.
460 This will mostly generate unintelligible garbage, but can provide some
461 amusement.
462
463 @return: the randomly created state/observation sequence,
464 generated according to the HMM's probability
465 distributions. The SUBTOKENS have TEXT and TAG
466 properties containing the observation and state
467 respectively.
468 @rtype: list
469 @param rng: random number generator
470 @type rng: Random (or any object with a random() method)
471 @param length: desired output length
472 @type length: int
473 """
474
475
476 tokens = []
477 state = self._sample_probdist(self._priors, rng.random(), self._states)
478 symbol = self._sample_probdist(self._outputs[state],
479 rng.random(), self._symbols)
480 tokens.append((symbol, state))
481
482 for i in range(1, length):
483
484 state = self._sample_probdist(self._transitions[state],
485 rng.random(), self._states)
486 symbol = self._sample_probdist(self._outputs[state],
487 rng.random(), self._symbols)
488 tokens.append((symbol, state))
489
490 return tokens
491
493 cum_p = 0
494 for sample in samples:
495 add_p = probdist.prob(sample)
496 if cum_p <= p <= cum_p + add_p:
497 return sample
498 cum_p += add_p
499 raise Exception('Invalid probability distribution - '
500 'does not sum to one')
501
502 - def entropy(self, unlabeled_sequence):
503 """
504 Returns the entropy over labellings of the given sequence. This is
505 given by::
506
507 H(O) = - sum_S Pr(S | O) log Pr(S | O)
508
509 where the summation ranges over all state sequences, S. Let M{Z =
510 Pr(O) = sum_S Pr(S, O)} where the summation ranges over all state
511 sequences and O is the observation sequence. As such the entropy can
512 be re-expressed as::
513
514 H = - sum_S Pr(S | O) log [ Pr(S, O) / Z ]
515 = log Z - sum_S Pr(S | O) log Pr(S, 0)
516 = log Z - sum_S Pr(S | O) [ log Pr(S_0) + sum_t Pr(S_t | S_{t-1})
517 + sum_t Pr(O_t | S_t) ]
518
519 The order of summation for the log terms can be flipped, allowing
520 dynamic programming to be used to calculate the entropy. Specifically,
521 we use the forward and backward probabilities (alpha, beta) giving::
522
523 H = log Z - sum_s0 alpha_0(s0) beta_0(s0) / Z * log Pr(s0)
524 + sum_t,si,sj alpha_t(si) Pr(sj | si) Pr(O_t+1 | sj) beta_t(sj)
525 / Z * log Pr(sj | si)
526 + sum_t,st alpha_t(st) beta_t(st) / Z * log Pr(O_t | st)
527
528 This simply uses alpha and beta to find the probabilities of partial
529 sequences, constrained to include the given state(s) at some point in
530 time.
531 """
532 unlabeled_sequence = self._transform.transform(unlabeled_sequence)
533
534 T = len(unlabeled_sequence)
535 N = len(self._states)
536
537 alpha = self._forward_probability(unlabeled_sequence)
538 beta = self._backward_probability(unlabeled_sequence)
539 normalisation = _log_add(*alpha[T-1, :])
540
541 entropy = normalisation
542
543
544 for i, state in enumerate(self._states):
545 p = 2**(alpha[0, i] + beta[0, i] - normalisation)
546 entropy -= p * self._priors.logprob(state)
547
548
549
550 for t0 in range(T - 1):
551 t1 = t0 + 1
552 for i0, s0 in enumerate(self._states):
553 for i1, s1 in enumerate(self._states):
554 p = 2**(alpha[t0, i0] + self._transitions[s0].logprob(s1) +
555 self._outputs[s1].logprob(
556 unlabeled_sequence[t1][_TEXT]) +
557 beta[t1, i1] - normalisation)
558 entropy -= p * self._transitions[s0].logprob(s1)
559
560
561
562 for t in range(T):
563 for i, state in enumerate(self._states):
564 p = 2**(alpha[t, i] + beta[t, i] - normalisation)
565 entropy -= p * self._outputs[state].logprob(
566 unlabeled_sequence[t][_TEXT])
567
568
569 return entropy
570
572 """
573 Returns the pointwise entropy over the possible states at each
574 position in the chain, given the observation sequence.
575 """
576 unlabeled_sequence = self._transform.transform(unlabeled_sequence)
577
578 T = len(unlabeled_sequence)
579 N = len(self._states)
580
581 alpha = self._forward_probability(unlabeled_sequence)
582 beta = self._backward_probability(unlabeled_sequence)
583 normalisation = _log_add(*alpha[T-1, :])
584
585 entropies = zeros(T, float64)
586 probs = zeros(N, float64)
587 for t in range(T):
588 for s in range(N):
589 probs[s] = alpha[t, s] + beta[t, s] - normalisation
590
591 for s in range(N):
592 entropies[t] -= 2**(probs[s]) * probs[s]
593
594 return entropies
595
597 unlabeled_sequence = self._transform.transform(unlabeled_sequence)
598
599 T = len(unlabeled_sequence)
600 N = len(self._states)
601
602 labellings = [[state] for state in self._states]
603 for t in range(T - 1):
604 current = labellings
605 labellings = []
606 for labelling in current:
607 for state in self._states:
608 labellings.append(labelling + [state])
609
610 log_probs = []
611 for labelling in labellings:
612 labelled_sequence = unlabeled_sequence[:]
613 for t, label in enumerate(labelling):
614 labelled_sequence[t] = (labelled_sequence[t][_TEXT], label)
615 lp = self.log_probability(labelled_sequence)
616 log_probs.append(lp)
617 normalisation = _log_add(*log_probs)
618
619
620
621
622
623
624
625
626
627
628 entropy = 0
629 for lp in log_probs:
630 lp -= normalisation
631 entropy -= 2**(lp) * lp
632
633 return entropy
634
636 unlabeled_sequence = self._transform.transform(unlabeled_sequence)
637
638 T = len(unlabeled_sequence)
639 N = len(self._states)
640
641 labellings = [[state] for state in self._states]
642 for t in range(T - 1):
643 current = labellings
644 labellings = []
645 for labelling in current:
646 for state in self._states:
647 labellings.append(labelling + [state])
648
649 log_probs = []
650 for labelling in labellings:
651 labelled_sequence = unlabeled_sequence[:]
652 for t, label in enumerate(labelling):
653 labelled_sequence[t] = (labelled_sequence[t][_TEXT], label)
654 lp = self.log_probability(labelled_sequence)
655 log_probs.append(lp)
656
657 normalisation = _log_add(*log_probs)
658
659 probabilities = zeros((T, N), float64)
660 probabilities[:] = _NINF
661 for labelling, lp in zip(labellings, log_probs):
662 lp -= normalisation
663 for t, label in enumerate(labelling):
664 index = self._states.index(label)
665 probabilities[t, index] = _log_add(probabilities[t, index], lp)
666
667 entropies = zeros(T, float64)
668 for t in range(T):
669 for s in range(N):
670 entropies[t] -= 2**(probabilities[t, s]) * probabilities[t, s]
671
672 return entropies
673
675 """
676 Return the forward probability matrix, a T by N array of
677 log-probabilities, where T is the length of the sequence and N is the
678 number of states. Each entry (t, s) gives the probability of being in
679 state s at time t after observing the partial symbol sequence up to
680 and including t.
681
682 @param unlabeled_sequence: the sequence of unlabeled symbols
683 @type unlabeled_sequence: list
684 @return: the forward log probability matrix
685 @rtype: array
686 """
687 T = len(unlabeled_sequence)
688 N = len(self._states)
689 alpha = zeros((T, N), float64)
690
691 symbol = unlabeled_sequence[0][_TEXT]
692 for i, state in enumerate(self._states):
693 alpha[0, i] = self._priors.logprob(state) + \
694 self._outputs[state].logprob(symbol)
695 for t in range(1, T):
696 symbol = unlabeled_sequence[t][_TEXT]
697 for i, si in enumerate(self._states):
698 alpha[t, i] = _NINF
699 for j, sj in enumerate(self._states):
700 alpha[t, i] = _log_add(alpha[t, i], alpha[t-1, j] +
701 self._transitions[sj].logprob(si))
702 alpha[t, i] += self._outputs[si].logprob(symbol)
703
704 return alpha
705
707 """
708 Return the backward probability matrix, a T by N array of
709 log-probabilities, where T is the length of the sequence and N is the
710 number of states. Each entry (t, s) gives the probability of being in
711 state s at time t after observing the partial symbol sequence from t
712 .. T.
713
714 @return: the backward log probability matrix
715 @rtype: array
716 @param unlabeled_sequence: the sequence of unlabeled symbols
717 @type unlabeled_sequence: list
718 """
719 T = len(unlabeled_sequence)
720 N = len(self._states)
721 beta = zeros((T, N), float64)
722
723
724 beta[T-1, :] = log2(1)
725
726
727 for t in range(T-2, -1, -1):
728 symbol = unlabeled_sequence[t+1][_TEXT]
729 for i, si in enumerate(self._states):
730 beta[t, i] = _NINF
731 for j, sj in enumerate(self._states):
732 beta[t, i] = _log_add(beta[t, i],
733 self._transitions[si].logprob(sj) +
734 self._outputs[sj].logprob(symbol) +
735 beta[t + 1, j])
736
737 return beta
738
739 - def test(self, test_sequence, **kwargs):
740 """
741 Tests the C{HiddenMarkovModelTagger} instance.
742
743 @param test_sequence: a sequence of labeled test instances
744 @type test_sequence: C{list} of C{list}
745 @kwparam verbose: boolean flag indicating whether training should be
746 verbose or include printed output
747 @type verbose: C{bool}
748 """
749
750 def words(sent):
751 return [word for (word, tag) in sent]
752
753 def tags(sent):
754 return [tag for (word, tag) in sent]
755
756 test_sequence = LazyMap(self._transform.transform, test_sequence)
757 predicted_sequence = LazyMap(self._tag, LazyMap(words, test_sequence))
758
759 if kwargs.get('verbose', False):
760
761
762 test_sequence = list(test_sequence)
763 predicted_sequence = list(predicted_sequence)
764
765 for test_sent, predicted_sent in zip(test_sequence,
766 predicted_sequence):
767 print 'Test:', \
768 ' '.join(['%s/%s' % (str(token), str(tag))
769 for (token, tag) in test_sent])
770 print
771 print 'Untagged:', \
772 ' '.join([str(token) for (token, tag) in test_sent])
773 print
774 print 'HMM-tagged:', \
775 ' '.join(['%s/%s' % (str(token), str(tag))
776 for (token, tag) in predicted_sent])
777 print
778 print 'Entropy:', \
779 self.entropy([(token, None) for
780 (token, tag) in predicted_sent])
781 print
782 print '-' * 60
783
784 test_tags = LazyConcatenation(LazyMap(tags, test_sequence))
785 predicted_tags = LazyConcatenation(LazyMap(tags, predicted_sequence))
786
787 acc = _accuracy(test_tags, predicted_tags)
788
789 count = sum([len(sent) for sent in test_sequence])
790
791 print 'accuracy over %d tokens: %.2f' % (count, acc * 100)
792
794 return ('<HiddenMarkovModelTagger %d states and %d output symbols>'
795 % (len(self._states), len(self._symbols)))
796
799 """
800 Algorithms for learning HMM parameters from training data. These include
801 both supervised learning (MLE) and unsupervised learning (Baum-Welch).
802 """
803 - def __init__(self, states=None, symbols=None):
804 """
805 Creates an HMM trainer to induce an HMM with the given states and
806 output symbol alphabet. A supervised and unsupervised training
807 method may be used. If either of the states or symbols are not given,
808 these may be derived from supervised training.
809
810 @param states: the set of state labels
811 @type states: sequence of any
812 @param symbols: the set of observation symbols
813 @type symbols: sequence of any
814 """
815 if states:
816 self._states = states
817 else:
818 self._states = []
819 if symbols:
820 self._symbols = symbols
821 else:
822 self._symbols = []
823
824 - def train(self, labelled_sequences=None, unlabeled_sequences=None,
825 **kwargs):
826 """
827 Trains the HMM using both (or either of) supervised and unsupervised
828 techniques.
829
830 @return: the trained model
831 @rtype: HiddenMarkovModelTagger
832 @param labelled_sequences: the supervised training data, a set of
833 labelled sequences of observations
834 @type labelled_sequences: list
835 @param unlabeled_sequences: the unsupervised training data, a set of
836 sequences of observations
837 @type unlabeled_sequences: list
838 @param kwargs: additional arguments to pass to the training methods
839 """
840 assert labelled_sequences or unlabeled_sequences
841 model = None
842 if labelled_sequences:
843 model = self.train_supervised(labelled_sequences, **kwargs)
844 if unlabeled_sequences:
845 if model: kwargs['model'] = model
846 model = self.train_unsupervised(unlabeled_sequences, **kwargs)
847 return model
848
850 """
851 Trains the HMM using the Baum-Welch algorithm to maximise the
852 probability of the data sequence. This is a variant of the EM
853 algorithm, and is unsupervised in that it doesn't need the state
854 sequences for the symbols. The code is based on 'A Tutorial on Hidden
855 Markov Models and Selected Applications in Speech Recognition',
856 Lawrence Rabiner, IEEE, 1989.
857
858 @return: the trained model
859 @rtype: HiddenMarkovModelTagger
860 @param unlabeled_sequences: the training data, a set of
861 sequences of observations
862 @type unlabeled_sequences: list
863 @param kwargs: may include the following parameters::
864 model - a HiddenMarkovModelTagger instance used to begin
865 the Baum-Welch algorithm
866 max_iterations - the maximum number of EM iterations
867 convergence_logprob - the maximum change in log probability to
868 allow convergence
869 """
870
871 N = len(self._states)
872 M = len(self._symbols)
873 symbol_dict = dict((self._symbols[i], i) for i in range(M))
874
875
876
877 model = kwargs.get('model')
878 if not model:
879 priors = UniformProbDist(self._states)
880 transitions = DictionaryConditionalProbDist(
881 dict((state, UniformProbDist(self._states))
882 for state in self._states))
883 output = DictionaryConditionalProbDist(
884 dict((state, UniformProbDist(self._symbols))
885 for state in self._states))
886 model = HiddenMarkovModelTagger(self._symbols, self._states,
887 transitions, output, priors)
888
889
890 model._priors = MutableProbDist(model._priors, self._states)
891 model._transitions = DictionaryConditionalProbDist(
892 dict((s, MutableProbDist(model._transitions[s], self._states))
893 for s in self._states))
894 model._outputs = DictionaryConditionalProbDist(
895 dict((s, MutableProbDist(model._outputs[s], self._symbols))
896 for s in self._states))
897
898
899 converged = False
900 last_logprob = None
901 iteration = 0
902 max_iterations = kwargs.get('max_iterations', 1000)
903 epsilon = kwargs.get('convergence_logprob', 1e-6)
904 while not converged and iteration < max_iterations:
905 A_numer = ones((N, N), float64) * _NINF
906 B_numer = ones((N, M), float64) * _NINF
907 A_denom = ones(N, float64) * _NINF
908 B_denom = ones(N, float64) * _NINF
909
910 logprob = 0
911 for sequence in unlabeled_sequences:
912 sequence = list(sequence)
913 if not sequence:
914 continue
915
916
917 alpha = model._forward_probability(sequence)
918 beta = model._backward_probability(sequence)
919
920
921 T = len(sequence)
922 lpk = _log_add(*alpha[T-1, :])
923 logprob += lpk
924
925
926
927
928 local_A_numer = ones((N, N), float64) * _NINF
929 local_B_numer = ones((N, M), float64) * _NINF
930 local_A_denom = ones(N, float64) * _NINF
931 local_B_denom = ones(N, float64) * _NINF
932
933
934 for t in range(T):
935 x = sequence[t][_TEXT]
936 if t < T - 1:
937 xnext = sequence[t+1][_TEXT]
938 xi = symbol_dict[x]
939 for i in range(N):
940 si = self._states[i]
941 if t < T - 1:
942 for j in range(N):
943 sj = self._states[j]
944 local_A_numer[i, j] = \
945 _log_add(local_A_numer[i, j],
946 alpha[t, i] +
947 model._transitions[si].logprob(sj) +
948 model._outputs[sj].logprob(xnext) +
949 beta[t+1, j])
950 local_A_denom[i] = _log_add(local_A_denom[i],
951 alpha[t, i] + beta[t, i])
952 else:
953 local_B_denom[i] = _log_add(local_A_denom[i],
954 alpha[t, i] + beta[t, i])
955
956 local_B_numer[i, xi] = _log_add(local_B_numer[i, xi],
957 alpha[t, i] + beta[t, i])
958
959
960 for i in range(N):
961 for j in range(N):
962 A_numer[i, j] = _log_add(A_numer[i, j],
963 local_A_numer[i, j] - lpk)
964 for k in range(M):
965 B_numer[i, k] = _log_add(B_numer[i, k],
966 local_B_numer[i, k] - lpk)
967
968 A_denom[i] = _log_add(A_denom[i], local_A_denom[i] - lpk)
969 B_denom[i] = _log_add(B_denom[i], local_B_denom[i] - lpk)
970
971
972
973 for i in range(N):
974 si = self._states[i]
975 for j in range(N):
976 sj = self._states[j]
977 model._transitions[si].update(sj, A_numer[i,j] -
978 A_denom[i])
979 for k in range(M):
980 ok = self._symbols[k]
981 model._outputs[si].update(ok, B_numer[i,k] - B_denom[i])
982
983
984
985
986 if iteration > 0 and abs(logprob - last_logprob) < epsilon:
987 converged = True
988
989 print 'iteration', iteration, 'logprob', logprob
990 iteration += 1
991 last_logprob = logprob
992
993 return model
994
996 """
997 Supervised training maximising the joint probability of the symbol and
998 state sequences. This is done via collecting frequencies of
999 transitions between states, symbol observations while within each
1000 state and which states start a sentence. These frequency distributions
1001 are then normalised into probability estimates, which can be
1002 smoothed if desired.
1003
1004 @return: the trained model
1005 @rtype: HiddenMarkovModelTagger
1006 @param labelled_sequences: the training data, a set of
1007 labelled sequences of observations
1008 @type labelled_sequences: list
1009 @param kwargs: may include an 'estimator' parameter, a function taking
1010 a C{FreqDist} and a number of bins and returning a C{ProbDistI};
1011 otherwise a MLE estimate is used
1012 """
1013
1014
1015 estimator = kwargs.get('estimator')
1016 if estimator == None:
1017 estimator = lambda fdist, bins: MLEProbDist(fdist)
1018
1019
1020
1021 starting = FreqDist()
1022 transitions = ConditionalFreqDist()
1023 outputs = ConditionalFreqDist()
1024 for sequence in labelled_sequences:
1025 lasts = None
1026 for token in sequence:
1027 state = token[_TAG]
1028 symbol = token[_TEXT]
1029 if lasts == None:
1030 starting.inc(state)
1031 else:
1032 transitions[lasts].inc(state)
1033 outputs[state].inc(symbol)
1034 lasts = state
1035
1036
1037 if state not in self._states:
1038 self._states.append(state)
1039 if symbol not in self._symbols:
1040 self._symbols.append(symbol)
1041
1042
1043 N = len(self._states)
1044 pi = estimator(starting, N)
1045 A = ConditionalProbDist(transitions, estimator, N)
1046 B = ConditionalProbDist(outputs, estimator, len(self._symbols))
1047
1048 return HiddenMarkovModelTagger(self._symbols, self._states, A, B, pi)
1049
1058
1074
1084
1087 """
1088 Adds the logged values, returning the logarithm of the addition.
1089 """
1090 x = max(values)
1091 if x > _NINF:
1092 sum_diffs = 0
1093 for value in values:
1094 sum_diffs += 2**(value - x)
1095 return x + log2(sum_diffs)
1096 else:
1097 return x
1098
1100
1101
1102 print
1103 print "HMM probability calculation demo"
1104 print
1105
1106
1107 symbols = ['up', 'down', 'unchanged']
1108 states = ['bull', 'bear', 'static']
1109
1110 def pd(values, samples):
1111 d = {}
1112 for value, item in zip(values, samples):
1113 d[item] = value
1114 return DictionaryProbDist(d)
1115
1116 def cpd(array, conditions, samples):
1117 d = {}
1118 for values, condition in zip(array, conditions):
1119 d[condition] = pd(values, samples)
1120 return DictionaryConditionalProbDist(d)
1121
1122 A = array([[0.6, 0.2, 0.2], [0.5, 0.3, 0.2], [0.4, 0.1, 0.5]], float64)
1123 A = cpd(A, states, states)
1124 B = array([[0.7, 0.1, 0.2], [0.1, 0.6, 0.3], [0.3, 0.3, 0.4]], float64)
1125 B = cpd(B, states, symbols)
1126 pi = array([0.5, 0.2, 0.3], float64)
1127 pi = pd(pi, states)
1128
1129 model = HiddenMarkovModelTagger(symbols=symbols, states=states,
1130 transitions=A, outputs=B, priors=pi)
1131
1132 print 'Testing', model
1133
1134 for test in [['up', 'up'], ['up', 'down', 'up'],
1135 ['down'] * 5, ['unchanged'] * 5 + ['up']]:
1136
1137 sequence = [(t, None) for t in test]
1138
1139 print 'Testing with state sequence', test
1140 print 'probability =', model.probability(sequence)
1141 print 'tagging = ', model.tag([word for (word,tag) in sequence])
1142 print 'p(tagged) = ', model.probability(sequence)
1143 print 'H = ', model.entropy(sequence)
1144 print 'H_exh = ', model._exhaustive_entropy(sequence)
1145 print 'H(point) = ', model.point_entropy(sequence)
1146 print 'H_exh(point)=', model._exhaustive_point_entropy(sequence)
1147 print
1148
1171
1187
1193
1195
1196
1197 print
1198 print "Baum-Welch demo for POS tagging"
1199 print
1200
1201 print 'Training HMM (supervised)...'
1202 sentences, tag_set, symbols = load_pos(210)
1203 symbols = set()
1204 for sentence in sentences:
1205 for token in sentence:
1206 symbols.add(token[_TEXT])
1207
1208 trainer = HiddenMarkovModelTrainer(tag_set, list(symbols))
1209 hmm = trainer.train_supervised(sentences[10:200],
1210 estimator=lambda fd, bins: LidstoneProbDist(fd, 0.1, bins))
1211 print 'Training (unsupervised)...'
1212
1213 unlabeled = _untag(sentences[200:210])
1214 hmm = trainer.train_unsupervised(unlabeled, model=hmm, max_iterations=5)
1215 hmm.test(sentences[:10], verbose=True)
1216
1218
1219
1220
1221 print
1222 print "Baum-Welch demo for market example"
1223 print
1224
1225
1226 symbols = ['up', 'down', 'unchanged']
1227 states = ['bull', 'bear', 'static']
1228
1229 def pd(values, samples):
1230 d = {}
1231 for value, item in zip(values, samples):
1232 d[item] = value
1233 return DictionaryProbDist(d)
1234
1235 def cpd(array, conditions, samples):
1236 d = {}
1237 for values, condition in zip(array, conditions):
1238 d[condition] = pd(values, samples)
1239 return DictionaryConditionalProbDist(d)
1240
1241 A = array([[0.6, 0.2, 0.2], [0.5, 0.3, 0.2], [0.4, 0.1, 0.5]], float64)
1242 A = cpd(A, states, states)
1243 B = array([[0.7, 0.1, 0.2], [0.1, 0.6, 0.3], [0.3, 0.3, 0.4]], float64)
1244 B = cpd(B, states, symbols)
1245 pi = array([0.5, 0.2, 0.3], float64)
1246 pi = pd(pi, states)
1247
1248 model = HiddenMarkovModelTagger(symbols=symbols, states=states,
1249 transitions=A, outputs=B, priors=pi)
1250
1251
1252 training = []
1253 import random
1254 rng = random.Random()
1255 for i in range(10):
1256 item = model.random_sample(rng, 5)
1257 training.append((i[0], None) for i in item)
1258
1259
1260 trainer = HiddenMarkovModelTrainer(states, symbols)
1261 hmm = trainer.train_unsupervised(training, model=model,
1262 max_iterations=1000)
1263
1264 if __name__ == '__main__':
1265