Tutorial SNLP WS 2023/24¶

Hidden-Markov-Modell für Wortart-Tagging¶

Training des Wortart-Taggers (Übung 7)¶

Schreiben Sie ein Programm, um das manuell annotierte Korpus zu verwenden, und die Parameter eines Trigramm-HMMs zu schätzen. Dieses HMM stellt für das Wortart-Tagging dar.

Folgende Parameter sollen für das Trigramm-HMM geschätzt werden:

  • Apriori-Tagwahrscheilichkeiten $p(t)=f(t)/\Sigma_{t'}f(t')$
  • Kontextwahrscheinkeiten $p(t|t', t'')$
  • Lexikalische Wahscheinlichkeiten $p(t|w)$

Training des HMMs Schritt für Schritt¶

In [1]:
# Importiere notwendige Bibliotheken und initialisiere verwandte Parameter

import sys
import pickle
from collections import defaultdict
from typing import Iterable

FreqDict = dict[Iterable, dict[str, int]]
ProbDict = dict[Iterable, dict[str, float]]

KneserNey = True  # Kneser-Ney-Glättung anwenden
max_suff_len = 5  # maximale Länge der betrachteten Wortsuffixe
Schritte 1: Einlesen der Trainingsdaten aus einer Datei und Extraktion der Häufigkeiten¶

Eingabe: Dateiname
Ausgabe: zwei Dictionaries von Dictionaries, eine für die Häufigkeiten von Wort-Tag-Paaren, die andere für die Häufigkeiten von Kontext-Tag-Paaren

In [ ]:
def extract_freqs(filename: str) -> tuple[FreqDict, FreqDict]:
    ''' liest die Daten und extrahiert die Häufigkeiten
    von Wort-Tag-Paaren und Tag-Trigrammen
    Die Methode gibt zwei Dictionaries von Dictionaries zurück.'''
    # Daten einlesen und Wort-Tag-Paare und Tag-Trigramme extrahieren
    word_tag_freq  = defaultdict(lambda: defaultdict(int))
    context_tag_freq = defaultdict(lambda: defaultdict(int))
    tags = []      # temporärer Speicher für die Tags eines Satzes
    with open(filename) as file:
        for line in file:
            line = line.strip()
            if line == '':  # leere Zeile zeigt das Satzende an
                # Grenztags hinzufügen
                tags = ['<s>','<s>'] + tags + ['</s>','</s>']
                for i in range(len(tags)-2):
                    context = tuple(tags[i:i+2])
                    context_tag_freq[context][tags[i+2]] += 1
                tags = []
            else:
                word, tag = line.split("\t")
                word_tag_freq[word][tag] += 1
                tags.append(tag)
    return word_tag_freq, context_tag_freq
Schritte 2: Schreibe die Funktion get_suffix(), welche für ein gegebenes Wort und Suffixlänge das Suffix des Wortes zurückgibt.¶

Eingabe: ein Wort
Ausgabe: das Suffix des gegebenen Wortes

In [ ]:
def get_suffix(word: str) -> str:
    ''' extrahiert aus dem Argumentwort das Suffix der Länge max_suff_len,
    füllt ggf. am Suffixanfang mit Leerzeichen auf, falls das Wort zu kurz ist,
    und fügt am Ende 'g' hinzu, falls das erste Zeichen ein Großbuchstabe ist,
    'k' falls das erste Zeichen ein Kleinbuchstabe ist,
    und 'o' falls das erste Zeichen kein Buchstabe ist. '''
    # Groß-/Kleinschreibung bestimmen
    case = 'k' if word[0].islower() else 'g' if word[0].isupper() else '0'
    tmp = ' ' * max_suff_len + word  # Wortanfang mit Leerzeichen auffüllen
    suffix = tmp[-max_suff_len:]     # Suffix extrahieren
    return suffix + case
Schritte 3: Berechnung der Suffix-Tag-Häufigkeiten durch Summation von Wort-Tag-Häufigkeiten.¶

Eingabe: ein Dictionary von Dictionary für die Wort-Tag-Häufigkeiten
Ausgabe: ein Dictionary von Dictionary für die Suffix-Tag-Häufigkeiten

In [ ]:
def compute_suffix_tag_freqs(word_tag_freq: FreqDict) -> FreqDict:
    ''' berechnet aus den Wort-Tag-Häufigkeiten die Suffix-Tag-Häufigkeiten.
    Die Häufigkeit eines Suffix-Tag-Paares ist die Summe der Häufigkeiten
    aller Wort-Tag-Paare, bei denen das Wort das entsprechende Suffix besitzt.
    '''
    # {suffix: {tag: int}}
    suffix_tag_freq = defaultdict(lambda: defaultdict(int))
    for word in word_tag_freq:
        suffix = get_suffix(word)
        for tag, freq in word_tag_freq[word].items():
            suffix_tag_freq[suffix][tag] += 1 if KneserNey else freq
    return suffix_tag_freq
Schritte 4: Schreibe die Funktion compute_discount(), welche den Discount für die Glätterung berechnet.¶

Eingabe: ein Dictionary von Dictionary für die Kontext-Tag-Häufigkeiten
Ausgabe: Discount

In [ ]:
def compute_discount(freq: FreqDict) -> float:
    ''' berechnet erst, wieviele Tupel genau einmal bzw. zweimal auftraten
    und berechnet dann den Discount '''
    N1 = sum(1 for context in freq for f in freq[context].values() if f == 1)
    N2 = sum(1 for context in freq for f in freq[context].values() if f == 2)
    discount = N1 / (N1 + 2.0 * N2) if N1 > N2 else 0.5
    return discount
Schritte 5: Schreibe die Funktion estimate_probs(), welche die geglätteten Wahrscheinlichkeiten zurückgibt.¶

Eingabe: ein Dictionary von Dictionary mit Häufigkeiten
Ausgabe: ein Dictionary von Dictionary mit geglätteten Wahrscheinlichkeiten

In [ ]:
def estimate_probs(freq, with_smoothing=True):
    ''' erhält ein Dictionary von Dictionaries mit Häufigkeiten und gibt
    ein entsprechendes Dictionary mit "Wahrscheinlichkeiten" zurück.
    Für with_smoothing=True wird Discounting gemacht, sonst nicht.'''
    discount = compute_discount(freq) if with_smoothing else 0.0
    prob = {}
    for context in freq:
        context_freq = sum(freq[context].values())
        # relative Häufigkeiten mit Discount berechnen
        prob[context] = {tag: (f - discount) / context_freq 
                         for tag, f in freq[context].items()}
    return prob
Schritte 6: Schreibe die Funktion reduced_context_freqs(), welche die Häufigkeiten kleineren Kontexte berechnet.¶

Eingabe: ein Dictionary von Dictionary mit Kontext-Tag-Häufigkeiten
Ausgabe: ein Dictionary von Dictionary mit Häufigkeiten kleinerer Kontexte

In [ ]:
def reduced_context_freqs(context_tag_freq: FreqDict) -> FreqDict:
    ''' berechnet aus den übergebenen n-Gramm-Häufigkeiten
    n-1-Gramm-Häufigkeiten nach der Kneser-Ney-Methode.
    Die Keys des übergebenen Dictionaries können Suffix-Strings oder
    Tag-Tupel sein. Ihre Funktion sollte mit beiden Argument-Typen funktionieren.'''
    # N-1Gramm-Häufigkeiten aus NGramm-Häufigkeiten ableiten
    new_freq = defaultdict(lambda: defaultdict(int))
    for context in context_tag_freq:
        new_context = context[1:]  # 1. Symbol des Kontextes weglassen
        for tag, freq in context_tag_freq[context].items():
            if KneserNey:
                # Zahl der verschiedenen Vorgänger berechnen
                new_freq[new_context][tag] += 1
            else:
                # Häufigkeiten aufsummieren
                new_freq[new_context][tag] += freq
    return new_freq
Schritte 7: Berechnung der Suffix-Tag-Wahrscheinlichkeiten¶

Eingabe: ein Dictionary von Dictionary für Suffix-Tag-Häufigkeiten
Ausgabe: ein Dictionary von Dictionary für Suffix-Tag-Wahrscheinlichkeiten

In [ ]:
def compute_suffix_tag_probs(suffix_tag_freq: FreqDict) -> ProbDict:
    ''' iteriert über alle Suffix-Längen und
    berechnet die relativen Häufigkeiten der Tags gegeben die Wort-Suffixe.
    Alle Suffix-Tag-Werte werden im selben Dictionary gespeichert.
    Diese Funktion ruft "estimate_probs" und "reduced_context_freqs" auf.
    Die relativen Tag-Häufigkeiten für das Suffix der Länge 0 werden ohne
    Glättung/Discounting berechnet. '''
    # Berechnung der relativen Häufigkeiten für Suffix-Tag-Paare
    # alle Suffix-Tag-Wahrscheinlichkeiten werden im gleichen Dictionary gespeichert
    suffix_tag_prob = {}
    for sufflen in range(max_suff_len, -1, -1):
        suffix_tag_prob.update(estimate_probs(suffix_tag_freq))
        suffix_tag_freq = reduced_context_freqs(suffix_tag_freq)
    # für Suffixlänge 0 keine Glättung
    suffix_tag_prob.update(estimate_probs(suffix_tag_freq, False))
    return suffix_tag_prob
Schritte 8: Berechnung der Apriori-Tag-Wahrscheinlichkeiten¶

Eingabe: ein Dictionary von Dictionary für Kontext-Tag-Häufigkeiten
Ausgabe: ein Dictionary für Apriori-Tag-Wahrscheinlichkeiten

In [ ]:
def compute_prior_tag_probs(context_tag_freq: FreqDict) -> dict[str, float]:
    ''' extrahiert aus "context_tag_freq" durch Summation die Gesamthäufigkeiten
    aller Tags und berechnet relative Taghäufigkeiten ohne Discounting '''
    # Extraktion der Taghäufigkeiten aus den Tag-Trigramm-Häufigkeiten.
    tag_freq = defaultdict(int)
    for context in context_tag_freq:
        for tag, freq in context_tag_freq[context].items():
            tag_freq[tag] += freq
    
    # Berechnung der Apriori-Wahrscheinlichkeiten
    N = sum(tag_freq.values())
    prior_tag_prob = {tag: freq/N for tag, freq in tag_freq.items()}
    return prior_tag_prob
Schritte 9: Berechnung der Tag-nGramm-Wahrscheinlichkeiten¶

Eingabe: ein Dictionary von Dictionary für Kontext-Tag-Häufigkeiten
Ausgabe: ein Dictionary von Dictionary für Kontext-Tag-Wahrscheinlichkeiten

In [ ]:
def compute_context_tag_probs(context_tag_freq: FreqDict) -> ProbDict:
    ''' berechnet analog zu compute_suffix_tag_probs, die relativen Häufigkeiten
    von Tags gegeben ein Tag-nGramm (Kontext) der Länge 2, 1, oder 0.
    Bei leerem Kontext wird auch hier nicht gelättet. '''
    context_tag_prob = {}
    for context_size in (2,1):
        context_tag_prob.update(estimate_probs(context_tag_freq))
        context_tag_freq = reduced_context_freqs(context_tag_freq)
    # Tag-Unigramm-Wahrscheinlichkeiten ohne Glättung schätzen
    context_tag_prob.update(estimate_probs(context_tag_freq, False))
    return context_tag_prob
Schritte 10: Alles zusammenfügen und alle Parameter speichern¶
In [ ]:
trainfilename, modelfilename = 'train.tagged', 'model.pkl'
    
word_tag_freq, context_tag_freq = extract_freqs(trainfilename)
word_tag_prob = estimate_probs(word_tag_freq)

suffix_tag_freq = compute_suffix_tag_freqs(word_tag_freq)
suffix_tag_prob = compute_suffix_tag_probs(suffix_tag_freq)

context_tag_prob = compute_context_tag_probs(context_tag_freq)
prior_tag_prob = compute_prior_tag_probs(context_tag_freq)

# Modell in einer Datei speichern
with open(modelfilename, 'wb') as file:
    model = (max_suff_len, prior_tag_prob, word_tag_prob, 
                suffix_tag_prob, context_tag_prob)
    pickle.dump(model, file)

Anwendung des Wortart-Taggers (Übung 8)¶

Schreiben Sie ein Programm, um das getrainierte Wortart-Tagging-Modell auf die Annotierung des tokenisierten Textes anzuwenden.

In diesem Programm sollen Sie

  • die Parameterdatei einlesen,
  • die Backoff-Faktoren berechnen, und
  • den tokenisierten Text einlesen und satzweise annotieren.

Anwendung des HMMs Schritt für Schritt¶

Wir erstellen ein class Tagger für die Implementierung des getrainierten HMMs.

In [ ]:
import sys
import math
import pickle
from typing import Iterable

ProbDict = dict[Iterable, dict[str, float]]

class Tagger:
    def __init__(self, filename: str):
        # Modellparameter einlesen
        with open(filename, 'rb') as file:
            self._max_suff_len, self._prior_tag_prob, self._word_tag_prob, \
                self._suffix_tag_prob, self._tag_ngram_prob = pickle.load(file)
        # Backoff-Faktoren berechnen
        self._word_tag_backoff   = self.compute_backoff_factors(self._word_tag_prob)
        self._suffix_tag_backoff = self.compute_backoff_factors(self._suffix_tag_prob)
        self._tag_ngram_backoff  = self.compute_backoff_factors(self._tag_ngram_prob)

    def get_suffix(self, word: str) -> str:
        ''' liefert das Wortsuffix plus einer Markierung für Groß-/Kleinschreibung '''
        case = 'k' if word[0].islower() else 'g' if word[0].isupper() else '0'
        tmp = ' ' * self._max_suff_len + word  # Wortanfang mit Leerzeichen auffüllen
        suffix = tmp[-self._max_suff_len:]     # Suffix extrahieren
        return suffix + case

        
    # statische Methoden erhalten keinen Zugriff auf "self"
    @staticmethod
    def compute_backoff_factors(prob: ProbDict) -> dict[Iterable, float]:
        ''' berechnet die Backoff-Faktoren '''
        pass
        # return backoff_factor


    def compute_suffix_prob(self, suffix: str, tag: str) -> float:
        ''' berechnet rekursiv p(tag|suffix) '''
        pass
        # return prob

    def compute_word_prob(self, word: str, tag: str) -> float:
        ''' berechnet den geglätteten Wert von p(tag|word) und
        gibt den Wert 1 zurück, falls das Wort der leere String und das Tag das Grenztag ist,
        sonst den Wert 0, falls das Word der leere String ist oder das Tag das Grenztag ist,
        sonst die geglättete Wahrscheinlichkeit des Tags gegeben das Wort
        '''
        pass
        # return word_prob

    def lex_probs(self, word: str) -> list[tuple[str, float]]:
        ''' liefert die besten Tags für das aktuelle Wort und ihre Wahrscheinlichkeiten '''
        pass
        # return tagprobs

    def compute_context_prob(self, context: tuple, tag: str):
        ''' berechnet rekursiv p(tag|context) wobei context aus 2 Tags besteht '''
        pass
        # return prob

    def viterbi(self, words):
        ''' berechnet die Viterbi-Werte und die Tabelle mit den besten Vorgänger-Tags '''
        pass
        # return vitscore, bestprev

    def extract_tags(self, bestprev):
        ''' extrahiert die beste Tagfolge '''
        pass
        # return result_tags

    
    def best_tag_sequence(self, words):
        vitscore, bestprev = self.viterbi(words)
        tags = self.extract_tags(bestprev)
        return tags
Schritte 1: Berechnung der Backoff-Faktoren¶

Eingabe: ein Dictionary von Dictionary für Wahrscheinlichkeiten
Ausgabe: ein Dictionary mit Backoff-Faktoren

In [ ]:
# statische Methoden erhalten keinen Zugriff auf "self"
@staticmethod
def compute_backoff_factors(prob: ProbDict) -> dict[Iterable, float]:
    ''' berechnet die Backoff-Faktoren '''
    backoff_factor = {context: 1.0-sum(prob[context].values()) for context in prob}
    return backoff_factor
Schritte 2: Rekursive Berechnung der Wahrscheinlichkeiten $p(tag|suffix)$¶

Eingabe: Suffix, Tag
Ausgabe: ein Dictionary mit Wahrscheinlichkeiten

In [ ]:
def compute_suffix_prob(self, suffix: str, tag: str) -> float:
    ''' berechnet rekursiv p(tag|suffix) '''
    if suffix == '':
        return self._suffix_tag_prob[suffix][tag]
    prob = self.compute_suffix_prob(suffix[1:], tag)
    if suffix in self._suffix_tag_prob:
        prob = self._suffix_tag_prob[suffix].get(tag, 0.0) + \
            self._suffix_tag_backoff[suffix] * prob
    return prob
Schritte 3: Berechnung der Wahrscheinlichkeiten $p(tag|word)$¶

Eingabe: Wort, Tag
Ausgabe: ein Dictionary mit Wahrscheinlichkeiten

In [ ]:
def compute_word_prob(self, word: str, tag: str) -> float:
    ''' berechnet den geglätteten Wert von p(tag|word) und
    gibt den Wert 1 zurück, falls das Wort der leere String und das Tag das Grenztag ist,
    sonst den Wert 0, falls das Word der leere String ist oder das Tag das Grenztag ist,
    sonst die geglättete Wahrscheinlichkeit des Tags gegeben das Wort
    '''
    
    # Spezialfall Satzgrenzsymbole
    if word == '' and tag == '</s>':
        return 1.0  
    if word == '' or tag == '</s>':
        return 0.0
    # normales Token
    word_prob = self.compute_suffix_prob(self.get_suffix(word), tag)  # Backoff-Wahrscheinlichkeit
    if word in self._word_tag_prob:
        # Backoff-Wahrscheinlichkeit mit Backoff-Faktor multiplizieren und zu
        # der wortbasierten "Wahrscheinlichkeit" addieren
        word_prob = self._word_tag_prob[word].get(tag, 0.0) + \
            self._word_tag_backoff[word] * word_prob
    return word_prob
Schritte 4: Berechnung der lexikalischen Wahrscheinlichkeiten¶

Eingabe: Wort
Ausgabe: ein Tuple von Tuple mit Tags ihren Wahrscheinlichkeiten

In [ ]:
def lex_probs(self, word: str) -> list[tuple[str, float]]:
    ''' liefert die besten Tags für das aktuelle Wort und ihre Wahrscheinlichkeiten '''

    # lexikalische Wahrscheinlichkeiten aller Tags berechnen
    tagprobs = ((tag, self.compute_word_prob(word, tag))
                for tag in self._prior_tag_prob)
    
    # unwahrscheinliche Tags ausfiltern
    tagprobs = ((tag, lexprob) for tag, lexprob in tagprobs if lexprob > 0.001)
    return tagprobs
Schritte 5: Rekursive Berechnung der Kontextwahrscheinlichkeiten $p(tag|context)$¶

Eingabe: Kontext, Tag
Ausgabe: ein Dictionary mit Wahrscheinlichkeiten

In [ ]:
def compute_context_prob(self, context: tuple, tag: str):
    ''' berechnet rekursiv p(tag|context) wobei context aus 2 Tags besteht '''
    if len(context) == 0:  # Abbruch der Rekursion, falls context leer ist
        return self._tag_ngram_prob[context][tag]
    prob = self.compute_context_prob(context[1:], tag)  # Backoff-Wahrscheinlichkeit
    if context in self._tag_ngram_prob:
        p = self._tag_ngram_prob[context].get(tag, 0.0) + \
            self._tag_ngram_backoff[context] * prob
    return prob
Schritte 6: Implementierung des Viterbi-Algorithmus, Berechnung der Viterbi-Werte und der Tabelle mit den besten Vorgänger-Tags¶

Eingabe: eine Liste von Wörtern
Ausgabe: Viterbi-Wert und die Tabelle mit besten Vorgänger-Tags (eine Liste von Dictionaries)

In [ ]:
def viterbi(self, words):
    ''' berechnet die Viterbi-Werte und die Tabelle mit den besten Vorgänger-Tags '''
    words =  [''] + words + ['','']  # Grenztokens hinzufügen

    # Initialisierung der Viterbi-Tabelle
    # Zur Vermeidung von Underflow wird mit logarithmierten Werten gearbeitet
    vitscore = [dict() for _ in words]  # logarithmierte Viterbi-Wk.
    bestprev = [dict() for _ in words]  # bestes Vorgänger-Tagpaar
    vitscore[0][('<s>','<s>')] = 0.0  # =log(1)
    for i in range(1, len(words)):
        lexprobs = self.lex_probs(words[i])  # die möglichen Tags nachschlagen
        for tag, lexprob in lexprobs:
            for tagpair in vitscore[i-1]:
                tag1, tag2 = tagpair  # Kontext-Tags
                p = self.compute_context_prob(tagpair, tag) * lexprob / self._prior_tag_prob[tag]
                p = vitscore[i-1][tagpair] + math.log(p)
                newtagpair = (tag2, tag)
                if vitscore[i].get(newtagpair, -math.inf) < p:
                    vitscore[i][newtagpair] = p
                    bestprev[i][newtagpair] = tagpair
    return vitscore, bestprev
Schritte 7: Extrahierung der Wahrscheinlichsten Tagfolge aus der Viterbi-Tabelle¶

Eingabe: die Tabelle mit besten Vorgänger-Tags (eine Liste von Dictionaries)
Ausgabe: eine Liste von Tags

In [ ]:
def extract_tags(self, bestprev):
    ''' extrahiert die beste Tagfolge '''
    tagpair = ('</s>','</s>')
    result_tags = []
    for i in range(len(bestprev)-1, 2, -1):
        tagpair = bestprev[i][tagpair]
        result_tags.append(tagpair[0])
    result_tags = result_tags[::-1]  # Tagfolge umdrehen
    return result_tags
Schritte 8: Alles zusammenfügen und die Eingabedatei zeileweise annotieren¶
In [ ]:
modelfilename, testfilename = 'model.pkl', 'dev.txt'

# Taggermodel einlesen
tagger = Tagger(modelfilename)

# Eingabedatei taggen
with open(testfilename) as file:
    for line in file:
        # Jede Zeile enthält einen tokenisierten Satz
        words = line.split()
        tags = tagger.best_tag_sequence(words)  # beste Tagfolge berechnen

        # getaggten Satz ausgeben
        for w,t in zip(words,tags):
            print(w, t, sep="\t")
        print('')  # Leerzeile am Satzende