La modélisation du langage consiste à apprendre la distribution de probabilité du mot suivant étant donné un historique. Ici, nous allons créer un modèle de langage sur des caractères pour apprendre à générer des titres de films de science fiction.
Le jeu de données provient d'IMDB qui permet d'accèder à de nombreuses infos sur les films, et en plus donne ces données en téléchargement libre (http://www.imdb.com/interfaces/).
Le fichier movies-sf.txt contient des noms de films suivis de leur année de sortie entre parenthèses extraits à partir de la base de données IMDB à l'aide de la commande awk en commentaire.
%matplotlib inline
%%bash
#wget https://datasets.imdbws.com/title.basics.tsv.gz
#zcat title.basics.tsv.gz | awk -F"\t" '$2=="movie" && $5==0 && /Sci-Fi/ && $6!="\\N"{print $3" ("$6")"}' | iconv -f utf8 -t ascii//TRANSLIT | sort -u | shuf > movies-sf.txt
[ -f movies-sf.txt ] || wget -q https://raw.githubusercontent.com/benob/dl4nlp-tutorials-data/master/movies-sf.txt
head movies-sf.txt
Nous allons charger les titres caractère par caractère et encoder ces derniers sous forme d'entiers. Le vocabulaire est produit avec un defaultdict
qui donne un nouvel identifiant à chaque nouveau caractère rencontré. Nous ajoutons deux caractères spéciaux :
<eos>
pour le padding<start>
qui indique le début de la séquenceLe problème va être posé comme prédire le caractère suivant étant donné le caractère courant et un état caché, et nous avons donc besoin d'un symbole <start>
pour prédire le premier caractère. La fin d'un texte sera prédite par la première occurrence d'un symbole <eos>
.
Nous pouvons tout de suite créer un vocabulaire inversé pour vérifier le contenu des données chargées.
import collections
vocab = collections.defaultdict(lambda: len(vocab))
vocab['<eos>'] = 0
vocab['<start>'] = 1
int_texts = []
with open('movies-sf.txt', 'r') as fp:
for line in fp:
int_texts.append([vocab['<start>']] + [vocab[char] for char in line.strip()])
rev_vocab = {y: x for x, y in vocab.items()}
print(rev_vocab)
print(len(int_texts))
print(int_texts[42])
print(''.join([rev_vocab[x] for x in int_texts[42]]))
Afin de bien choisir la longueur maximale sur laquelle le modèle va être entrainé, affichons l'histograme des longueurs de séquences.
from matplotlib import pyplot as plt
plt.hist([len(text) for text in int_texts])
plt.show()
Il semble qu'une longeur maximale de 40 permettra de traiter une bonne partie des titres.
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
max_len = 40
batch_size = 8
embed_size = 16
hidden_size = 64
Le problème est similaire à un problème de tagging sauf que l'étiquette à prédire est le caractère suivant, donc nous devons agencer les tenseurs de manière à ce que $y_{t} = x_{t+1}$. Il faut calculer la longueur après coupure des séquences les plus longues, puis créer un tenseur à partir du texte pour $x$ et un tenseur à partir du texte décalé de 1 vers la gauche pour $y$.
N'oublions pas de vérifier que les données ont la bonne forme.
X = torch.zeros(len(int_texts), max_len).long()
Y = torch.zeros(len(int_texts), max_len).long()
for i, text in enumerate(int_texts):
length = min(max_len, len(text) - 1) + 1
X[i,:length - 1] = torch.LongTensor(text[:length - 1])
Y[i,:length - 1] = torch.LongTensor(text[1:length])
print(X[42].tolist())
print(Y[42].tolist())
print([rev_vocab[y] for y in Y[42].tolist()])
Nous découpons les données en un ensemble d'entraînement et un ensemble de validation, puis les outils pytorch pour créer des batches mélangés sont utilisés comme d'habitude.
X_train = X[:6500]
Y_train = Y[:6500]
X_valid = X[6500:]
Y_valid = Y[6500:]
from torch.utils.data import TensorDataset, DataLoader
train_set = TensorDataset(X_train, Y_train)
valid_set = TensorDataset(X_valid, Y_valid)
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(valid_set, batch_size=batch_size)
Le modèle ressemble beaucoup à un taggeur. La première différence est qu'il ne peut pas être bidirectionnel, puisque la causalité est importante (on va générer des textes caractère par caratère en partant de <start>
). La seconde différence est que la fonction forward
va prendre un nouveau paramètre optionnel, l'état caché au temps précédent, et renvoyer non seulmenent les scores générés par le modèle, mais le nouvel état caché après avoir vu la séquence représentée dans x
. Ceci sera nécessire pour la génération caractère par caractère.
class LM(nn.Module):
def __init__(self):
super().__init__()
self.embed = nn.Embedding(len(vocab), embed_size, padding_idx=vocab['<eos>'])
self.rnn = nn.GRU(embed_size, hidden_size, bias=False, num_layers=1, batch_first=True)
self.dropout = nn.Dropout(0.3)
self.decision = nn.Linear(hidden_size, len(vocab))
## Optional: tie embedding and decision weights (recommanded for word LM)
# assert embed_size == hidden_size
# self.decision.weight = self.embed.weight
def forward(self, x, h_0=None):
embed = self.embed(x)
output, h_n = self.rnn(embed, h_0)
return self.decision(self.dropout(output)), h_n
model = LM()
model
On peut tester le modèle sur un batch. Il n'est pas obligatoire de passer un état caché initial (le module GRU s'en occupe si l'état caché passé est à None
), mais on doit récupérer le nouvel état caché même si nous n'allons pas l'utiliser.
Remarquons que les sorties sont de taille (batch_size, sequence_length, num_labels)
et l'état caché (num_layers, batch_size, hidden_size)
.
output, hidden = model(Variable(X[:2]))
print(output.size(), hidden.size())
Il n'y a très peu de différences avec l'évaluation des performances pour un taggeur. Il faut penser que le modèle renvoie maintenant deux résultats (les scores et l'état caché) et donc mettre l'état caché dans une variable qui ne sert à rien.
À la place du taux de corrects, nous allons calculer la perplexité du modèle sur les données.
$ PP(x) = P(x)^{-\frac{1}{N}} = \left[\prod_i P(x_i)\right]^{-\frac{1}{N}} $
où $x$ est une séquence de mots, $P(x)=\prod_i P(x_i)$ est la probabilité donnée par le modèle à cette séquence, et $N$ est sa longueur. On peut réécrire ce calcul en domaine log :
$ PP(x) = exp\left(-\frac{1}{N}\sum_i \log P(x_i)\right) $
Il se trouve que la fonction de loss renvoie $-\frac{1}{N}\log P(x_i)$, donc il suffit de calculer l'exponentielle du loss moyen pour obtenir la perplexité. Cette perplexité n'est pas masquée pour éliminer le padding, donc elle est influencée par ce dernier (on ne pourrait pas profiter de la fonction de loss si l'on souhaitait ignorer le padding).
import math
def perf(model, loader):
criterion = nn.CrossEntropyLoss()
model.eval()
total_loss = num = 0
for x, y in loader:
with torch.no_grad():
y_scores, _ = model(x)
loss = criterion(y_scores.view(y.size(0) * y.size(1), -1), y.view(y.size(0) * y.size(1)))
total_loss += loss.item()
num += len(y)
return total_loss / num, math.exp(total_loss / num)
perf(model, valid_loader)
L'apprentissage est le même que pour le taggeur sauf qu'il faut prendre en compte l'état caché.
def fit(model, epochs):
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())
for epoch in range(epochs):
model.train()
total_loss = num = 0
for x, y in train_loader:
optimizer.zero_grad()
y_scores, _ = model(x)
loss = criterion(y_scores.view(y.size(0) * y.size(1), -1), y.view(y.size(0) * y.size(1)))
loss.backward()
optimizer.step()
total_loss += loss.item()
num += len(y)
print(epoch, total_loss / num, *perf(model, valid_loader))
fit(model, 10)
Écrivons maintenant une fonction de génération. Cette dernère créé un tenseur $x$ contenant le symbole <start>
, et un état caché à 0. Puis, elle repète l'application du modèle sur $x$ et l'état caché, pour générer un nouvel état caché et un vecteur de $y_{\mathrm{scores}}$ sur les caractères. On peut alors sélectionner la composante de plus haut score, l'afficher et mettre à jour $x$ pour qu'il contienne le symbole généré. Il suffit ensuite de boucler jusqu'à la génération de <eos>
.
Le modèle génère toujours la même séquence de caractères, la séquence la plus probable étant donné le corpus d'apprentissage.
def generate_most_probable(model):
x = torch.zeros((1, 1)).long()
x[0, 0] = vocab['<start>']
# size for hidden: (batch, num_layers * num_directions, hidden_size)
hidden = torch.zeros(1, 1, hidden_size)
with torch.no_grad():
for i in range(200):
y_scores, hidden = model(x, hidden)
y_pred = torch.max(y_scores, 2)[1]
selected = y_pred.data[0, 0].item()
if selected == vocab['<eos>']:
break
print(rev_vocab[selected], end='')
x[0, 0] = selected
print()
generate_most_probable(model)
Plutôt que de sélectionner le caractère ayant la plus grande probabilité, on peut tirer aléatoirement un caractère dans la distribution de probabilité après softmax. Utilisez F.softmax
et torch.multinomial
pour tirer aléatoirement un élément $s \sim softmax(y_{\textrm{scores}})$ dans la distribution des scores, et l'utiliser comme élément sélectionné à la place de celui issu du $max$.
On peut diviser les scores par une température $\theta$ avant de faire le softmax pour tasser la distriution. Une valeur de $\theta<1$ poussera le modèle à prendre moins de risque et générer des caractères plus probables, alors que $\theta>1$ lui fera prendre plus de risques et générer des séquences moins probables. En général, $\theta=0.7$ donne des résultats satisfaisants.
<start>
) avant de générer une fin pour le titre.
word_language_model
pour entraîner un modèle de langage sur les mots