Schreiben Sie ein Programm, um das manuell annotierte Korpus zu verwenden, und die Parameter eines Trigramm-HMMs zu schätzen. Dieses HMM stellt für das Wortart-Tagging dar.
Folgende Parameter sollen für das Trigramm-HMM geschätzt werden:
# Importiere notwendige Bibliotheken und initialisiere verwandte Parameter
import sys
import pickle
from collections import defaultdict
from typing import Iterable
FreqDict = dict[Iterable, dict[str, int]]
ProbDict = dict[Iterable, dict[str, float]]
KneserNey = True # Kneser-Ney-Glättung anwenden
max_suff_len = 5 # maximale Länge der betrachteten Wortsuffixe
Eingabe: Dateiname
Ausgabe: zwei Dictionaries von Dictionaries, eine für die Häufigkeiten von Wort-Tag-Paaren, die andere für die Häufigkeiten von Kontext-Tag-Paaren
def extract_freqs(filename: str) -> tuple[FreqDict, FreqDict]:
''' liest die Daten und extrahiert die Häufigkeiten
von Wort-Tag-Paaren und Tag-Trigrammen
Die Methode gibt zwei Dictionaries von Dictionaries zurück.'''
# Daten einlesen und Wort-Tag-Paare und Tag-Trigramme extrahieren
word_tag_freq = defaultdict(lambda: defaultdict(int))
context_tag_freq = defaultdict(lambda: defaultdict(int))
tags = [] # temporärer Speicher für die Tags eines Satzes
with open(filename) as file:
for line in file:
line = line.strip()
if line == '': # leere Zeile zeigt das Satzende an
# Grenztags hinzufügen
tags = ['<s>','<s>'] + tags + ['</s>','</s>']
for i in range(len(tags)-2):
context = tuple(tags[i:i+2])
context_tag_freq[context][tags[i+2]] += 1
tags = []
else:
word, tag = line.split("\t")
word_tag_freq[word][tag] += 1
tags.append(tag)
return word_tag_freq, context_tag_freq
Eingabe: ein Wort
Ausgabe: das Suffix des gegebenen Wortes
def get_suffix(word: str) -> str:
''' extrahiert aus dem Argumentwort das Suffix der Länge max_suff_len,
füllt ggf. am Suffixanfang mit Leerzeichen auf, falls das Wort zu kurz ist,
und fügt am Ende 'g' hinzu, falls das erste Zeichen ein Großbuchstabe ist,
'k' falls das erste Zeichen ein Kleinbuchstabe ist,
und 'o' falls das erste Zeichen kein Buchstabe ist. '''
# Groß-/Kleinschreibung bestimmen
case = 'k' if word[0].islower() else 'g' if word[0].isupper() else '0'
tmp = ' ' * max_suff_len + word # Wortanfang mit Leerzeichen auffüllen
suffix = tmp[-max_suff_len:] # Suffix extrahieren
return suffix + case
Eingabe: ein Dictionary von Dictionary für die Wort-Tag-Häufigkeiten
Ausgabe: ein Dictionary von Dictionary für die Suffix-Tag-Häufigkeiten
def compute_suffix_tag_freqs(word_tag_freq: FreqDict) -> FreqDict:
''' berechnet aus den Wort-Tag-Häufigkeiten die Suffix-Tag-Häufigkeiten.
Die Häufigkeit eines Suffix-Tag-Paares ist die Summe der Häufigkeiten
aller Wort-Tag-Paare, bei denen das Wort das entsprechende Suffix besitzt.
'''
# {suffix: {tag: int}}
suffix_tag_freq = defaultdict(lambda: defaultdict(int))
for word in word_tag_freq:
suffix = get_suffix(word)
for tag, freq in word_tag_freq[word].items():
suffix_tag_freq[suffix][tag] += 1 if KneserNey else freq
return suffix_tag_freq
Eingabe: ein Dictionary von Dictionary für die Kontext-Tag-Häufigkeiten
Ausgabe: Discount
def compute_discount(freq: FreqDict) -> float:
''' berechnet erst, wieviele Tupel genau einmal bzw. zweimal auftraten
und berechnet dann den Discount '''
N1 = sum(1 for context in freq for f in freq[context].values() if f == 1)
N2 = sum(1 for context in freq for f in freq[context].values() if f == 2)
discount = N1 / (N1 + 2.0 * N2) if N1 > N2 else 0.5
return discount
Eingabe: ein Dictionary von Dictionary mit Häufigkeiten
Ausgabe: ein Dictionary von Dictionary mit geglätteten Wahrscheinlichkeiten
def estimate_probs(freq, with_smoothing=True):
''' erhält ein Dictionary von Dictionaries mit Häufigkeiten und gibt
ein entsprechendes Dictionary mit "Wahrscheinlichkeiten" zurück.
Für with_smoothing=True wird Discounting gemacht, sonst nicht.'''
discount = compute_discount(freq) if with_smoothing else 0.0
prob = {}
for context in freq:
context_freq = sum(freq[context].values())
# relative Häufigkeiten mit Discount berechnen
prob[context] = {tag: (f - discount) / context_freq
for tag, f in freq[context].items()}
return prob
Eingabe: ein Dictionary von Dictionary mit Kontext-Tag-Häufigkeiten
Ausgabe: ein Dictionary von Dictionary mit Häufigkeiten kleinerer Kontexte
def reduced_context_freqs(context_tag_freq: FreqDict) -> FreqDict:
''' berechnet aus den übergebenen n-Gramm-Häufigkeiten
n-1-Gramm-Häufigkeiten nach der Kneser-Ney-Methode.
Die Keys des übergebenen Dictionaries können Suffix-Strings oder
Tag-Tupel sein. Ihre Funktion sollte mit beiden Argument-Typen funktionieren.'''
# N-1Gramm-Häufigkeiten aus NGramm-Häufigkeiten ableiten
new_freq = defaultdict(lambda: defaultdict(int))
for context in context_tag_freq:
new_context = context[1:] # 1. Symbol des Kontextes weglassen
for tag, freq in context_tag_freq[context].items():
if KneserNey:
# Zahl der verschiedenen Vorgänger berechnen
new_freq[new_context][tag] += 1
else:
# Häufigkeiten aufsummieren
new_freq[new_context][tag] += freq
return new_freq
Eingabe: ein Dictionary von Dictionary für Suffix-Tag-Häufigkeiten
Ausgabe: ein Dictionary von Dictionary für Suffix-Tag-Wahrscheinlichkeiten
def compute_suffix_tag_probs(suffix_tag_freq: FreqDict) -> ProbDict:
''' iteriert über alle Suffix-Längen und
berechnet die relativen Häufigkeiten der Tags gegeben die Wort-Suffixe.
Alle Suffix-Tag-Werte werden im selben Dictionary gespeichert.
Diese Funktion ruft "estimate_probs" und "reduced_context_freqs" auf.
Die relativen Tag-Häufigkeiten für das Suffix der Länge 0 werden ohne
Glättung/Discounting berechnet. '''
# Berechnung der relativen Häufigkeiten für Suffix-Tag-Paare
# alle Suffix-Tag-Wahrscheinlichkeiten werden im gleichen Dictionary gespeichert
suffix_tag_prob = {}
for sufflen in range(max_suff_len, -1, -1):
suffix_tag_prob.update(estimate_probs(suffix_tag_freq))
suffix_tag_freq = reduced_context_freqs(suffix_tag_freq)
# für Suffixlänge 0 keine Glättung
suffix_tag_prob.update(estimate_probs(suffix_tag_freq, False))
return suffix_tag_prob
Eingabe: ein Dictionary von Dictionary für Kontext-Tag-Häufigkeiten
Ausgabe: ein Dictionary für Apriori-Tag-Wahrscheinlichkeiten
def compute_prior_tag_probs(context_tag_freq: FreqDict) -> dict[str, float]:
''' extrahiert aus "context_tag_freq" durch Summation die Gesamthäufigkeiten
aller Tags und berechnet relative Taghäufigkeiten ohne Discounting '''
# Extraktion der Taghäufigkeiten aus den Tag-Trigramm-Häufigkeiten.
tag_freq = defaultdict(int)
for context in context_tag_freq:
for tag, freq in context_tag_freq[context].items():
tag_freq[tag] += freq
# Berechnung der Apriori-Wahrscheinlichkeiten
N = sum(tag_freq.values())
prior_tag_prob = {tag: freq/N for tag, freq in tag_freq.items()}
return prior_tag_prob
Eingabe: ein Dictionary von Dictionary für Kontext-Tag-Häufigkeiten
Ausgabe: ein Dictionary von Dictionary für Kontext-Tag-Wahrscheinlichkeiten
def compute_context_tag_probs(context_tag_freq: FreqDict) -> ProbDict:
''' berechnet analog zu compute_suffix_tag_probs, die relativen Häufigkeiten
von Tags gegeben ein Tag-nGramm (Kontext) der Länge 2, 1, oder 0.
Bei leerem Kontext wird auch hier nicht gelättet. '''
context_tag_prob = {}
for context_size in (2,1):
context_tag_prob.update(estimate_probs(context_tag_freq))
context_tag_freq = reduced_context_freqs(context_tag_freq)
# Tag-Unigramm-Wahrscheinlichkeiten ohne Glättung schätzen
context_tag_prob.update(estimate_probs(context_tag_freq, False))
return context_tag_prob
trainfilename, modelfilename = 'train.tagged', 'model.pkl'
word_tag_freq, context_tag_freq = extract_freqs(trainfilename)
word_tag_prob = estimate_probs(word_tag_freq)
suffix_tag_freq = compute_suffix_tag_freqs(word_tag_freq)
suffix_tag_prob = compute_suffix_tag_probs(suffix_tag_freq)
context_tag_prob = compute_context_tag_probs(context_tag_freq)
prior_tag_prob = compute_prior_tag_probs(context_tag_freq)
# Modell in einer Datei speichern
with open(modelfilename, 'wb') as file:
model = (max_suff_len, prior_tag_prob, word_tag_prob,
suffix_tag_prob, context_tag_prob)
pickle.dump(model, file)
Schreiben Sie ein Programm, um das getrainierte Wortart-Tagging-Modell auf die Annotierung des tokenisierten Textes anzuwenden.
In diesem Programm sollen Sie
Wir erstellen ein class Tagger
für die Implementierung des getrainierten HMMs.
import sys
import math
import pickle
from typing import Iterable
ProbDict = dict[Iterable, dict[str, float]]
class Tagger:
def __init__(self, filename: str):
# Modellparameter einlesen
with open(filename, 'rb') as file:
self._max_suff_len, self._prior_tag_prob, self._word_tag_prob, \
self._suffix_tag_prob, self._tag_ngram_prob = pickle.load(file)
# Backoff-Faktoren berechnen
self._word_tag_backoff = self.compute_backoff_factors(self._word_tag_prob)
self._suffix_tag_backoff = self.compute_backoff_factors(self._suffix_tag_prob)
self._tag_ngram_backoff = self.compute_backoff_factors(self._tag_ngram_prob)
def get_suffix(self, word: str) -> str:
''' liefert das Wortsuffix plus einer Markierung für Groß-/Kleinschreibung '''
case = 'k' if word[0].islower() else 'g' if word[0].isupper() else '0'
tmp = ' ' * self._max_suff_len + word # Wortanfang mit Leerzeichen auffüllen
suffix = tmp[-self._max_suff_len:] # Suffix extrahieren
return suffix + case
# statische Methoden erhalten keinen Zugriff auf "self"
@staticmethod
def compute_backoff_factors(prob: ProbDict) -> dict[Iterable, float]:
''' berechnet die Backoff-Faktoren '''
pass
# return backoff_factor
def compute_suffix_prob(self, suffix: str, tag: str) -> float:
''' berechnet rekursiv p(tag|suffix) '''
pass
# return prob
def compute_word_prob(self, word: str, tag: str) -> float:
''' berechnet den geglätteten Wert von p(tag|word) und
gibt den Wert 1 zurück, falls das Wort der leere String und das Tag das Grenztag ist,
sonst den Wert 0, falls das Word der leere String ist oder das Tag das Grenztag ist,
sonst die geglättete Wahrscheinlichkeit des Tags gegeben das Wort
'''
pass
# return word_prob
def lex_probs(self, word: str) -> list[tuple[str, float]]:
''' liefert die besten Tags für das aktuelle Wort und ihre Wahrscheinlichkeiten '''
pass
# return tagprobs
def compute_context_prob(self, context: tuple, tag: str):
''' berechnet rekursiv p(tag|context) wobei context aus 2 Tags besteht '''
pass
# return prob
def viterbi(self, words):
''' berechnet die Viterbi-Werte und die Tabelle mit den besten Vorgänger-Tags '''
pass
# return vitscore, bestprev
def extract_tags(self, bestprev):
''' extrahiert die beste Tagfolge '''
pass
# return result_tags
def best_tag_sequence(self, words):
vitscore, bestprev = self.viterbi(words)
tags = self.extract_tags(bestprev)
return tags
Eingabe: ein Dictionary von Dictionary für Wahrscheinlichkeiten
Ausgabe: ein Dictionary mit Backoff-Faktoren
# statische Methoden erhalten keinen Zugriff auf "self"
@staticmethod
def compute_backoff_factors(prob: ProbDict) -> dict[Iterable, float]:
''' berechnet die Backoff-Faktoren '''
backoff_factor = {context: 1.0-sum(prob[context].values()) for context in prob}
return backoff_factor
Eingabe: Suffix, Tag
Ausgabe: ein Dictionary mit Wahrscheinlichkeiten
def compute_suffix_prob(self, suffix: str, tag: str) -> float:
''' berechnet rekursiv p(tag|suffix) '''
if suffix == '':
return self._suffix_tag_prob[suffix][tag]
prob = self.compute_suffix_prob(suffix[1:], tag)
if suffix in self._suffix_tag_prob:
prob = self._suffix_tag_prob[suffix].get(tag, 0.0) + \
self._suffix_tag_backoff[suffix] * prob
return prob
Eingabe: Wort, Tag
Ausgabe: ein Dictionary mit Wahrscheinlichkeiten
def compute_word_prob(self, word: str, tag: str) -> float:
''' berechnet den geglätteten Wert von p(tag|word) und
gibt den Wert 1 zurück, falls das Wort der leere String und das Tag das Grenztag ist,
sonst den Wert 0, falls das Word der leere String ist oder das Tag das Grenztag ist,
sonst die geglättete Wahrscheinlichkeit des Tags gegeben das Wort
'''
# Spezialfall Satzgrenzsymbole
if word == '' and tag == '</s>':
return 1.0
if word == '' or tag == '</s>':
return 0.0
# normales Token
word_prob = self.compute_suffix_prob(self.get_suffix(word), tag) # Backoff-Wahrscheinlichkeit
if word in self._word_tag_prob:
# Backoff-Wahrscheinlichkeit mit Backoff-Faktor multiplizieren und zu
# der wortbasierten "Wahrscheinlichkeit" addieren
word_prob = self._word_tag_prob[word].get(tag, 0.0) + \
self._word_tag_backoff[word] * word_prob
return word_prob
Eingabe: Wort
Ausgabe: ein Tuple von Tuple mit Tags ihren Wahrscheinlichkeiten
def lex_probs(self, word: str) -> list[tuple[str, float]]:
''' liefert die besten Tags für das aktuelle Wort und ihre Wahrscheinlichkeiten '''
# lexikalische Wahrscheinlichkeiten aller Tags berechnen
tagprobs = ((tag, self.compute_word_prob(word, tag))
for tag in self._prior_tag_prob)
# unwahrscheinliche Tags ausfiltern
tagprobs = ((tag, lexprob) for tag, lexprob in tagprobs if lexprob > 0.001)
return tagprobs
Eingabe: Kontext, Tag
Ausgabe: ein Dictionary mit Wahrscheinlichkeiten
def compute_context_prob(self, context: tuple, tag: str):
''' berechnet rekursiv p(tag|context) wobei context aus 2 Tags besteht '''
if len(context) == 0: # Abbruch der Rekursion, falls context leer ist
return self._tag_ngram_prob[context][tag]
prob = self.compute_context_prob(context[1:], tag) # Backoff-Wahrscheinlichkeit
if context in self._tag_ngram_prob:
p = self._tag_ngram_prob[context].get(tag, 0.0) + \
self._tag_ngram_backoff[context] * prob
return prob
Eingabe: eine Liste von Wörtern
Ausgabe: Viterbi-Wert und die Tabelle mit besten Vorgänger-Tags (eine Liste von Dictionaries)
def viterbi(self, words):
''' berechnet die Viterbi-Werte und die Tabelle mit den besten Vorgänger-Tags '''
words = [''] + words + ['',''] # Grenztokens hinzufügen
# Initialisierung der Viterbi-Tabelle
# Zur Vermeidung von Underflow wird mit logarithmierten Werten gearbeitet
vitscore = [dict() for _ in words] # logarithmierte Viterbi-Wk.
bestprev = [dict() for _ in words] # bestes Vorgänger-Tagpaar
vitscore[0][('<s>','<s>')] = 0.0 # =log(1)
for i in range(1, len(words)):
lexprobs = self.lex_probs(words[i]) # die möglichen Tags nachschlagen
for tag, lexprob in lexprobs:
for tagpair in vitscore[i-1]:
tag1, tag2 = tagpair # Kontext-Tags
p = self.compute_context_prob(tagpair, tag) * lexprob / self._prior_tag_prob[tag]
p = vitscore[i-1][tagpair] + math.log(p)
newtagpair = (tag2, tag)
if vitscore[i].get(newtagpair, -math.inf) < p:
vitscore[i][newtagpair] = p
bestprev[i][newtagpair] = tagpair
return vitscore, bestprev
Eingabe: die Tabelle mit besten Vorgänger-Tags (eine Liste von Dictionaries)
Ausgabe: eine Liste von Tags
def extract_tags(self, bestprev):
''' extrahiert die beste Tagfolge '''
tagpair = ('</s>','</s>')
result_tags = []
for i in range(len(bestprev)-1, 2, -1):
tagpair = bestprev[i][tagpair]
result_tags.append(tagpair[0])
result_tags = result_tags[::-1] # Tagfolge umdrehen
return result_tags
modelfilename, testfilename = 'model.pkl', 'dev.txt'
# Taggermodel einlesen
tagger = Tagger(modelfilename)
# Eingabedatei taggen
with open(testfilename) as file:
for line in file:
# Jede Zeile enthält einen tokenisierten Satz
words = line.split()
tags = tagger.best_tag_sequence(words) # beste Tagfolge berechnen
# getaggten Satz ausgeben
for w,t in zip(words,tags):
print(w, t, sep="\t")
print('') # Leerzeile am Satzende