Non-contextual representations

Nous nous intéressons à créer des plongements lexicaux (word embeddings) avec plusieurs méthodes. Pour cela, nous allons télécharger un corpus, le pré-traiter pour en extraire des coocurrences, puis calculer les emebddings à partir de ces coocurrences et finalement évaluer la qualité de ces embeddings.

Le corpus que nous allons utiliser est relativement petit. Il s'agit du livre "Le tour du monde en 80 jours" de Jules Vernes. Ce livre est disponible en .txt sur le site de l'ABU, la bibliothèque universelle du CNAM (http://abu.cnam.fr) qui contient plus de 200 ouvrages par des grands auteurs français.

Ce corpus est relativement petit pour garder des temps de calculs raisonnables, et il est généralement admis que pour apprendre des embeddings, il faut au minimum 10 millions de mots, voir plusieurs centaines de millions.

In [ ]:
%%bash
curl -s 'http://abu.cnam.fr/cgi-bin/donner_unformated?tdm80j2' | iconv -f latin1 > input.txt
# for about 200 times more data, use:
#curl -s https://pageperso.lis-lab.fr/benoit.favre/files/abu-all.txt.xz | xz -d > input.txt
head input.txt

Comme on peut le voir dans la sortie précédente, le fichier contient une license que nous ne voulons pas traiter. La première étape est donc de charger le texte sans la license. Notez que ce dernier est encodé en latin1 et qu'il faut spécifier cet encodage lors du chargement. On pourrait aussi filtrer tous les autres éléments textuels non pertinents comme le titre ou le sommaire, mais nous considèrerons que leur impact est négligeable.

In [ ]:
import re

with open('input.txt') as fp:
  text = fp.read().replace('\n', '\\n')
  found = re.search('- DEBUT DU FICHIER tdm80j2 -*', text)
  if found:
    text = text[found.end():]
  found = re.search('-* FIN DU FICHIER tdm80j2', text)
  if found:
    text = text[:found.start()]
  text = text.replace('\\n', '\n')

print(text[:500])

La prochaine étape est de tokeniser le texte, c'est à dire convertir la séquence de caractères en séquence de mots. La tokenisation est un processus délicat généralement effectué à l'aide de règles et de listes d'exceptions. Nous utiliserons NLTK qui fournit un tokeniseur standard qui marche raisonnablement bien pour le français. Mais ce tokeniseur n'est pas parfait car il ne détache pas les déterminants élidés (comme d') des mots, ce que nous devons faire manuellement.

Notez que NLTK nécessite de télécharger un modèle pour effectuer la tokenisation. Il existe de nombreux tutoriels pour NLTK (par exemple https://www.guru99.com/nltk-tutorial.html) qui démontrent la plupart des possibilités de cette bibliothèque.

In [ ]:
!pip -q install nltk
import nltk

## à compléter ##
# words = ...

print(words[:10], words[-10:])
print(len(words))

On peut voir en observant la séquence de tokens produite précédemment qu'il peut être souhaitable de normaliser un peu les mots avant de les traiter. Par exemple, on pourrait mettre les mots en miniscule, filtrer les séquences de ponctuation et les tokens trop peu fréquents.

Une manière de filtrer les tokens pour n'obtenir que des mots consiste à ne conserver que les mots contenant au moins un caractère alphanumérique. La bibliothèque standard re de python ne permet pas d'écrire facilement des expressions régulières prenant en compte les caratères accentués. Nous utiliserons pour cela le module regex qui supporte les classes de caractères unicode dont \p{Ll} qui représente les caractères alphabétiques. Nous en profitons pour mettre les tokens en minuscule.

In [ ]:
!pip -q install regex
import regex

filtered_words = [word.lower() for word in words if regex.search(r'(\p{Ll}|\d)', word.lower())]
print(filtered_words[:30])

Commençons par calculer la fréquence des tokens et afficher les moins et plus fréquents. Nous utilisons un collections.defaultdict pour facilement compter le nombre d'occurrences des mots. Quelle est la catégorie syntaxique des mots les plus fréquents ? et celle des mots les moins fréquents ?

In [ ]:
import collections

freq = collections.defaultdict(int)
for word in filtered_words:
  freq[word] += 1

## à compléter ##
# most_frequent = ...

for word in most_frequent[:10]:
  print(freq[word], word)

for word in most_frequent[-10:]:
  print(freq[word], word)

Nous allons maintenant construire un vocabulaire associant les mots à des identifiants numériques. Nous ignorerons les mots qui apparaissent moins de 4 fois pour écarter une partie du bruit et les mots les moins fréquents pour lesquels nous n'avons de toute façons pas suffisemment de données pour apprendre des représentations raisonnables.

In [ ]:
# trick: each new word is given a new id the first time we look it up in the dictionary
vocab = collections.defaultdict(lambda: len(vocab))

# build vocab by decreasing frequency
for word in most_frequent:
  if freq[word] > 3:
    vocab[word]

# build array with word ids
int_words = []
for word in filtered_words:
  if word in vocab:
    int_words.append(vocab[word])

# freeze content (trying to get the id of new words with fail)
vocab = dict(vocab)

print(len(vocab))
print(vocab)

Pour les approches qui travaillent sur des coocurrences, nous allons créer une matrice creuse contenant le nombre de fois où deux mots apparaissent ensemble dans une fenêtre glissante. Cette matrice prendra la forme d'un dictionnaire dont les clés sont des paires d'entiers (les identifiants des deux mots cooccurant, classés dans l'ordre des entiers pour neutraliser l'ordre des coocurrences), et les valeurs sont le nombre de coocurrences. Le principe est relativement simple. On parcourt le corpus, position par position. Puis il faut compter le nombre de fois où le premier mot de la fenêtre apparait avec chacun des autres mots de la fenêtre.

Attention, chaque fois que l'on essaie d'accéder à une paire dans cette matrice, une entrée est créée, même si elle est à zéro. Donc si on parcourt les occurences pour tout le vocabulaire, la matrice risque de prendre toute la mémoire. D'où l'importance dans la suite de ne travailler que sur les cooccurrences non nulles.

In [ ]:
window_size = 10

cooc = collections.defaultdict(int)

for i in range(len(int_words) - window_size + 1):
  window = int_words[i: i + window_size]
  word = window[0]
  for other in window[1:]:
    if other > word:
      cooc[(word, other)] += 1
    elif other < word:
      cooc[(other, word)] += 1

print(len(cooc))
print(cooc[(vocab['de'], vocab['le'])])

La première méthode que nous allons utiliser est LSA. L'idée est d'effectuer une décomposition en valeurs propres de la matrice de coocurrences à l'aide d'une SVD. Soit la matrice de coocurrence $A$ de taille $(n, n)$, la SVD produit 3 matrices $U$, $\Sigma$ et $V$ de taille $(n, n)$. $\Sigma$ une matrice diagonale contenant les valeurs propres ordonnées par valeur décroissante. $U$ et $V$ sont des bases de l'espace produit.

$A = U \Sigma V^\top$

Si on met à zero les éléments les plus petits de $\Sigma$, on obtient une approximation de $A$ de rang inférieur. Cela revient à mettre à zéro dans U les colonnes correspondant et dans $V^\top$ les lignes correspondant. Nous utiliserons les 50 premières dimensions de la base $U$ comme embeddings, ignorant les autres valeurs propres.

Il existe de nombreuses variantes pour la création de la matrice de coocurrences, comme l'utilisation de documents, de phrases, d'un sous vocabulaire, comme 2nde dimension de la matrice. Lorsque LSA a été proposé, il était admis que les premières valeurs propres correspondaient à des thèmes représentés dans le corpus (si on ignore les mots outils). Si la matrice de cooccurrence n'est pas carrée, il faut utiliser l'argument full_matrices de svd().

Notez que la SVD est calculée sur la matrice pleine. C'est très coûteux est pas raisonnable lorsque l'on utilise un vocabulaire réaliste comme dans wikipedia (plusieurs centaines de milliers de mots différents). On peut utiliser sklearn.decomposition.TruncatedSVD pour ne calculer que les $k$ valeurs propres les plus grandes, sur des matrices creuses.

In [ ]:
import numpy as np
import math

# note: reduce to 2000 if it doesn't fit memory
size = min(2000, len(vocab))

A = np.zeros((size, size), dtype=np.float32)
## à compléter : remplir la matrice A avec le contenu de cooc ##

U, S, Vt = np.linalg.svd(A)

print(S[:50])

U = U[:,:50]
print(U.shape)

Une fois les embeddings entraînés, on peut calculer la similarité entre les représentations associées à deux mots. Pour cela, on utilisera la similarité cosinus, c'est à dire l'angle entre deux vecteurs, qui peut être calculée en divisant le produit scalaire des deux vecteurs par le produit de leur norme.

$cosine(a, b) = \frac{\langle a, b\rangle}{||a||_2 ||b||_2}$

La bibliothèque sklearn offre une implémentation de cette simlarité.

In [ ]:
!pip -q install sklearn
from sklearn.metrics.pairwise import cosine_similarity

def similarity(embedding, word1, word2):
  if word1 in vocab and word2 in vocab:
    v1 = embedding[vocab[word1]].reshape(1, -1)  
    v2 = embedding[vocab[word2]].reshape(1, -1)
    return cosine_similarity(v1, v2)[0][0]

print('cosine(rues, ville) =', similarity(U, 'rues', 'ville'))
print('cosine(tour, monde) =', similarity(U, 'tour', 'monde'))

On peut aussi s'intéresser aux voisin d'un mot dans l'espace d'embeddings. Pour cela il faut calculer la similarité cosine de son vecteur avec tous les autres mots du vocabulaire. Numpy nous donne ensuite les n mots les plus similaires.

In [ ]:
reverse_vocab = {j: i for i, j in vocab.items()}

def most_similar(embedding, word, n=10):
  if word in vocab:
    v = embedding[vocab[word]].reshape(1, -1)
    scores = cosine_similarity(v, embedding).reshape(-1)
    result = []

    # argsort gives n-best scores
    for i in reversed(scores.argsort()[-n:]):
      result.append((reverse_vocab[i], scores[i]))
    return result

print(most_similar(U, 'monde'))
print(most_similar(U, 'pays'))

Enfin, on peut tenter de visualiser l'espace d'embedding entier. Pour cela, on peut projeter les vecteurs en 50 dimensions vers le plan à l'aide d'une méthode de projection qui conserve les voisinages. La PCA est fidèle mais difficile à interpréter. Beaucoup de monde utilise la t-SNE (https://lvdmaaten.github.io/tsne/) qui s'attache spécifiquement à conserver les voisinages locaux au détriment des voisinages globaux. Mais cette projection ne représente pas toujours fidèlement les données (https://pageperso.lis-lab.fr/benoit.favre/understanding-tsne). Un concurrent un peu plus rapide mais soumis aux mêmes problèmes est la projection umap. Qu'observe-t-on ?

In [ ]:
!pip -q install umap-learn
import umap
from matplotlib import pyplot as plt

# change size of figure
plt.rcParams['figure.dpi'] = 72
plt.rcParams['figure.figsize'] = [10, 10]

def plot_projection(embedding, n=500):
  projection = umap.UMAP().fit_transform(embedding[:n])
  fig, ax = plt.subplots()
  ax.scatter(x=projection[:,0], y=projection[:,1])

  for word, index in vocab.items():
    if index < n:
      ax.annotate(word, projection[index])
  plt.show()

plot_projection(U)

Nous allons maintenant entraîner des plongements lexicaux avec la méthodes "GloVe" de Pennington et al. Une implémentation est disponible (https://nlp.stanford.edu/projects/glove/), mais pour l'exercice, nous allons la refaire à l'aide des équations décrites dans l'article décrivant l'approche. Dans ce dernier, l'équation (8), décrit la fonction de loss minimisée pour obtenir les embeddings (réécrite pour coller à notre contexte):

$J = \sum_{ij} f(\text{cooc}_{ij}) (w_i^\top w_j + b_i + b_j - \log \text{cooc}_{ij})^2$

Dans cette équation, $i$ et $j$ sont des indentifiants de mots, $w_i$ et $w_j$ sont les embeddings associés, par exemple des vecteurs de dimension 50, $b_i$, $b_j$ sont des biais scalaires, cooc$_{ij}$ est le nombre de cooccurrences entre les mots $i$ et $j$, et $f$ est une fonction de pondération. En particulier,

$f(x) = \begin{cases} 0, & \text{si $x=0$},\\ (\frac{x}{x_{max}})^\alpha, & \text{si $0<x<x_{max}$},\\ 1, & \text{sinon}. \end{cases}$

Le fait que $f(0)=0$ permet de ne considérer que les coocurrences observées dans le corpus, ce qui réduit les calculs lorsque la taille du vocabulaire devient grande. Les hyper-paramètres utilisés dans l'article sont $x_{max}=100$ et $\alpha=\frac{3}{4}$. Les embeddings $w$ et les biais $b$ sont initialisés aléatoirement.

Si l'on ignore $f$, on peut observer que minimiser $J$ revient à minimiser le MSE entre $w_i^\top w_j + b_i + b_j$ et $\log X_{ij}$. C'est un problème de régression conventionnel.

Pour minimiser $J$ à l'aide de pytorch, nous devons:

  1. Créer des données $X$ et $Y$ représentant respectivement les paires de mots $(i,j)$ et les valeurs $X_{ij}$ associées. Le reste de $J$ pourra être dérivé à partir de ces entrées
  2. Créer une classe modèle nn.Module hébergeant les paramètres $w$ et $b$ sous forme de nn.Parameter
  3. Créer une fonction qui calcule le loss $J$ pour un batch échantilloné dans $X, Y$
  4. Créer une boucle d'apprentissage classique qui minimise $J$ par descente de gradient

Commençons par créer instancier les données sous forme de tenseurs. $X$ est un tenseur de type long et de taille $(len(cooc), 2)$ qui contient les indices des mots cooccurrant (les clés du dictionnaire cooc). $Y$ est un tenseur de type float et de taille $(len(cooc))$ contient les nombre de coocurrences associées (les valeurs du dictionnaire cooc).

In [ ]:
import torch
import torch.optim as optim
import torch.nn as nn

device = torch.device('cuda')

size = min(2000, len(vocab))

# input data: tensor with all pairs of words for which cooccurrences are non-null
X = torch.tensor([(i, j) for i, j in cooc.keys() if i < size and j < size]).long().to(device)
Y = torch.tensor([value for (i, j), v in cooc.items() if i < size and j < size]).float().to(device)

# cast as batch loader
from torch.utils.data import TensorDataset, DataLoader
train_dataset = TensorDataset(X, Y)
train_loader = DataLoader(train_dataset, batch_size=1024, shuffle=True)

Le modèle GloVe contient deux tenseurs de paramètres :

  • embedding, de taille (nombre de mots, taille de l'embedding)
  • bias, de taille (nombre de mots) Tous deux sont encapsulés dans des nn.Parameter et initialisés avec une gaussienne de moyenne 0 et de variance 1.

Plutôt qu'une fonction forward, nous calculerons directement le loss $J$ dans une fonction compute_loss. Cette fonction prend en entrée deux tenseurs $x$ et $y$ de taille respective (taile de batch, 2) et (taille de batch), contenant des valeurs extraites de $X$ et $Y$. Cette fonction doit retourner $J$ pour ces entrées, soit un tenseur de taille 1 (un scalaire).

Pour être rapide, cette fonction doit vectoriser les différentes opérations, et donc effectuer les calculs à l'aide d'opérations matricielles sur $x$ et $y$. En particulier, il faut vectoriser l'opération $w_i^T w_j$, ce qui peut être effectué à l'aide de torch.bmm, la multiplication de matrices batchée.

In [ ]:
class GloVe(nn.Module):
  def __init__(self):
    super().__init__()
    self.embedding = nn.Parameter(torch.randn((size, 50)))
    self.bias = nn.Parameter(torch.randn((size,)))

  # compute one round of loss
  def compute_loss(self, x, y):

    ## à compléter : calculer le loss de GloVe ##

    return glove_loss

model = GloVe().to(device)
model.compute_loss(X[:3], Y[:3])

L'apprentissage se fait à l'aide de l'optimiseur de votre choix, en effectuant plusieurs époques sur l'ensemble des données. Il faut bien veiller à voir toutes les données pour que tous les embeddings soient modifiés. Pour vérifier que l'apprentissage fonctionne, il faut que le loss diminue au fil des époques.

In [ ]:
optimizer = optim.SGD(model.parameters(), lr=0.05)
for epoch in range(25):
  total_loss = 0
  num = 0
  for x, y in train_loader:
    optimizer.zero_grad()
    loss = model.compute_loss(x, y)
    loss.backward()
    optimizer.step()
    total_loss += loss.item()
    num += len(x)
  print(1 + epoch, total_loss / num)

À partir de ces embedding, on peut effectuer les mêmes opérations que sur les embeddings précédents et observer la différence de comportement.

In [ ]:
# convert embeddings to numpy arrays 
embedding = model.embedding.detach().cpu().numpy()

print('cosine(rues, ville) =', similarity(embedding, 'rues', 'ville'))
print('cosine(tour, monde) =', similarity(embedding, 'tour', 'monde'))
print(most_similar(embedding, 'monde'))
print(most_similar(embedding, 'pays'))
plot_projection(embedding)

Le manque de données peut générer des embeddings de qualité moyenne. Pour aller plus loin, téléchargez des embeddings préentraînés, puis vérifiez si les invarants linguistiques comme "roi - homme + femme = reine" sont valides en regardant le vecteur le plus proche de celui résultant du calcule $A - B + C$ pour trois mots d'intérêt. Construisez un petit corpus de test et calculez le nombre de fois où le résultat attendu pour un tel calcul est dans les 10 mots les plus proches.