Ce petit document est un notebook Jupyter, ayant pour but de présenter le concept de problèmes de bandit, comment les simuler et les résoudre, et deux algorithmes conçus dans ce but.
Je ne vais pas donner beaucoup d'explications mathématiques, je conseille plutôt ce petit article, datant de 2017, en français, écrit par Émilie Kaufmann.
Je préfère me focaliser sur une implémentation simple, claire et concise de chaque morceau nécessaire à la simulation de problèmes et d'algorithmes de bandit. J'utilise le langage de programmation Python.
Dans ce but, j'utilise une approche objet : chaque morceau sera une classe, et des instances (des objets) seront utilisées pour toutes les composantes.
# Dépendances
import numpy as np
import random as rd
import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn as sns
sns.set(context="notebook", style="darkgrid", palette="hls", font="sans-serif", font_scale=1.4)
from tqdm import tqdm_notebook as tqdm
Une simple fonction suffira ici, pour initialiser un algorithme, le simuler durant $T$ étapes, et stocker les récompenses et les bras tirés.
Il est important de tout stocker pour ensuite pouvoir afficher différentes statistiques sur l'expérience, permettant d'évaluer l'efficacité des différentes algorithmes.
Je préfère donner directement cette fonction afin de fixer les signatures des différentes classes qu'on va écrire ensuite :
tire()
qui donne $r_k(t) \sim \nu_k$ à l'instant $t$ pour la distribution $\nu_k$ du $k^{\text{ième}}$ bras, noté r_k_t
.commence()
pour initialiser l'algorithme, une fois,A_t = choix()
pour choisir un bras, à chaque instant $t$, noté $A(t) \in \{1,\dots,K\}$,recompense(A_t, r_k_t)
pour donner la récompense r_k_t
tirée du bras A_t
.def simulation(bras, algorithme, horizon):
""" Simule l'algorithme donné sur ces bras, durant horizon étapes."""
choix, recompenses = np.zeros(horizon), np.zeros(horizon)
# 1. Initialise l'algorithme
algorithme.commence()
# 2. Boucle en temps, pour t = 0 à horizon - 1
for t in range(horizon):
# 2.a. L'algorithme choisi son bras à essayer
A_t = algorithme.choix()
# 2.b. Le bras k donne une récompense
r_k_t = bras[A_t].tire()
# 2.c. La récompense est donnée à l'algorithme
algorithme.recompense(A_t, r_k_t)
# 2.d. On stocke les deux
choix[t] = A_t
recompenses[t] = r_k_t
# 3. On termine en renvoyant ces deux vecteurs
return recompenses, choix
Les récompenses de tels bras, notées $r_k(t)$ pour le bras $k$ à l'instant $t$, sont tirées de façons identiquement distribuées et indépendantes, selon une loi de Bernoulli : $$ \forall t\in\mathbb{N}, \forall k\in\{1,\dots,K\}, r_k(t) \sim \mathrm{B}(\mu_k). $$
class Bernoulli():
""" Bras distribués selon une loi de Bernoulli."""
def __init__(self, probabilite):
assert 0 <= probabilite <= 1, "Erreur, probabilite doit être entre 0 et 1 pour un bras de Bernoulli."
self.probabilite = probabilite
def tire(self):
""" Tire une récompense aléatoire."""
return float(rd.random() <= self.probabilite)
Par exemple, on peut considérer le problème à trois bras ($K = 3$), caractérisé par ces paramètres $\boldsymbol{\mu} = [\mu_1,\dots,\mu_K] = [0.1, 0.5, 0.9]$ :
mus = [ 0.1, 0.5, 0.9 ]
bras = [ Bernoulli(mu) for mu in mus ]
On peut prendre 10 échantillons de chaque bras, et vérifier leurs moyennes :
rd.seed(10000)
T = 10
exemples_echantillons = [ [ bras_k.tire() for _ in range(T) ] for bras_k in bras ]
exemples_echantillons
[[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0], [1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0, 0.0, 0.0], [0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]]
np.mean(exemples_echantillons, axis=1)
array([ 0.1, 0.5, 0.9])
C'est assez proche de $\boldsymbol{\mu} = [\mu_1,\dots,\mu_K] = [0.1, 0.5, 0.9]$... Non ?
Comme on l'a dit plus haut, les algorithmes ont besoin de trois méthodes :
commence()
pour initialiser l'algorithme, une fois. Généralement, il s'agit de remettre à zero les vecteurs de mémoires internes de l'algorithme, et de mettre $t = 0$.A_t = choix()
pour choisir un bras, à chaque instant $t$, noté $A(t) \in \{1,\dots,K\}$. C'est la partie "intelligente" qui doit être conçue avec soin.recompense(A_t, r_k_t)
pour donner la récompense r_k_t
tirée du bras A_t
. Souvent, il suffit de mettre à jour les deux ou trois vecteurs internes.En fait, il faut aussi une méthode pour créer l'instance de la classe, i.e., une méthode __init__(K)
, qui demande de simplement connaître $K$, le nombre de bras.
Bien-sûr, les algorithmes ne doivent pas connaître $\boldsymbol{\mu} = [\mu_1,\dots,\mu_K]$ les paramètres du problème... Sinon l'apprentissage n'a aucun intérêt : il suffit de viser $k^* = \arg\max_k \mu_k$...
On va commencer par donner deux exemples naïfs :
Un algorithme "stupide" qui choisi un bras de façon complètement uniforme, $A^{1}(t) \sim U(1,\dots,K), \forall t$, à chaque instant $t \in \mathbb{N}$, via la classe ChoixUniforme
.
Un algorithme moins stupide, mais assez naïf, qui utilise un estimateur empirique $\widehat{\mu_k}(t) = \frac{X_k(t)}{N_k(t)}$ de la moyenne de chaque bras, et tire $A^{2}(t) \in \arg\max_k \widehat{\mu_k}(t)$ à chaque instant $t \in \mathbb{N}$. Ici, $X_k(t) = \sum_{\tau=0}^{t} \mathbb{1}(A(\tau) = k) r_k(\tau)$ compte les récompenses accumulées en tirant le bras $k$, sur les instants $t = 0,\dots,\tau$. Et $N_k(t) = \sum_{\tau=0}^{t} \mathbb{1}(A(\tau) = k)$ compte le nombre de sélections de ce bras $k$. Via la classe MoyenneEmpirique
.
ChoixUniforme
¶class ChoixUniforme(object):
"""Algorithme stupide, choix uniforme."""
def __init__(self, K):
"""Crée l'instance de l'algorithme."""
self.K = K
def commence(self):
"""Initialise l'algorithme : rien à faire ici."""
pass
def choix(self):
"""Choix uniforme d'un indice A(t) ~ U(1...K)."""
return rd.randint(0, self.K - 1)
def recompense(self, k, r):
"""Donne une récompense r tirée sur le bras k à l'algorithme : rien à faire ici."""
pass
MoyenneEmpirique
¶Voilà qui donne une bonne idée de la structure ("API") que vont devoir suivre les différentes algorithmes.
L'algorithme suivant est un peu plus complexe.
class MoyenneEmpirique(object):
"""Algorithme naïf, qui utilise la moyenne empirique."""
def __init__(self, K):
"""Crée l'instance de l'algorithme."""
self.K = K
# Il nous faut de la mémoire interne
self.recompenses = np.zeros(K) # X_k(t) pour chaque k
self.tirages = np.zeros(K) # N_k(t) pour chaque k
self.t = 0 # Temps t interne
def commence(self):
"""Initialise l'algorithme : remet à zeros chaque X_k et N_k, et t = 0."""
self.recompenses.fill(0)
self.tirages.fill(0)
self.t = 0
def choix(self):
"""Si on a vu tous les bras, on prend celui de moyenne empirique la plus grande."""
# 1er cas : il y a encore des bras qu'on a jamais vu
if np.min(self.tirages) == 0:
k = np.min(np.where(self.tirages == 0)[0])
# 2nd cas : tous les bras ont été essayé
else:
# Notez qu'on aurait pu ne stocker que ce vecteur moyennes_empiriques
moyennes_empiriques = self.recompenses / self.tirages
k = np.argmax(moyennes_empiriques)
self.t += 1 # Inutile ici
return k
def recompense(self, k, r):
"""Donne une récompense r tirée sur le bras k à l'algorithme : met à jour les deux vecteurs internes."""
self.recompenses[k] += r
self.tirages[k] += 1
Il s'agit d'une amélioration de l'algorithme précédent, où on utilise un autre indice.
Au lieu d'utiliser la moyenne empirique $g_k(t) = \widehat{\mu_k}(t) = \frac{X_k(t)}{N_k(t)}$ et $A(t) = \arg\max_k g_k(t)$, on utilise une borne supérieure d'un intervalle de confiance autour de cette moyenne : $$g'_k(t) = \widehat{\mu_k}(t) + \sqrt{\alpha \frac{\log t}{N_k(t)}}.$$ Et cet indice est toujours utilisé pour décider le bras à essayer à chaque instant : $$A^{\mathrm{UCB}1}(t) = \arg\max_k g'_k(t).$$
Il faut une constante $\alpha \geq 0$, qu'on choisira $\alpha \geq \frac12$ pour avoir des performances raisonnables. $\alpha$ contrôle le compromis entre exploitation et exploration, et ne doit pas être trop grand. $\alpha = 1$ est un bon choix par défaut.
On va gagner du temps en héritant de la classe
MoyenneEmpirique
précédente. Ça permet de ne pas réécrire les méthodes qui sont déjà bien écrites.
class UCB1(MoyenneEmpirique):
"""Algorithme UCB1."""
def __init__(self, K, alpha=1):
"""Crée l'instance de l'algorithme. Par défaut, alpha=1."""
super(UCB1, self).__init__(K) # On laisse la classe mère faire le travaille
assert alpha >= 0, "Erreur : alpha doit etre >= 0."
self.alpha = alpha
def choix(self):
"""Si on a vu tous les bras, on prend celui d'indice moyenne empirique + UCB le plus grand."""
self.t += 1 # Nécessaire ici
# 1er cas : il y a encore des bras qu'on a jamais vu
if np.min(self.tirages) == 0:
k = np.min(np.where(self.tirages == 0)[0])
# 2nd cas : tous les bras ont été essayé
else:
moyennes_empiriques = self.recompenses / self.tirages
ucb = np.sqrt(self.alpha * np.log(self.t) / np.log(self.tirages))
indices = moyennes_empiriques + ucb
k = np.argmax(indices)
return k
Ce petit article explique très bien l'approche bayésienne.
On a besoin de savoir manipuler des posteriors, qui seront les posteriors conjugués des distributions des bras.
Pour des bras de Bernoulli, le posterior conjugué associé est une loi Beta, notée $\mathrm{Beta}(\alpha,\beta)$ pour deux paramètres $\alpha,\beta > 0$.
from numpy.random import beta
class Beta():
"""Posteriors d'expériences de Bernoulli."""
def __init__(self):
self.N = [1, 1]
def reinitialise(self):
self.N = [1, 1]
def echantillon(self):
"""Un échantillon aléatoire de ce posterior Beta."""
return beta(self.N[1], self.N[0])
def observe(self, obs):
"""Ajoute une nouvelle observation. Si 'obs'=1, augmente alpha, sinon si 'obs'=0, augmente beta."""
self.N[int(obs)] += 1
Dès qu'on sait manipuler ces postériors Beta, on peut implémenter rapidement le dernier algorithme, Thompson Sampling.
Les paramètres du posterior sur $\mu_k$, i.e., $\alpha_k(t)$,$\beta_k(t)$ seront mis à jour à chaque étape pour compter le nombre d'observations réussies et échouées : $$ \alpha_k(t) = 1 + X_k(t) \\ \beta_k(t) = 1 + N_k(t) - X_k(t).$$
La moyenne empirique estimant $\mu_k$ sera, à l'instant $t$, $$ \widetilde{\mu_k}(t) = \frac{\alpha_k(t)}{\alpha_k(t) + \beta_k(t)} = \frac{1 + X_k(t)}{2 + N_k(t)} \simeq \frac{X_k(t)}{N_k(t)}.$$
La différence avec UCB1 est que la prise de décision de Thompson Sampling se fait sur un indice, tiré aléatoirement selon les posteriors. C'est une politique d'indice randomisée.
D'un point de vue bayésien, un modèle est tiré selon les posteriors, puis on joue selon le meilleur modèle : $$ g''_k(t) \sim \mathrm{Beta}(\alpha_k(t), \beta_k(t)) \\ A^{\mathrm{TS}}(t) = \arg\max_k g''_k(t). $$
class ThompsonSampling(MoyenneEmpirique):
"""Algorithme Thompson Sampling."""
def __init__(self, K, posterior=Beta):
"""Crée l'instance de l'algorithme. Par défaut, alpha=1."""
self.K = K
# On créé K posteriors
self.posteriors = [posterior() for k in range(K)]
def commence(self):
"""Réinitialise les K posteriors."""
for posterior in self.posteriors:
posterior.reinitialise()
def choix(self):
"""On tire K modèles depuis les posteriors, et on joue dans le meilleur."""
moyennes_estimees = [posterior.echantillon() for posterior in self.posteriors]
k = np.argmax(moyennes_estimees)
return k
def recompense(self, k, r):
"""Observe cette récompense r sur le bras k en mettant à jour le kième posterior."""
self.posteriors[k].observe(r)
On va comparer, sur deux problèmes, les 4 algorithmes définis plus haut.
Les problèmes sont caractérisés par les moyennes des bras de Bernoulli, $\boldsymbol{\mu} = [\mu_1,\dots,\mu_K]$, et on les suppose ordonnées par ordre décroissant : $\mu_1 > \mu_2 \ge \dots \ge \mu_K$.
On affichera plusieurs choses, dans des graphiques au cours du temps $t = 0, \dots, T$ pour un horizon $T = 1000$ ou $T = 5000$ étapes :
On définit 4 fonctions d'affichage pour ces quantités.
mpl.rcParams['figure.figsize'] = (15, 8)
def affiche_selections(choix, noms, kstar=0):
plt.figure()
for i, c in enumerate(choix):
selection_kstar = 1.0 * (c == kstar)
selection_moyenne = np.cumsum(selection_kstar) / np.cumsum(np.ones_like(c))
plt.plot(selection_moyenne, label=noms[i])
plt.legend()
plt.xlabel("Temps discret, $t = 1, ..., T = {}$".format(len(choix[0])))
plt.ylabel("Taux de sélection")
plt.title("Sélection du meilleur bras #{} pour différents algorithmes".format(1 + kstar))
plt.show()
def affiche_recompenses(recompenses, noms):
plt.figure()
for i, r in enumerate(recompenses):
recompense_accumulee = np.cumsum(r)
plt.plot(recompense_accumulee, label=noms[i])
plt.legend()
plt.xlabel("Temps discret, $t = 1, ..., T = {}$".format(len(choix[0])))
plt.ylabel("Récompenses accumulées")
plt.title("Récompenses accumulées pour différents algorithmes")
plt.show()
def affiche_recompenses_moyennes(recompenses, noms):
plt.figure()
for i, r in enumerate(recompenses):
recompense_moyenne = np.cumsum(r) / np.cumsum(np.ones_like(r))
plt.plot(recompense_moyenne, label=noms[i])
plt.legend()
plt.xlabel("Temps discret, $t = 1, ..., T = {}$".format(len(choix[0])))
plt.ylabel(r"Récompenses moyennes $\in [0, 1]$")
plt.title("Récompenses moyennes pour différents algorithmes")
plt.show()
def affiche_regret(recompenses, noms, mustar=1):
plt.figure()
for i, r in enumerate(recompenses):
recompense_accumulee = np.cumsum(r)
regret = mustar * np.cumsum(np.ones_like(r)) - recompense_accumulee
plt.plot(regret, label=noms[i])
plt.legend()
plt.xlabel("Temps discret, $t = 1, ..., T = {}$".format(len(choix[0])))
plt.ylabel("Regret")
plt.title("Regret accumulé pour différents algorithmes")
plt.show()
On reprend le problème donné plus haut :
horizon = 1000
mus = [0.9, 0.5, 0.1]
bras = [ Bernoulli(mu) for mu in mus ]
K = len(mus)
kstar = np.argmax(mus) # = 0
horizon, mus, bras, K, kstar
(1000, [0.9, 0.5, 0.1], [<__main__.Bernoulli at 0x7f81b7636470>, <__main__.Bernoulli at 0x7f81b76364e0>, <__main__.Bernoulli at 0x7f81b7636518>], 3, 0)
algorithmes = [ChoixUniforme(K), MoyenneEmpirique(K), UCB1(K, alpha=1), ThompsonSampling(K)]
algorithmes
[<__main__.ChoixUniforme at 0x7f81b7636048>, <__main__.MoyenneEmpirique at 0x7f81b76360b8>, <__main__.UCB1 at 0x7f81b7636080>, <__main__.ThompsonSampling at 0x7f81b76360f0>]
Pour les légendes, on a besoin des noms des algorithmes :
noms = ["ChoixUniforme", "MoyenneEmpirique", "UCB1(alpha=1)", "ThompsonSampling"]
On peut commencer la simulation, pour chaque algorithme.
%%time
N = len(algorithmes)
recompenses, choix = np.zeros((N, horizon)), np.zeros((N, horizon))
for i, alg in tqdm(enumerate(algorithmes), desc="Algorithmes"):
rec, ch = simulation(bras, alg, horizon)
recompenses[i] = rec
choix[i] = ch
CPU times: user 76 ms, sys: 12 ms, total: 88 ms Wall time: 82.4 ms
/usr/local/lib/python3.5/dist-packages/ipykernel_launcher.py:19: RuntimeWarning: divide by zero encountered in true_divide
recompenses, choix
(array([[ 1., 0., 0., ..., 0., 0., 0.], [ 1., 0., 0., ..., 1., 1., 1.], [ 0., 0., 0., ..., 1., 1., 1.], [ 0., 1., 1., ..., 1., 1., 1.]]), array([[ 0., 1., 1., ..., 2., 1., 2.], [ 0., 1., 2., ..., 0., 0., 0.], [ 0., 1., 2., ..., 0., 0., 0.], [ 1., 0., 0., ..., 0., 0., 0.]]))
On affiche et vérifie les résultats attendus :
affiche_selections(choix, noms, kstar)
affiche_recompenses(recompenses, noms)
affiche_recompenses_moyennes(recompenses, noms)
affiche_regret(recompenses, noms, mustar=mus[kstar])
$\implies$ L'algorithme uniforme est, bien évidemment, très inefficace ! Il empêche de visualiser le regret des trois autres algorithmes.
Et sur ce problème simple à trois bras, avec une seule simulation (donc une variance immense), difficile de savoir lequel des trois algorithmes est le plus efficace...
horizon = 5000
mus = [0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.1]
bras = [ Bernoulli(mu) for mu in mus ]
K = len(mus)
kstar = np.argmax(mus) # = 0
horizon, mus, bras, K, kstar
(5000, [0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.1], [<__main__.Bernoulli at 0x7f53800e3518>, <__main__.Bernoulli at 0x7f53800e34e0>, <__main__.Bernoulli at 0x7f53800e30b8>, <__main__.Bernoulli at 0x7f53800e30f0>, <__main__.Bernoulli at 0x7f53800e34a8>, <__main__.Bernoulli at 0x7f53800e3080>, <__main__.Bernoulli at 0x7f53800e36a0>, <__main__.Bernoulli at 0x7f53800e36d8>, <__main__.Bernoulli at 0x7f53800e3710>], 9, 0)
On va comparer différents choix de $\alpha$ pour l'algorithme UCB : $\alpha = 4, 1, 0.5, 0.1$.
algorithmes = [MoyenneEmpirique(K), UCB1(K, alpha=4), UCB1(K, alpha=1), UCB1(K, alpha=0.5), UCB1(K, alpha=0.1), ThompsonSampling(K)]
algorithmes
[<__main__.MoyenneEmpirique at 0x7f53800e3ac8>, <__main__.UCB1 at 0x7f53800e3978>, <__main__.UCB1 at 0x7f53800e3390>, <__main__.UCB1 at 0x7f53800e3c18>, <__main__.UCB1 at 0x7f53800f9eb8>, <__main__.ThompsonSampling at 0x7f53800f9080>]
Pour les légendes, on a besoin des noms des algorithmes :
noms = ["MoyenneEmpirique", "UCB1(alpha=4)", "UCB1(alpha=1)", "UCB1(alpha=0.5)", "UCB1(alpha=0.1)", "ThompsonSampling"]
On peut commencer la simulation, pour chaque algorithme.
%%time
N = len(algorithmes)
recompenses, choix = np.zeros((N, horizon)), np.zeros((N, horizon))
for i, alg in tqdm(enumerate(algorithmes), desc="Algorithmes"):
rec, ch = simulation(bras, alg, horizon)
recompenses[i] = rec
choix[i] = ch
Widget Javascript not detected. It may not be installed or enabled properly.
/usr/local/lib/python3.5/dist-packages/ipykernel_launcher.py:20: RuntimeWarning: divide by zero encountered in true_divide
CPU times: user 628 ms, sys: 112 ms, total: 740 ms Wall time: 616 ms
recompenses, choix
(array([[ 0., 1., 0., ..., 0., 1., 1.], [ 1., 1., 1., ..., 1., 1., 1.], [ 1., 1., 0., ..., 1., 1., 1.], [ 1., 0., 1., ..., 1., 1., 1.], [ 0., 0., 1., ..., 0., 1., 1.], [ 0., 1., 1., ..., 1., 1., 0.]]), array([[ 0., 1., 2., ..., 1., 1., 1.], [ 0., 1., 2., ..., 0., 0., 0.], [ 0., 1., 2., ..., 0., 0., 0.], [ 0., 1., 2., ..., 0., 0., 0.], [ 0., 1., 2., ..., 0., 0., 0.], [ 4., 1., 1., ..., 0., 0., 0.]]))
On affiche et vérifie les résultats attendus :
affiche_selections(choix, noms, kstar)
On commence à voir une différence de vitesse de convergence, pour l'identification du meilleur bras.
affiche_recompenses(recompenses, noms)
Difficile de différence quel algorithme est le plus efficace, même si clairement les plus grandes valeurs de $\alpha = 4, 1$ pour UCB semblent avoir moins bien fonctionner.
affiche_recompenses_moyennes(recompenses, noms)
En terme de récompenses moyennes au cours du temps, les 4 algorithmes les plus efficaces convergent vers $\mu^* = \mu_1 = 0.9$.
affiche_regret(recompenses, noms, mustar=mus[kstar])
Encore une fois, une seule simulation ne permet pas vraiment de conclure...
On s'intéresse au même problème, mais simulé 100 fois et sur un horizon plus long ($T = 10000$).
On s'intéressera ensuite aux statistiques moyennes sur ces 100 répétitions pour les graphiques.
horizon = 10000
repetitions = 100
mus = [0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.1]
bras = [ Bernoulli(mu) for mu in mus ]
K = len(mus)
kstar = np.argmax(mus) # = 0
horizon, repetitions, mus, bras, K, kstar
(10000, 100, [0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.1], [<__main__.Bernoulli at 0x7f538005aa58>, <__main__.Bernoulli at 0x7f538005ae10>, <__main__.Bernoulli at 0x7f538005ae80>, <__main__.Bernoulli at 0x7f538005af98>, <__main__.Bernoulli at 0x7f538005ad30>, <__main__.Bernoulli at 0x7f538005a080>, <__main__.Bernoulli at 0x7f538005a0f0>, <__main__.Bernoulli at 0x7f538005af28>, <__main__.Bernoulli at 0x7f538005a588>], 9, 0)
On va comparer différents choix de $\alpha$ pour l'algorithme UCB : $\alpha = 4, 1, 0.5, 0.1$.
algorithmes = [MoyenneEmpirique(K), UCB1(K, alpha=4), UCB1(K, alpha=1), UCB1(K, alpha=0.5), UCB1(K, alpha=0.1), ThompsonSampling(K)]
algorithmes
[<__main__.MoyenneEmpirique at 0x7f538005a5f8>, <__main__.UCB1 at 0x7f538005af60>, <__main__.UCB1 at 0x7f538005a518>, <__main__.UCB1 at 0x7f538005a1d0>, <__main__.UCB1 at 0x7f538005a9b0>, <__main__.ThompsonSampling at 0x7f538005a978>]
Pour les légendes, on a besoin des noms des algorithmes :
noms = ["MoyenneEmpirique", "UCB1(alpha=4)", "UCB1(alpha=1)", "UCB1(alpha=0.5)", "UCB1(alpha=0.1)", "ThompsonSampling"]
On peut commencer la simulation, pour chaque algorithme.
%%time
N = len(algorithmes)
recompenses, choix = np.zeros((N, horizon)), np.zeros((N, horizon))
# Pour chaque répétitions
for rep in tqdm(range(repetitions), desc="Répétitions"):
for i, alg in enumerate(algorithmes):
rec, ch = simulation(bras, alg, horizon)
recompenses[i] += rec
choix[i] += ch
# On moyenne
recompenses /= repetitions
choix /= repetitions
Widget Javascript not detected. It may not be installed or enabled properly.
/usr/local/lib/python3.5/dist-packages/ipykernel_launcher.py:20: RuntimeWarning: divide by zero encountered in true_divide
CPU times: user 1min 37s, sys: 6.76 s, total: 1min 44s Wall time: 1min 37s
Cette fois, les statistiques accumulées ne sont plus entières.
recompenses, choix
(array([[ 0.94, 0.73, 0.73, ..., 0.9 , 0.83, 0.8 ], [ 0.89, 0.79, 0.73, ..., 0.79, 0.81, 0.74], [ 0.9 , 0.74, 0.76, ..., 0.91, 0.86, 0.86], [ 0.88, 0.81, 0.8 , ..., 0.89, 0.88, 0.91], [ 0.92, 0.87, 0.74, ..., 0.89, 0.87, 0.88], [ 0.54, 0.39, 0.49, ..., 0.85, 0.9 , 0.95]]), array([[ 0. , 1. , 2. , ..., 0.22, 0.22, 0.22], [ 0. , 1. , 2. , ..., 1.18, 0.9 , 1.18], [ 0. , 1. , 2. , ..., 0.25, 0.23, 0.29], [ 0. , 1. , 2. , ..., 0.13, 0.07, 0.08], [ 0. , 1. , 2. , ..., 0.02, 0.02, 0.02], [ 3.87, 4.16, 3.74, ..., 0. , 0. , 0. ]]))
On affiche et vérifie les résultats attendus :
choix = np.floor(choix)
affiche_selections(choix, noms, kstar)
On commence à voir une différence de vitesse de convergence, pour l'identification du meilleur bras.
affiche_recompenses(recompenses, noms)
Difficile de différence quel algorithme est le plus efficace, même si clairement les plus grandes valeurs de $\alpha = 4, 1$ pour UCB semblent avoir moins bien fonctionner.
affiche_recompenses_moyennes(recompenses, noms)
En terme de récompenses moyennes au cours du temps, les 4 algorithmes les plus efficaces convergent vers $\mu^* = \mu_1 = 0.9$.
affiche_regret(recompenses, noms, mustar=mus[kstar])
Cette fois, ces 100 simulations permet de conclure quelques points :
MoyenneEmpirique
converge rapidement au début, mais son regret $\mathcal{R}(t)$ ne semble pas avoir un profil logarithmique (il faudrait essayer de plus grands horizons pour le confirmer),UCB
semblent tous avoir un regret logarithmique, i.e., $\mathcal{R}(t) = \mathcal{O}(\log t)$, et plus le $\alpha$ est petit, plus le regret est bas,ThompsonSampling
fonctionne très bien, sans avoir à choisir de constante $\alpha$.On s'intéresse enfin à un problème bien plus difficile, simulé 1000 fois et sur un horizon plus long ($T = 10000$).
horizon = 10000
repetitions = 1000
mus = [0.3, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.02, 0.02, 0.02, 0.01, 0.01, 0.005, 0.005, 0.001, 0.001]
bras = [ Bernoulli(mu) for mu in mus ]
K = len(mus)
kstar = np.argmax(mus)
horizon, repetitions, mus, bras, K, kstar
(10000, 1000, [0.3, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.02, 0.02, 0.02, 0.01, 0.01, 0.005, 0.005, 0.001, 0.001], [<__main__.Bernoulli at 0x7f537bf349b0>, <__main__.Bernoulli at 0x7f537bf34f28>, <__main__.Bernoulli at 0x7f537bf34160>, <__main__.Bernoulli at 0x7f537bf34e48>, <__main__.Bernoulli at 0x7f538015e390>, <__main__.Bernoulli at 0x7f538015e3c8>, <__main__.Bernoulli at 0x7f538015e5c0>, <__main__.Bernoulli at 0x7f538015e630>, <__main__.Bernoulli at 0x7f538015ef28>, <__main__.Bernoulli at 0x7f538015ee80>, <__main__.Bernoulli at 0x7f538015ee48>, <__main__.Bernoulli at 0x7f538015e828>, <__main__.Bernoulli at 0x7f538015ed30>, <__main__.Bernoulli at 0x7f538015e588>, <__main__.Bernoulli at 0x7f538015e128>, <__main__.Bernoulli at 0x7f538015e7b8>], 16, 0)
On va comparer différents choix de $\alpha$ pour l'algorithme UCB : $\alpha = 4, 1, 0.5, 0.1, 0.01, 0.001$.
algorithmes = [MoyenneEmpirique(K), UCB1(K, alpha=4), UCB1(K, alpha=1),
UCB1(K, alpha=0.5), UCB1(K, alpha=0.1), UCB1(K, alpha=0.01),
UCB1(K, alpha=0.001), ThompsonSampling(K)]
algorithmes
[<__main__.MoyenneEmpirique at 0x7f53800ef9b0>, <__main__.UCB1 at 0x7f53800ef748>, <__main__.UCB1 at 0x7f53800ef780>, <__main__.UCB1 at 0x7f53800ef7f0>, <__main__.UCB1 at 0x7f53800ef630>, <__main__.UCB1 at 0x7f53800ef198>, <__main__.UCB1 at 0x7f538015ef98>, <__main__.ThompsonSampling at 0x7f538015e780>]
Pour les légendes, on a besoin des noms des algorithmes :
noms = ["MoyenneEmpirique", "UCB1(alpha=4)", "UCB1(alpha=1)",
"UCB1(alpha=0.5)", "UCB1(alpha=0.1)", "UCB1(alpha=0.01)",
"UCB1(alpha=0.001)", "ThompsonSampling"]
On peut commencer la simulation, pour chaque algorithme.
%%time
N = len(algorithmes)
recompenses, choix = np.zeros((N, horizon)), np.zeros((N, horizon))
# Pour chaque répétitions
for rep in tqdm(range(repetitions), desc="Répétitions"):
for i, alg in enumerate(algorithmes):
rec, ch = simulation(bras, alg, horizon)
recompenses[i] += rec
choix[i] += ch
# On moyenne
recompenses /= repetitions
choix /= repetitions
Widget Javascript not detected. It may not be installed or enabled properly.
/usr/local/lib/python3.5/dist-packages/ipykernel_launcher.py:20: RuntimeWarning: divide by zero encountered in true_divide
CPU times: user 22min 48s, sys: 1min 4s, total: 23min 52s Wall time: 22min 48s
Cette fois, les statistiques accumulées ne sont plus entières.
recompenses, choix
(array([[ 0.308, 0.057, 0.046, ..., 0.254, 0.236, 0.238], [ 0.297, 0.048, 0.056, ..., 0.113, 0.113, 0.124], [ 0.292, 0.057, 0.048, ..., 0.225, 0.232, 0.237], ..., [ 0.292, 0.061, 0.038, ..., 0.303, 0.299, 0.312], [ 0.301, 0.059, 0.048, ..., 0.253, 0.261, 0.275], [ 0.025, 0.045, 0.041, ..., 0.292, 0.316, 0.313]]), array([[ 0. , 1. , 2. , ..., 0.983, 0.983, 0.983], [ 0. , 1. , 2. , ..., 5.258, 5.139, 5.099], [ 0. , 1. , 2. , ..., 2.106, 2.011, 1.99 ], ..., [ 0. , 1. , 2. , ..., 0. , 0. , 0. ], [ 0. , 1. , 2. , ..., 0.591, 0.591, 0.591], [ 7.842, 7.398, 7.347, ..., 0.026, 0.037, 0.022]]))
On affiche et vérifie les résultats attendus :
choix = np.floor(choix)
affiche_selections(choix, noms, kstar)
On commence à voir une différence de vitesse de convergence, pour l'identification du meilleur bras.
affiche_recompenses(recompenses, noms)
Difficile de différence quel algorithme est le plus efficace, même si clairement les plus grandes valeurs de $\alpha = 4, 1$ pour UCB semblent avoir moins bien fonctionner.
affiche_recompenses_moyennes(recompenses, noms)
En terme de récompenses moyennes au cours du temps, les 4 algorithmes les plus efficaces convergent vers $\mu^* = \mu_1 = 0.9$.
affiche_regret(recompenses, noms, mustar=mus[kstar])
Cette fois, ces 1000 simulations permet de renforcer les conclusions précédentes :
MoyenneEmpirique
converge rapidement au début, mais son regret $\mathcal{R}(t)$ ne semble pas avoir un profil logarithmique (il faudrait essayer de plus grands horizons pour le confirmer),UCB1
semblent tous avoir un regret logarithmique, i.e., $\mathcal{R}(t) = \mathcal{O}(\log t)$, et plus le $\alpha$ est petit, plus le regret est bas,UCB1(alpha)
n'est pas assez exploiteur, et pour ce problème à $K=15$ bras, il ne fonctionne plus bien,ThompsonSampling
fonctionne très bien, sans avoir à choisir de constante $\alpha$.Je propose de terminer par l'implémentation d'un dernier algorithme, ressemblant beaucoup à UCB1
, mais qui utilise des indices un peu plus compliqués à calculer.
Il s'agit de l'algorithme proposé en 2016 par Tor Lattimore, dans cet article (et sa présentation à COLT'16), nommé ApproximatedFHGittins
, pour Approximated Finite-Horizon Gittins index, ou Approximation des Indices de Gittins à Horizon Fini en français.
Cet algorithme exige de connaître l'horizon $T$, et calcule l'indice suivant, au temps $t$ et après $N_k(t)$ sélections du bras $k$ : $$ I_k(t) = \frac{X_k(t)}{N_k(t)} + \sqrt{\frac{\alpha}{2 N_k(t)} \log\left( \frac{m}{N_k(t) \log^{1/2}\left( \frac{m}{N_k(t)} \right)} \right)}, \\ \text{où}\;\; m = T - t + 1.$$
class ApproximatedFHGittins(MoyenneEmpirique):
"""Algorithme ApproximatedFHGittins de Tor Lattimore."""
def __init__(self, K, horizon, alpha=1):
"""Crée l'instance de l'algorithme. Par défaut, alpha=1."""
super(ApproximatedFHGittins, self).__init__(K) # On laisse la classe mère faire le travaille
self.horizon = horizon
assert alpha >= 0, "Erreur : alpha doit etre >= 0."
self.alpha = alpha
def choix(self):
"""Si on a vu tous les bras, on prend celui d'indice le plus grand."""
self.t += 1 # Nécessaire ici
# 1er cas : il y a encore des bras qu'on a jamais vu
if np.min(self.tirages) == 0:
k = np.min(np.where(self.tirages == 0)[0])
# 2nd cas : tous les bras ont été essayé
else:
# D'abord les moyennes
moyennes_empiriques = self.recompenses / self.tirages
# Puis le terme supplémentaire
m = self.horizon - self.t + 1
m_sur_Nk = m / self.tirages
logdemi = np.sqrt(np.maximum(0, np.log(m_sur_Nk)))
terme_sup = np.sqrt(self.alpha * np.log(m_sur_Nk / logdemi) / (2. * self.tirages))
indices = moyennes_empiriques + terme_sup
k = np.argmax(indices)
return k
ApproximatedFHGittins
avec les autres algorithmes¶Sur le même problème que précédemment, on va vérifier que ce dernier algorithme est plus efficace que les autres.
horizon = 10000
repetitions = 1000
mus = [0.3, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.02, 0.02, 0.02, 0.01, 0.01, 0.005, 0.005, 0.001, 0.001]
bras = [ Bernoulli(mu) for mu in mus ]
K = len(mus)
kstar = np.argmax(mus)
horizon, repetitions, mus, bras, K, kstar
(10000, 1000, [0.3, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.02, 0.02, 0.02, 0.01, 0.01, 0.005, 0.005, 0.001, 0.001], [<__main__.Bernoulli at 0x7f81b6cb1c50>, <__main__.Bernoulli at 0x7f81b6cb1c18>, <__main__.Bernoulli at 0x7f81b6cb15f8>, <__main__.Bernoulli at 0x7f81b6cb1550>, <__main__.Bernoulli at 0x7f81b6cb1828>, <__main__.Bernoulli at 0x7f81b6cb1128>, <__main__.Bernoulli at 0x7f81b6cb13c8>, <__main__.Bernoulli at 0x7f81b6cb1390>, <__main__.Bernoulli at 0x7f81b6cb1400>, <__main__.Bernoulli at 0x7f81b6cb17f0>, <__main__.Bernoulli at 0x7f81b6cb1780>, <__main__.Bernoulli at 0x7f81b6cb1208>, <__main__.Bernoulli at 0x7f81b6cb10f0>, <__main__.Bernoulli at 0x7f81b6cb1198>, <__main__.Bernoulli at 0x7f81b6cb1710>, <__main__.Bernoulli at 0x7f81b6cb16d8>], 16, 0)
algorithmes = [UCB1(K, alpha=1), UCB1(K, alpha=0.5),
ThompsonSampling(K),
ApproximatedFHGittins(K, 1.1 * T, alpha=4),
ApproximatedFHGittins(K, 1.1 * T, alpha=1),
ApproximatedFHGittins(K, 1.1 * T, alpha=0.5)
]
algorithmes
[<__main__.UCB1 at 0x7f81b6cb52b0>, <__main__.UCB1 at 0x7f81b6cb5128>, <__main__.ThompsonSampling at 0x7f81b6cb5048>, <__main__.ApproximatedFHGittins at 0x7f81b6cb5c18>, <__main__.ApproximatedFHGittins at 0x7f81b6cb5a58>, <__main__.ApproximatedFHGittins at 0x7f81b6cb5400>]
noms = ["UCB1(alpha=1)", "UCB1(alpha=0.5)",
"ThompsonSampling",
"ApproximatedFHGittins(T, alpha=4)",
"ApproximatedFHGittins(T, alpha=1)",
"ApproximatedFHGittins(T, alpha=0.5)"
]
On peut commencer la simulation, pour chaque algorithme.
%%time
N = len(algorithmes)
recompenses, choix = np.zeros((N, horizon)), np.zeros((N, horizon))
# Pour chaque répétitions
for rep in tqdm(range(repetitions), desc="Répétitions"):
for i, alg in enumerate(algorithmes):
rec, ch = simulation(bras, alg, horizon)
recompenses[i] += rec
choix[i] += ch
# On moyenne
recompenses /= repetitions
choix /= repetitions
/usr/local/lib/python3.5/dist-packages/ipykernel_launcher.py:19: RuntimeWarning: divide by zero encountered in true_divide /usr/local/lib/python3.5/dist-packages/ipykernel_launcher.py:24: RuntimeWarning: invalid value encountered in log
CPU times: user 22min 19s, sys: 1min 9s, total: 23min 28s Wall time: 22min 18s
Cette fois, les statistiques accumulées ne sont plus entières.
recompenses, choix
(array([[ 0.294, 0.053, 0.043, ..., 0.229, 0.235, 0.229], [ 0.317, 0.044, 0.046, ..., 0.267, 0.291, 0.262], [ 0.048, 0.05 , 0.053, ..., 0.322, 0.29 , 0.305], [ 0.299, 0.038, 0.056, ..., 0.288, 0.294, 0.302], [ 0.328, 0.062, 0.043, ..., 0.301, 0.3 , 0.311], [ 0.333, 0.05 , 0.057, ..., 0.32 , 0.316, 0.303]]), array([[ 0. , 1. , 2. , ..., 1.665, 1.795, 1.602], [ 0. , 1. , 2. , ..., 0.577, 0.601, 0.679], [ 7.457, 7.419, 7.478, ..., 0.017, 0.031, 0.069], [ 0. , 1. , 2. , ..., 0. , 0. , 0. ], [ 0. , 1. , 2. , ..., 0. , 0. , 0. ], [ 0. , 1. , 2. , ..., 0. , 0. , 0. ]]))
On affiche et vérifie les résultats attendus :
choix = np.floor(choix)
affiche_selections(choix, noms, kstar)
On commence à voir une différence de vitesse de convergence, pour l'identification du meilleur bras.
affiche_recompenses(recompenses, noms)
Difficile de différence quel algorithme est le plus efficace, même si clairement les plus grandes valeurs de $\alpha = 4, 1$ pour UCB semblent avoir moins bien fonctionner.
affiche_recompenses_moyennes(recompenses, noms)
En terme de récompenses moyennes au cours du temps, les 4 algorithmes les plus efficaces convergent vers $\mu^* = \mu_1 = 0.9$.
affiche_regret(recompenses, noms, mustar=mus[kstar])
affiche_regret(recompenses[2:], noms[2:], mustar=mus[kstar])
Merci d'avoir lu jusqu'ici...
J'espère que ce petit document a pu vous aider à mieux comprendre les bases de la simulation et l'implémentation de problèmes et d'algorithmes de bandits.
J'ai adopté une approche très modulaire, pour chaque composant de la simulation (bras, algorithmes, fonctions de simulation, d'affichages etc). C'est une façon de faire, bien-sûr ce n'est pas nécessaire, mais ça me semble clair, efficace et facile à lire et à compléter (si vous voulez rajouter un algorithme de plus).
- Pour plus de détails, je recommande de lire en détails ce petit article, datant de 2017, en français, écrit par Émilie Kaufmann.
- Cette petite simulation s'inspire de mon environnement plus lourd et plus complet, AlgoBandits.