Package nltk :: Package tag :: Module hmm
[hide private]
[frames] | no frames]

Source Code for Module nltk.tag.hmm

   1  # Natural Language Toolkit: Hidden Markov Model 
   2  # 
   3  # Copyright (C) 2001-2011 NLTK Project 
   4  # Author: Trevor Cohn <tacohn@csse.unimelb.edu.au> 
   5  #         Philip Blunsom <pcbl@csse.unimelb.edu.au> 
   6  #         Tiago Tresoldi <tiago@tresoldi.pro.br> (fixes) 
   7  #         Steven Bird <sb@csse.unimelb.edu.au> (fixes) 
   8  #         Joseph Frazee <jfrazee@mail.utexas.edu> (fixes) 
   9  # URL: <http://www.nltk.org/> 
  10  # For license information, see LICENSE.TXT 
  11  # 
  12  # $Id: hmm.py 8777 2011-04-10 07:13:40Z StevenBird1 $ 
  13   
  14  """ 
  15  Hidden Markov Models (HMMs) largely used to assign the correct label sequence 
  16  to sequential data or assess the probability of a given label and data 
  17  sequence. These models are finite state machines characterised by a number of 
  18  states, transitions between these states, and output symbols emitted while in 
  19  each state. The HMM is an extension to the Markov chain, where each state 
  20  corresponds deterministically to a given event. In the HMM the observation is 
  21  a probabilistic function of the state. HMMs share the Markov chain's 
  22  assumption, being that the probability of transition from one state to another 
  23  only depends on the current state - i.e. the series of states that led to the 
  24  current state are not used. They are also time invariant. 
  25   
  26  The HMM is a directed graph, with probability weighted edges (representing the 
  27  probability of a transition between the source and sink states) where each 
  28  vertex emits an output symbol when entered. The symbol (or observation) is 
  29  non-deterministically generated. For this reason, knowing that a sequence of 
  30  output observations was generated by a given HMM does not mean that the 
  31  corresponding sequence of states (and what the current state is) is known. 
  32  This is the 'hidden' in the hidden markov model. 
  33   
  34  Formally, a HMM can be characterised by: 
  35      - the output observation alphabet. This is the set of symbols which may be 
  36        observed as output of the system.  
  37      - the set of states.  
  38      - the transition probabilities M{a_{ij} = P(s_t = j | s_{t-1} = i)}. These 
  39        represent the probability of transition to each state from a given 
  40        state.  
  41      - the output probability matrix M{b_i(k) = P(X_t = o_k | s_t = i)}. These 
  42        represent the probability of observing each symbol in a given state. 
  43      - the initial state distribution. This gives the probability of starting 
  44        in each state. 
  45   
  46  To ground this discussion, take a common NLP application, part-of-speech (POS) 
  47  tagging. An HMM is desirable for this task as the highest probability tag 
  48  sequence can be calculated for a given sequence of word forms. This differs 
  49  from other tagging techniques which often tag each word individually, seeking 
  50  to optimise each individual tagging greedily without regard to the optimal 
  51  combination of tags for a larger unit, such as a sentence. The HMM does this 
  52  with the Viterbi algorithm, which efficiently computes the optimal path 
  53  through the graph given the sequence of words forms. 
  54   
  55  In POS tagging the states usually have a 1:1 correspondence with the tag 
  56  alphabet - i.e. each state represents a single tag. The output observation 
  57  alphabet is the set of word forms (the lexicon), and the remaining three 
  58  parameters are derived by a training regime. With this information the 
  59  probability of a given sentence can be easily derived, by simply summing the 
  60  probability of each distinct path through the model. Similarly, the highest 
  61  probability tagging sequence can be derived with the Viterbi algorithm, 
  62  yielding a state sequence which can be mapped into a tag sequence. 
  63   
  64  This discussion assumes that the HMM has been trained. This is probably the 
  65  most difficult task with the model, and requires either MLE estimates of the 
  66  parameters or unsupervised learning using the Baum-Welch algorithm, a variant 
  67  of EM. 
  68  """ 
  69   
  70  import re 
  71  import types 
  72   
  73  from numpy import * 
  74  from nltk.probability import FreqDist, ConditionalFreqDist, \ 
  75       ConditionalProbDist, DictionaryProbDist, DictionaryConditionalProbDist, \ 
  76       LidstoneProbDist, MutableProbDist, MLEProbDist 
  77  from nltk.metrics import accuracy as _accuracy 
  78  from nltk.util import LazyMap, LazyConcatenation, LazyZip 
  79   
  80  from api import * 
  81   
  82  # _NINF = float('-inf')  # won't work on Windows 
  83  _NINF = float('-1e300') 
  84   
  85  _TEXT = 0  # index of text in a tuple 
  86  _TAG = 1   # index of tag in a tuple 
87 88 -class HiddenMarkovModelTagger(TaggerI):
89 """ 90 Hidden Markov model class, a generative model for labelling sequence data. 91 These models define the joint probability of a sequence of symbols and 92 their labels (state transitions) as the product of the starting state 93 probability, the probability of each state transition, and the probability 94 of each observation being generated from each state. This is described in 95 more detail in the module documentation. 96 97 This implementation is based on the HMM description in Chapter 8, Huang, 98 Acero and Hon, Spoken Language Processing and includes an extension for 99 training shallow HMM parsers or specializaed HMMs as in Molina et. 100 al, 2002. A specialized HMM modifies training data by applying a 101 specialization function to create a new training set that is more 102 appropriate for sequential tagging with an HMM. A typical use case is 103 chunking. 104 """
105 - def __init__(self, symbols, states, transitions, outputs, priors, **kwargs):
106 """ 107 Creates a hidden markov model parametised by the the states, 108 transition probabilities, output probabilities and priors. 109 110 @param symbols: the set of output symbols (alphabet) 111 @type symbols: seq of any 112 @param states: a set of states representing state space 113 @type states: seq of any 114 @param transitions: transition probabilities; Pr(s_i | s_j) is the 115 probability of transition from state i given the model is in 116 state_j 117 @type transitions: C{ConditionalProbDistI} 118 @param outputs: output probabilities; Pr(o_k | s_i) is the probability 119 of emitting symbol k when entering state i 120 @type outputs: C{ConditionalProbDistI} 121 @param priors: initial state distribution; Pr(s_i) is the probability 122 of starting in state i 123 @type priors: C{ProbDistI} 124 @kwparam transform: an optional function for transforming training 125 instances, defaults to the identity function. 126 @type transform: C{function} or C{HiddenMarkovModelTaggerTransform} 127 """ 128 self._states = states 129 self._transitions = transitions 130 self._symbols = symbols 131 self._outputs = outputs 132 self._priors = priors 133 self._cache = None 134 135 self._transform = kwargs.get('transform', IdentityTransform()) 136 if isinstance(self._transform, types.FunctionType): 137 self._transform = LambdaTransform(self._transform) 138 elif not isinstance(self._transform, 139 HiddenMarkovModelTaggerTransformI): 140 raise
141 142 @classmethod
143 - def _train(cls, labeled_sequence, test_sequence=None, 144 unlabeled_sequence=None, **kwargs):
145 transform = kwargs.get('transform', IdentityTransform()) 146 if isinstance(transform, types.FunctionType): 147 transform = LambdaTransform(transform) 148 elif \ 149 not isinstance(transform, HiddenMarkovModelTaggerTransformI): 150 raise 151 152 estimator = kwargs.get('estimator', lambda fd, bins: \ 153 LidstoneProbDist(fd, 0.1, bins)) 154 155 labeled_sequence = LazyMap(transform.transform, labeled_sequence) 156 symbols = list(set(word for sent in labeled_sequence 157 for word, tag in sent)) 158 tag_set = list(set(tag for sent in labeled_sequence 159 for word, tag in sent)) 160 161 trainer = HiddenMarkovModelTrainer(tag_set, symbols) 162 hmm = trainer.train_supervised(labeled_sequence, estimator=estimator) 163 hmm = cls(hmm._symbols, hmm._states, hmm._transitions, hmm._outputs, 164 hmm._priors, transform=transform) 165 166 if test_sequence: 167 hmm.test(test_sequence, verbose=kwargs.get('verbose', False)) 168 169 if unlabeled_sequence: 170 max_iterations = kwargs.get('max_iterations', 5) 171 hmm = trainer.train_unsupervised(unlabeled_sequence, model=hmm, 172 max_iterations=max_iterations) 173 if test_sequence: 174 hmm.test(test_sequence, verbose=kwargs.get('verbose', False)) 175 176 return hmm
177 178 @classmethod
179 - def train(cls, labeled_sequence, test_sequence=None, 180 unlabeled_sequence=None, **kwargs):
181 """ 182 Train a new C{HiddenMarkovModelTagger} using the given labeled and 183 unlabeled training instances. Testing will be performed if test 184 instances are provided. 185 186 @return: a hidden markov model tagger 187 @rtype: C{HiddenMarkovModelTagger} 188 @param labeled_sequence: a sequence of labeled training instances, 189 i.e. a list of sentences represented as tuples 190 @type labeled_sequence: C{list} of C{list} 191 @param test_sequence: a sequence of labeled test instances 192 @type test_sequence: C{list} of C{list} 193 @param unlabeled_sequence: a sequence of unlabeled training instances, 194 i.e. a list of sentences represented as words 195 @type unlabeled_sequence: C{list} of C{list} 196 @kwparam transform: an optional function for transforming training 197 instances, defaults to the identity function, see L{transform()} 198 @type transform: C{function} 199 @kwparam estimator: an optional function or class that maps a 200 condition's frequency distribution to its probability 201 distribution, defaults to a Lidstone distribution with gamma = 0.1 202 @type estimator: C{class} or C{function} 203 @kwparam verbose: boolean flag indicating whether training should be 204 verbose or include printed output 205 @type verbose: C{bool} 206 @kwparam max_iterations: number of Baum-Welch interations to perform 207 @type max_iterations: C{int} 208 """ 209 return cls._train(labeled_sequence, test_sequence, 210 unlabeled_sequence, **kwargs)
211
212 - def probability(self, sequence):
213 """ 214 Returns the probability of the given symbol sequence. If the sequence 215 is labelled, then returns the joint probability of the symbol, state 216 sequence. Otherwise, uses the forward algorithm to find the 217 probability over all label sequences. 218 219 @return: the probability of the sequence 220 @rtype: float 221 @param sequence: the sequence of symbols which must contain the TEXT 222 property, and optionally the TAG property 223 @type sequence: Token 224 """ 225 return 2**(self.log_probability(self._transform.transform(sequence)))
226
227 - def log_probability(self, sequence):
228 """ 229 Returns the log-probability of the given symbol sequence. If the 230 sequence is labelled, then returns the joint log-probability of the 231 symbol, state sequence. Otherwise, uses the forward algorithm to find 232 the log-probability over all label sequences. 233 234 @return: the log-probability of the sequence 235 @rtype: float 236 @param sequence: the sequence of symbols which must contain the TEXT 237 property, and optionally the TAG property 238 @type sequence: Token 239 """ 240 sequence = self._transform.transform(sequence) 241 242 T = len(sequence) 243 N = len(self._states) 244 245 if T > 0 and sequence[0][_TAG]: 246 last_state = sequence[0][_TAG] 247 p = self._priors.logprob(last_state) + \ 248 self._outputs[last_state].logprob(sequence[0][_TEXT]) 249 for t in range(1, T): 250 state = sequence[t][_TAG] 251 p += self._transitions[last_state].logprob(state) + \ 252 self._outputs[state].logprob(sequence[t][_TEXT]) 253 last_state = state 254 return p 255 else: 256 alpha = self._forward_probability(sequence) 257 p = _log_add(*alpha[T-1, :]) 258 return p
259
260 - def tag(self, unlabeled_sequence):
261 """ 262 Tags the sequence with the highest probability state sequence. This 263 uses the best_path method to find the Viterbi path. 264 265 @return: a labelled sequence of symbols 266 @rtype: list 267 @param unlabeled_sequence: the sequence of unlabeled symbols 268 @type unlabeled_sequence: list 269 """ 270 unlabeled_sequence = self._transform.transform(unlabeled_sequence) 271 return self._tag(unlabeled_sequence)
272
273 - def _tag(self, unlabeled_sequence):
274 path = self._best_path(unlabeled_sequence) 275 return zip(unlabeled_sequence, path)
276
277 - def _output_logprob(self, state, symbol):
278 """ 279 @return: the log probability of the symbol being observed in the given 280 state 281 @rtype: float 282 """ 283 return self._outputs[state].logprob(symbol)
284
285 - def _create_cache(self):
286 """ 287 The cache is a tuple (P, O, X, S) where: 288 289 - S maps symbols to integers. I.e., it is the inverse 290 mapping from self._symbols; for each symbol s in 291 self._symbols, the following is true:: 292 293 self._symbols[S[s]] == s 294 295 - O is the log output probabilities:: 296 297 O[i,k] = log( P(token[t]=sym[k]|tag[t]=state[i]) ) 298 299 - X is the log transition probabilities:: 300 301 X[i,j] = log( P(tag[t]=state[j]|tag[t-1]=state[i]) ) 302 303 - P is the log prior probabilities:: 304 305 P[i] = log( P(tag[0]=state[i]) ) 306 """ 307 if not self._cache: 308 N = len(self._states) 309 M = len(self._symbols) 310 P = zeros(N, float32) 311 X = zeros((N, N), float32) 312 O = zeros((N, M), float32) 313 for i in range(N): 314 si = self._states[i] 315 P[i] = self._priors.logprob(si) 316 for j in range(N): 317 X[i, j] = self._transitions[si].logprob(self._states[j]) 318 for k in range(M): 319 O[i, k] = self._outputs[si].logprob(self._symbols[k]) 320 S = {} 321 for k in range(M): 322 S[self._symbols[k]] = k 323 self._cache = (P, O, X, S)
324
325 - def _update_cache(self, symbols):
326 # add new symbols to the symbol table and repopulate the output 327 # probabilities and symbol table mapping 328 if symbols: 329 self._create_cache() 330 P, O, X, S = self._cache 331 for symbol in symbols: 332 if symbol not in self._symbols: 333 self._cache = None 334 self._symbols.append(symbol) 335 # don't bother with the work if there aren't any new symbols 336 if not self._cache: 337 N = len(self._states) 338 M = len(self._symbols) 339 Q = O.shape[1] 340 # add new columns to the output probability table without 341 # destroying the old probabilities 342 O = hstack([O, zeros((N, M - Q), float32)]) 343 for i in range(N): 344 si = self._states[i] 345 # only calculate probabilities for new symbols 346 for k in range(Q, M): 347 O[i, k] = self._outputs[si].logprob(self._symbols[k]) 348 # only create symbol mappings for new symbols 349 for k in range(Q, M): 350 S[self._symbols[k]] = k 351 self._cache = (P, O, X, S)
352
353 - def best_path(self, unlabeled_sequence):
354 """ 355 Returns the state sequence of the optimal (most probable) path through 356 the HMM. Uses the Viterbi algorithm to calculate this part by dynamic 357 programming. 358 359 @return: the state sequence 360 @rtype: sequence of any 361 @param unlabeled_sequence: the sequence of unlabeled symbols 362 @type unlabeled_sequence: list 363 """ 364 unlabeled_sequence = self._transform.transform(unlabeled_sequence) 365 return self._best_path(unlabeled_sequence)
366
367 - def _best_path(self, unlabeled_sequence):
368 T = len(unlabeled_sequence) 369 N = len(self._states) 370 self._create_cache() 371 self._update_cache(unlabeled_sequence) 372 P, O, X, S = self._cache 373 374 V = zeros((T, N), float32) 375 B = ones((T, N), int) * -1 376 377 V[0] = P + O[:, S[unlabeled_sequence[0]]] 378 for t in range(1, T): 379 for j in range(N): 380 vs = V[t-1, :] + X[:, j] 381 best = argmax(vs) 382 V[t, j] = vs[best] + O[j, S[unlabeled_sequence[t]]] 383 B[t, j] = best 384 385 current = argmax(V[T-1,:]) 386 sequence = [current] 387 for t in range(T-1, 0, -1): 388 last = B[t, current] 389 sequence.append(last) 390 current = last 391 392 sequence.reverse() 393 return map(self._states.__getitem__, sequence)
394
395 - def best_path_simple(self, unlabeled_sequence):
396 """ 397 Returns the state sequence of the optimal (most probable) path through 398 the HMM. Uses the Viterbi algorithm to calculate this part by dynamic 399 programming. This uses a simple, direct method, and is included for 400 teaching purposes. 401 402 @return: the state sequence 403 @rtype: sequence of any 404 @param unlabeled_sequence: the sequence of unlabeled symbols 405 @type unlabeled_sequence: list 406 """ 407 unlabeled_sequence = self._transform.transform(unlabeled_sequence) 408 return self._best_path_simple(unlabeled_sequence)
409
410 - def _best_path_simple(self, unlabeled_sequence):
411 T = len(unlabeled_sequence) 412 N = len(self._states) 413 V = zeros((T, N), float64) 414 B = {} 415 416 # find the starting log probabilities for each state 417 symbol = unlabeled_sequence[0] 418 for i, state in enumerate(self._states): 419 V[0, i] = self._priors.logprob(state) + \ 420 self._output_logprob(state, symbol) 421 B[0, state] = None 422 423 # find the maximum log probabilities for reaching each state at time t 424 for t in range(1, T): 425 symbol = unlabeled_sequence[t] 426 for j in range(N): 427 sj = self._states[j] 428 best = None 429 for i in range(N): 430 si = self._states[i] 431 va = V[t-1, i] + self._transitions[si].logprob(sj) 432 if not best or va > best[0]: 433 best = (va, si) 434 V[t, j] = best[0] + self._output_logprob(sj, symbol) 435 B[t, sj] = best[1] 436 437 # find the highest probability final state 438 best = None 439 for i in range(N): 440 val = V[T-1, i] 441 if not best or val > best[0]: 442 best = (val, self._states[i]) 443 444 # traverse the back-pointers B to find the state sequence 445 current = best[1] 446 sequence = [current] 447 for t in range(T-1, 0, -1): 448 last = B[t, current] 449 sequence.append(last) 450 current = last 451 452 sequence.reverse() 453 return sequence
454
455 - def random_sample(self, rng, length):
456 """ 457 Randomly sample the HMM to generate a sentence of a given length. This 458 samples the prior distribution then the observation distribution and 459 transition distribution for each subsequent observation and state. 460 This will mostly generate unintelligible garbage, but can provide some 461 amusement. 462 463 @return: the randomly created state/observation sequence, 464 generated according to the HMM's probability 465 distributions. The SUBTOKENS have TEXT and TAG 466 properties containing the observation and state 467 respectively. 468 @rtype: list 469 @param rng: random number generator 470 @type rng: Random (or any object with a random() method) 471 @param length: desired output length 472 @type length: int 473 """ 474 475 # sample the starting state and symbol prob dists 476 tokens = [] 477 state = self._sample_probdist(self._priors, rng.random(), self._states) 478 symbol = self._sample_probdist(self._outputs[state], 479 rng.random(), self._symbols) 480 tokens.append((symbol, state)) 481 482 for i in range(1, length): 483 # sample the state transition and symbol prob dists 484 state = self._sample_probdist(self._transitions[state], 485 rng.random(), self._states) 486 symbol = self._sample_probdist(self._outputs[state], 487 rng.random(), self._symbols) 488 tokens.append((symbol, state)) 489 490 return tokens
491
492 - def _sample_probdist(self, probdist, p, samples):
493 cum_p = 0 494 for sample in samples: 495 add_p = probdist.prob(sample) 496 if cum_p <= p <= cum_p + add_p: 497 return sample 498 cum_p += add_p 499 raise Exception('Invalid probability distribution - ' 500 'does not sum to one')
501
502 - def entropy(self, unlabeled_sequence):
503 """ 504 Returns the entropy over labellings of the given sequence. This is 505 given by:: 506 507 H(O) = - sum_S Pr(S | O) log Pr(S | O) 508 509 where the summation ranges over all state sequences, S. Let M{Z = 510 Pr(O) = sum_S Pr(S, O)} where the summation ranges over all state 511 sequences and O is the observation sequence. As such the entropy can 512 be re-expressed as:: 513 514 H = - sum_S Pr(S | O) log [ Pr(S, O) / Z ] 515 = log Z - sum_S Pr(S | O) log Pr(S, 0) 516 = log Z - sum_S Pr(S | O) [ log Pr(S_0) + sum_t Pr(S_t | S_{t-1}) 517 + sum_t Pr(O_t | S_t) ] 518 519 The order of summation for the log terms can be flipped, allowing 520 dynamic programming to be used to calculate the entropy. Specifically, 521 we use the forward and backward probabilities (alpha, beta) giving:: 522 523 H = log Z - sum_s0 alpha_0(s0) beta_0(s0) / Z * log Pr(s0) 524 + sum_t,si,sj alpha_t(si) Pr(sj | si) Pr(O_t+1 | sj) beta_t(sj) 525 / Z * log Pr(sj | si) 526 + sum_t,st alpha_t(st) beta_t(st) / Z * log Pr(O_t | st) 527 528 This simply uses alpha and beta to find the probabilities of partial 529 sequences, constrained to include the given state(s) at some point in 530 time. 531 """ 532 unlabeled_sequence = self._transform.transform(unlabeled_sequence) 533 534 T = len(unlabeled_sequence) 535 N = len(self._states) 536 537 alpha = self._forward_probability(unlabeled_sequence) 538 beta = self._backward_probability(unlabeled_sequence) 539 normalisation = _log_add(*alpha[T-1, :]) 540 541 entropy = normalisation 542 543 # starting state, t = 0 544 for i, state in enumerate(self._states): 545 p = 2**(alpha[0, i] + beta[0, i] - normalisation) 546 entropy -= p * self._priors.logprob(state) 547 #print 'p(s_0 = %s) =' % state, p 548 549 # state transitions 550 for t0 in range(T - 1): 551 t1 = t0 + 1 552 for i0, s0 in enumerate(self._states): 553 for i1, s1 in enumerate(self._states): 554 p = 2**(alpha[t0, i0] + self._transitions[s0].logprob(s1) + 555 self._outputs[s1].logprob( 556 unlabeled_sequence[t1][_TEXT]) + 557 beta[t1, i1] - normalisation) 558 entropy -= p * self._transitions[s0].logprob(s1) 559 #print 'p(s_%d = %s, s_%d = %s) =' % (t0, s0, t1, s1), p 560 561 # symbol emissions 562 for t in range(T): 563 for i, state in enumerate(self._states): 564 p = 2**(alpha[t, i] + beta[t, i] - normalisation) 565 entropy -= p * self._outputs[state].logprob( 566 unlabeled_sequence[t][_TEXT]) 567 #print 'p(s_%d = %s) =' % (t, state), p 568 569 return entropy
570
571 - def point_entropy(self, unlabeled_sequence):
572 """ 573 Returns the pointwise entropy over the possible states at each 574 position in the chain, given the observation sequence. 575 """ 576 unlabeled_sequence = self._transform.transform(unlabeled_sequence) 577 578 T = len(unlabeled_sequence) 579 N = len(self._states) 580 581 alpha = self._forward_probability(unlabeled_sequence) 582 beta = self._backward_probability(unlabeled_sequence) 583 normalisation = _log_add(*alpha[T-1, :]) 584 585 entropies = zeros(T, float64) 586 probs = zeros(N, float64) 587 for t in range(T): 588 for s in range(N): 589 probs[s] = alpha[t, s] + beta[t, s] - normalisation 590 591 for s in range(N): 592 entropies[t] -= 2**(probs[s]) * probs[s] 593 594 return entropies
595
596 - def _exhaustive_entropy(self, unlabeled_sequence):
597 unlabeled_sequence = self._transform.transform(unlabeled_sequence) 598 599 T = len(unlabeled_sequence) 600 N = len(self._states) 601 602 labellings = [[state] for state in self._states] 603 for t in range(T - 1): 604 current = labellings 605 labellings = [] 606 for labelling in current: 607 for state in self._states: 608 labellings.append(labelling + [state]) 609 610 log_probs = [] 611 for labelling in labellings: 612 labelled_sequence = unlabeled_sequence[:] 613 for t, label in enumerate(labelling): 614 labelled_sequence[t] = (labelled_sequence[t][_TEXT], label) 615 lp = self.log_probability(labelled_sequence) 616 log_probs.append(lp) 617 normalisation = _log_add(*log_probs) 618 619 #ps = zeros((T, N), float64) 620 #for labelling, lp in zip(labellings, log_probs): 621 #for t in range(T): 622 #ps[t, self._states.index(labelling[t])] += \ 623 # 2**(lp - normalisation) 624 625 #for t in range(T): 626 #print 'prob[%d] =' % t, ps[t] 627 628 entropy = 0 629 for lp in log_probs: 630 lp -= normalisation 631 entropy -= 2**(lp) * lp 632 633 return entropy
634
635 - def _exhaustive_point_entropy(self, unlabeled_sequence):
636 unlabeled_sequence = self._transform.transform(unlabeled_sequence) 637 638 T = len(unlabeled_sequence) 639 N = len(self._states) 640 641 labellings = [[state] for state in self._states] 642 for t in range(T - 1): 643 current = labellings 644 labellings = [] 645 for labelling in current: 646 for state in self._states: 647 labellings.append(labelling + [state]) 648 649 log_probs = [] 650 for labelling in labellings: 651 labelled_sequence = unlabeled_sequence[:] 652 for t, label in enumerate(labelling): 653 labelled_sequence[t] = (labelled_sequence[t][_TEXT], label) 654 lp = self.log_probability(labelled_sequence) 655 log_probs.append(lp) 656 657 normalisation = _log_add(*log_probs) 658 659 probabilities = zeros((T, N), float64) 660 probabilities[:] = _NINF 661 for labelling, lp in zip(labellings, log_probs): 662 lp -= normalisation 663 for t, label in enumerate(labelling): 664 index = self._states.index(label) 665 probabilities[t, index] = _log_add(probabilities[t, index], lp) 666 667 entropies = zeros(T, float64) 668 for t in range(T): 669 for s in range(N): 670 entropies[t] -= 2**(probabilities[t, s]) * probabilities[t, s] 671 672 return entropies
673
674 - def _forward_probability(self, unlabeled_sequence):
675 """ 676 Return the forward probability matrix, a T by N array of 677 log-probabilities, where T is the length of the sequence and N is the 678 number of states. Each entry (t, s) gives the probability of being in 679 state s at time t after observing the partial symbol sequence up to 680 and including t. 681 682 @param unlabeled_sequence: the sequence of unlabeled symbols 683 @type unlabeled_sequence: list 684 @return: the forward log probability matrix 685 @rtype: array 686 """ 687 T = len(unlabeled_sequence) 688 N = len(self._states) 689 alpha = zeros((T, N), float64) 690 691 symbol = unlabeled_sequence[0][_TEXT] 692 for i, state in enumerate(self._states): 693 alpha[0, i] = self._priors.logprob(state) + \ 694 self._outputs[state].logprob(symbol) 695 for t in range(1, T): 696 symbol = unlabeled_sequence[t][_TEXT] 697 for i, si in enumerate(self._states): 698 alpha[t, i] = _NINF 699 for j, sj in enumerate(self._states): 700 alpha[t, i] = _log_add(alpha[t, i], alpha[t-1, j] + 701 self._transitions[sj].logprob(si)) 702 alpha[t, i] += self._outputs[si].logprob(symbol) 703 704 return alpha
705
706 - def _backward_probability(self, unlabeled_sequence):
707 """ 708 Return the backward probability matrix, a T by N array of 709 log-probabilities, where T is the length of the sequence and N is the 710 number of states. Each entry (t, s) gives the probability of being in 711 state s at time t after observing the partial symbol sequence from t 712 .. T. 713 714 @return: the backward log probability matrix 715 @rtype: array 716 @param unlabeled_sequence: the sequence of unlabeled symbols 717 @type unlabeled_sequence: list 718 """ 719 T = len(unlabeled_sequence) 720 N = len(self._states) 721 beta = zeros((T, N), float64) 722 723 # initialise the backward values 724 beta[T-1, :] = log2(1) 725 726 # inductively calculate remaining backward values 727 for t in range(T-2, -1, -1): 728 symbol = unlabeled_sequence[t+1][_TEXT] 729 for i, si in enumerate(self._states): 730 beta[t, i] = _NINF 731 for j, sj in enumerate(self._states): 732 beta[t, i] = _log_add(beta[t, i], 733 self._transitions[si].logprob(sj) + 734 self._outputs[sj].logprob(symbol) + 735 beta[t + 1, j]) 736 737 return beta
738
739 - def test(self, test_sequence, **kwargs):
740 """ 741 Tests the C{HiddenMarkovModelTagger} instance. 742 743 @param test_sequence: a sequence of labeled test instances 744 @type test_sequence: C{list} of C{list} 745 @kwparam verbose: boolean flag indicating whether training should be 746 verbose or include printed output 747 @type verbose: C{bool} 748 """ 749 750 def words(sent): 751 return [word for (word, tag) in sent]
752 753 def tags(sent): 754 return [tag for (word, tag) in sent]
755 756 test_sequence = LazyMap(self._transform.transform, test_sequence) 757 predicted_sequence = LazyMap(self._tag, LazyMap(words, test_sequence)) 758 759 if kwargs.get('verbose', False): 760 # This will be used again later for accuracy so there's no sense 761 # in tagging it twice. 762 test_sequence = list(test_sequence) 763 predicted_sequence = list(predicted_sequence) 764 765 for test_sent, predicted_sent in zip(test_sequence, 766 predicted_sequence): 767 print 'Test:', \ 768 ' '.join(['%s/%s' % (str(token), str(tag)) 769 for (token, tag) in test_sent]) 770 print 771 print 'Untagged:', \ 772 ' '.join([str(token) for (token, tag) in test_sent]) 773 print 774 print 'HMM-tagged:', \ 775 ' '.join(['%s/%s' % (str(token), str(tag)) 776 for (token, tag) in predicted_sent]) 777 print 778 print 'Entropy:', \ 779 self.entropy([(token, None) for 780 (token, tag) in predicted_sent]) 781 print 782 print '-' * 60 783 784 test_tags = LazyConcatenation(LazyMap(tags, test_sequence)) 785 predicted_tags = LazyConcatenation(LazyMap(tags, predicted_sequence)) 786 787 acc = _accuracy(test_tags, predicted_tags) 788 789 count = sum([len(sent) for sent in test_sequence]) 790 791 print 'accuracy over %d tokens: %.2f' % (count, acc * 100) 792
793 - def __repr__(self):
794 return ('<HiddenMarkovModelTagger %d states and %d output symbols>' 795 % (len(self._states), len(self._symbols)))
796
797 798 -class HiddenMarkovModelTrainer(object):
799 """ 800 Algorithms for learning HMM parameters from training data. These include 801 both supervised learning (MLE) and unsupervised learning (Baum-Welch). 802 """
803 - def __init__(self, states=None, symbols=None):
804 """ 805 Creates an HMM trainer to induce an HMM with the given states and 806 output symbol alphabet. A supervised and unsupervised training 807 method may be used. If either of the states or symbols are not given, 808 these may be derived from supervised training. 809 810 @param states: the set of state labels 811 @type states: sequence of any 812 @param symbols: the set of observation symbols 813 @type symbols: sequence of any 814 """ 815 if states: 816 self._states = states 817 else: 818 self._states = [] 819 if symbols: 820 self._symbols = symbols 821 else: 822 self._symbols = []
823
824 - def train(self, labelled_sequences=None, unlabeled_sequences=None, 825 **kwargs):
826 """ 827 Trains the HMM using both (or either of) supervised and unsupervised 828 techniques. 829 830 @return: the trained model 831 @rtype: HiddenMarkovModelTagger 832 @param labelled_sequences: the supervised training data, a set of 833 labelled sequences of observations 834 @type labelled_sequences: list 835 @param unlabeled_sequences: the unsupervised training data, a set of 836 sequences of observations 837 @type unlabeled_sequences: list 838 @param kwargs: additional arguments to pass to the training methods 839 """ 840 assert labelled_sequences or unlabeled_sequences 841 model = None 842 if labelled_sequences: 843 model = self.train_supervised(labelled_sequences, **kwargs) 844 if unlabeled_sequences: 845 if model: kwargs['model'] = model 846 model = self.train_unsupervised(unlabeled_sequences, **kwargs) 847 return model
848
849 - def train_unsupervised(self, unlabeled_sequences, **kwargs):
850 """ 851 Trains the HMM using the Baum-Welch algorithm to maximise the 852 probability of the data sequence. This is a variant of the EM 853 algorithm, and is unsupervised in that it doesn't need the state 854 sequences for the symbols. The code is based on 'A Tutorial on Hidden 855 Markov Models and Selected Applications in Speech Recognition', 856 Lawrence Rabiner, IEEE, 1989. 857 858 @return: the trained model 859 @rtype: HiddenMarkovModelTagger 860 @param unlabeled_sequences: the training data, a set of 861 sequences of observations 862 @type unlabeled_sequences: list 863 @param kwargs: may include the following parameters:: 864 model - a HiddenMarkovModelTagger instance used to begin 865 the Baum-Welch algorithm 866 max_iterations - the maximum number of EM iterations 867 convergence_logprob - the maximum change in log probability to 868 allow convergence 869 """ 870 871 N = len(self._states) 872 M = len(self._symbols) 873 symbol_dict = dict((self._symbols[i], i) for i in range(M)) 874 875 # create a uniform HMM, which will be iteratively refined, unless 876 # given an existing model 877 model = kwargs.get('model') 878 if not model: 879 priors = UniformProbDist(self._states) 880 transitions = DictionaryConditionalProbDist( 881 dict((state, UniformProbDist(self._states)) 882 for state in self._states)) 883 output = DictionaryConditionalProbDist( 884 dict((state, UniformProbDist(self._symbols)) 885 for state in self._states)) 886 model = HiddenMarkovModelTagger(self._symbols, self._states, 887 transitions, output, priors) 888 889 # update model prob dists so that they can be modified 890 model._priors = MutableProbDist(model._priors, self._states) 891 model._transitions = DictionaryConditionalProbDist( 892 dict((s, MutableProbDist(model._transitions[s], self._states)) 893 for s in self._states)) 894 model._outputs = DictionaryConditionalProbDist( 895 dict((s, MutableProbDist(model._outputs[s], self._symbols)) 896 for s in self._states)) 897 898 # iterate until convergence 899 converged = False 900 last_logprob = None 901 iteration = 0 902 max_iterations = kwargs.get('max_iterations', 1000) 903 epsilon = kwargs.get('convergence_logprob', 1e-6) 904 while not converged and iteration < max_iterations: 905 A_numer = ones((N, N), float64) * _NINF 906 B_numer = ones((N, M), float64) * _NINF 907 A_denom = ones(N, float64) * _NINF 908 B_denom = ones(N, float64) * _NINF 909 910 logprob = 0 911 for sequence in unlabeled_sequences: 912 sequence = list(sequence) 913 if not sequence: 914 continue 915 916 # compute forward and backward probabilities 917 alpha = model._forward_probability(sequence) 918 beta = model._backward_probability(sequence) 919 920 # find the log probability of the sequence 921 T = len(sequence) 922 lpk = _log_add(*alpha[T-1, :]) 923 logprob += lpk 924 925 # now update A and B (transition and output probabilities) 926 # using the alpha and beta values. Please refer to Rabiner's 927 # paper for details, it's too hard to explain in comments 928 local_A_numer = ones((N, N), float64) * _NINF 929 local_B_numer = ones((N, M), float64) * _NINF 930 local_A_denom = ones(N, float64) * _NINF 931 local_B_denom = ones(N, float64) * _NINF 932 933 # for each position, accumulate sums for A and B 934 for t in range(T): 935 x = sequence[t][_TEXT] #not found? FIXME 936 if t < T - 1: 937 xnext = sequence[t+1][_TEXT] #not found? FIXME 938 xi = symbol_dict[x] 939 for i in range(N): 940 si = self._states[i] 941 if t < T - 1: 942 for j in range(N): 943 sj = self._states[j] 944 local_A_numer[i, j] = \ 945 _log_add(local_A_numer[i, j], 946 alpha[t, i] + 947 model._transitions[si].logprob(sj) + 948 model._outputs[sj].logprob(xnext) + 949 beta[t+1, j]) 950 local_A_denom[i] = _log_add(local_A_denom[i], 951 alpha[t, i] + beta[t, i]) 952 else: 953 local_B_denom[i] = _log_add(local_A_denom[i], 954 alpha[t, i] + beta[t, i]) 955 956 local_B_numer[i, xi] = _log_add(local_B_numer[i, xi], 957 alpha[t, i] + beta[t, i]) 958 959 # add these sums to the global A and B values 960 for i in range(N): 961 for j in range(N): 962 A_numer[i, j] = _log_add(A_numer[i, j], 963 local_A_numer[i, j] - lpk) 964 for k in range(M): 965 B_numer[i, k] = _log_add(B_numer[i, k], 966 local_B_numer[i, k] - lpk) 967 968 A_denom[i] = _log_add(A_denom[i], local_A_denom[i] - lpk) 969 B_denom[i] = _log_add(B_denom[i], local_B_denom[i] - lpk) 970 971 # use the calculated values to update the transition and output 972 # probability values 973 for i in range(N): 974 si = self._states[i] 975 for j in range(N): 976 sj = self._states[j] 977 model._transitions[si].update(sj, A_numer[i,j] - 978 A_denom[i]) 979 for k in range(M): 980 ok = self._symbols[k] 981 model._outputs[si].update(ok, B_numer[i,k] - B_denom[i]) 982 # Rabiner says the priors don't need to be updated. I don't 983 # believe him. FIXME 984 985 # test for convergence 986 if iteration > 0 and abs(logprob - last_logprob) < epsilon: 987 converged = True 988 989 print 'iteration', iteration, 'logprob', logprob 990 iteration += 1 991 last_logprob = logprob 992 993 return model
994
995 - def train_supervised(self, labelled_sequences, **kwargs):
996 """ 997 Supervised training maximising the joint probability of the symbol and 998 state sequences. This is done via collecting frequencies of 999 transitions between states, symbol observations while within each 1000 state and which states start a sentence. These frequency distributions 1001 are then normalised into probability estimates, which can be 1002 smoothed if desired. 1003 1004 @return: the trained model 1005 @rtype: HiddenMarkovModelTagger 1006 @param labelled_sequences: the training data, a set of 1007 labelled sequences of observations 1008 @type labelled_sequences: list 1009 @param kwargs: may include an 'estimator' parameter, a function taking 1010 a C{FreqDist} and a number of bins and returning a C{ProbDistI}; 1011 otherwise a MLE estimate is used 1012 """ 1013 1014 # default to the MLE estimate 1015 estimator = kwargs.get('estimator') 1016 if estimator == None: 1017 estimator = lambda fdist, bins: MLEProbDist(fdist) 1018 1019 # count occurences of starting states, transitions out of each state 1020 # and output symbols observed in each state 1021 starting = FreqDist() 1022 transitions = ConditionalFreqDist() 1023 outputs = ConditionalFreqDist() 1024 for sequence in labelled_sequences: 1025 lasts = None 1026 for token in sequence: 1027 state = token[_TAG] 1028 symbol = token[_TEXT] 1029 if lasts == None: 1030 starting.inc(state) 1031 else: 1032 transitions[lasts].inc(state) 1033 outputs[state].inc(symbol) 1034 lasts = state 1035 1036 # update the state and symbol lists 1037 if state not in self._states: 1038 self._states.append(state) 1039 if symbol not in self._symbols: 1040 self._symbols.append(symbol) 1041 1042 # create probability distributions (with smoothing) 1043 N = len(self._states) 1044 pi = estimator(starting, N) 1045 A = ConditionalProbDist(transitions, estimator, N) 1046 B = ConditionalProbDist(outputs, estimator, len(self._symbols)) 1047 1048 return HiddenMarkovModelTagger(self._symbols, self._states, A, B, pi)
1049
1050 1051 -class HiddenMarkovModelTaggerTransform(HiddenMarkovModelTaggerTransformI):
1052 """ 1053 An abstract subclass of C{HiddenMarkovModelTaggerTransformI}. 1054 """
1055 - def __init__(self):
1056 if self.__class__ == HiddenMarkovModelTaggerTransform: 1057 raise AssertionError, "Abstract classes can't be instantiated"
1058
1059 1060 -class LambdaTransform(HiddenMarkovModelTaggerTransform):
1061 """ 1062 A subclass of C{HiddenMarkovModelTaggerTransform} that is backed by an 1063 arbitrary user-defined function, instance method, or lambda function. 1064 """
1065 - def __init__(self, transform):
1066 """ 1067 @param func: a user-defined or lambda transform function 1068 @type func: C{function} 1069 """ 1070 self._transform = transform
1071
1072 - def transform(self, labeled_symbols):
1073 return self._transform(labeled_symbols)
1074
1075 1076 -class IdentityTransform(HiddenMarkovModelTaggerTransform):
1077 """ 1078 A subclass of C{HiddenMarkovModelTaggerTransform} that implements 1079 L{transform()} as the identity function, i.e. symbols passed to 1080 C{transform()} are returned unmodified. 1081 """
1082 - def transform(self, labeled_symbols):
1083 return labeled_symbols
1084
1085 1086 -def _log_add(*values):
1087 """ 1088 Adds the logged values, returning the logarithm of the addition. 1089 """ 1090 x = max(values) 1091 if x > _NINF: 1092 sum_diffs = 0 1093 for value in values: 1094 sum_diffs += 2**(value - x) 1095 return x + log2(sum_diffs) 1096 else: 1097 return x
1098
1099 -def demo():
1100 # demonstrates HMM probability calculation 1101 1102 print 1103 print "HMM probability calculation demo" 1104 print 1105 1106 # example taken from page 381, Huang et al 1107 symbols = ['up', 'down', 'unchanged'] 1108 states = ['bull', 'bear', 'static'] 1109 1110 def pd(values, samples): 1111 d = {} 1112 for value, item in zip(values, samples): 1113 d[item] = value 1114 return DictionaryProbDist(d)
1115 1116 def cpd(array, conditions, samples): 1117 d = {} 1118 for values, condition in zip(array, conditions): 1119 d[condition] = pd(values, samples) 1120 return DictionaryConditionalProbDist(d) 1121 1122 A = array([[0.6, 0.2, 0.2], [0.5, 0.3, 0.2], [0.4, 0.1, 0.5]], float64) 1123 A = cpd(A, states, states) 1124 B = array([[0.7, 0.1, 0.2], [0.1, 0.6, 0.3], [0.3, 0.3, 0.4]], float64) 1125 B = cpd(B, states, symbols) 1126 pi = array([0.5, 0.2, 0.3], float64) 1127 pi = pd(pi, states) 1128 1129 model = HiddenMarkovModelTagger(symbols=symbols, states=states, 1130 transitions=A, outputs=B, priors=pi) 1131 1132 print 'Testing', model 1133 1134 for test in [['up', 'up'], ['up', 'down', 'up'], 1135 ['down'] * 5, ['unchanged'] * 5 + ['up']]: 1136 1137 sequence = [(t, None) for t in test] 1138 1139 print 'Testing with state sequence', test 1140 print 'probability =', model.probability(sequence) 1141 print 'tagging = ', model.tag([word for (word,tag) in sequence]) 1142 print 'p(tagged) = ', model.probability(sequence) 1143 print 'H = ', model.entropy(sequence) 1144 print 'H_exh = ', model._exhaustive_entropy(sequence) 1145 print 'H(point) = ', model.point_entropy(sequence) 1146 print 'H_exh(point)=', model._exhaustive_point_entropy(sequence) 1147 print 1148
1149 -def load_pos(num_sents):
1150 from nltk.corpus import brown 1151 1152 sentences = brown.tagged_sents(categories='news')[:num_sents] 1153 1154 tag_re = re.compile(r'[*]|--|[^+*-]+') 1155 tag_set = set() 1156 symbols = set() 1157 1158 cleaned_sentences = [] 1159 for sentence in sentences: 1160 for i in range(len(sentence)): 1161 word, tag = sentence[i] 1162 word = word.lower() # normalize 1163 symbols.add(word) # log this word 1164 # Clean up the tag. 1165 tag = tag_re.match(tag).group() 1166 tag_set.add(tag) 1167 sentence[i] = (word, tag) # store cleaned-up tagged token 1168 cleaned_sentences += [sentence] 1169 1170 return cleaned_sentences, list(tag_set), list(symbols)
1171
1172 -def demo_pos():
1173 # demonstrates POS tagging using supervised training 1174 1175 print 1176 print "HMM POS tagging demo" 1177 print 1178 1179 print 'Training HMM...' 1180 labelled_sequences, tag_set, symbols = load_pos(200) 1181 trainer = HiddenMarkovModelTrainer(tag_set, symbols) 1182 hmm = trainer.train_supervised(labelled_sequences[10:], 1183 estimator=lambda fd, bins: LidstoneProbDist(fd, 0.1, bins)) 1184 1185 print 'Testing...' 1186 hmm.test(labelled_sequences[:10], verbose=True)
1187
1188 -def _untag(sentences):
1189 unlabeled = [] 1190 for sentence in sentences: 1191 unlabeled.append((token[_TEXT], None) for token in sentence) 1192 return unlabeled
1193
1194 -def demo_pos_bw():
1195 # demonstrates the Baum-Welch algorithm in POS tagging 1196 1197 print 1198 print "Baum-Welch demo for POS tagging" 1199 print 1200 1201 print 'Training HMM (supervised)...' 1202 sentences, tag_set, symbols = load_pos(210) 1203 symbols = set() 1204 for sentence in sentences: 1205 for token in sentence: 1206 symbols.add(token[_TEXT]) 1207 1208 trainer = HiddenMarkovModelTrainer(tag_set, list(symbols)) 1209 hmm = trainer.train_supervised(sentences[10:200], 1210 estimator=lambda fd, bins: LidstoneProbDist(fd, 0.1, bins)) 1211 print 'Training (unsupervised)...' 1212 # it's rather slow - so only use 10 samples 1213 unlabeled = _untag(sentences[200:210]) 1214 hmm = trainer.train_unsupervised(unlabeled, model=hmm, max_iterations=5) 1215 hmm.test(sentences[:10], verbose=True)
1216
1217 -def demo_bw():
1218 # demo Baum Welch by generating some sequences and then performing 1219 # unsupervised training on them 1220 1221 print 1222 print "Baum-Welch demo for market example" 1223 print 1224 1225 # example taken from page 381, Huang et al 1226 symbols = ['up', 'down', 'unchanged'] 1227 states = ['bull', 'bear', 'static'] 1228 1229 def pd(values, samples): 1230 d = {} 1231 for value, item in zip(values, samples): 1232 d[item] = value 1233 return DictionaryProbDist(d)
1234 1235 def cpd(array, conditions, samples): 1236 d = {} 1237 for values, condition in zip(array, conditions): 1238 d[condition] = pd(values, samples) 1239 return DictionaryConditionalProbDist(d) 1240 1241 A = array([[0.6, 0.2, 0.2], [0.5, 0.3, 0.2], [0.4, 0.1, 0.5]], float64) 1242 A = cpd(A, states, states) 1243 B = array([[0.7, 0.1, 0.2], [0.1, 0.6, 0.3], [0.3, 0.3, 0.4]], float64) 1244 B = cpd(B, states, symbols) 1245 pi = array([0.5, 0.2, 0.3], float64) 1246 pi = pd(pi, states) 1247 1248 model = HiddenMarkovModelTagger(symbols=symbols, states=states, 1249 transitions=A, outputs=B, priors=pi) 1250 1251 # generate some random sequences 1252 training = [] 1253 import random 1254 rng = random.Random() 1255 for i in range(10): 1256 item = model.random_sample(rng, 5) 1257 training.append((i[0], None) for i in item) 1258 1259 # train on those examples, starting with the model that generated them 1260 trainer = HiddenMarkovModelTrainer(states, symbols) 1261 hmm = trainer.train_unsupervised(training, model=model, 1262 max_iterations=1000) 1263 1264 if __name__ == '__main__': 1265