#!/usr/bin/env python
# coding: utf-8

# In[1]:

import sklearn.mixture
import numpy as np
import scipy as sp
from keras import Input
from keras.layers import Dense, Lambda, Layer, Activation
from keras.models import Model
from keras.layers.normalization import BatchNormalization
import keras.backend as K
import matplotlib.pyplot as plt
import autograd.numpy.random as npr
import pylab
from keras import backend
from sklearn.metrics import adjusted_rand_score
get_ipython().run_line_magic('matplotlib', 'inline')
from sklearn.metrics import accuracy_score


# In[2]:

def make_pinwheel_data(radial_std, tangential_std, num_classes, num_per_class, rate):
    """Generate the synthetic 2-D 'pinwheel' dataset: num_classes spiral arms."""
    rads = np.linspace(0, 2*np.pi, num_classes, endpoint=False)

    features = npr.randn(num_classes*num_per_class, 2) \
        * np.array([radial_std, tangential_std])
    features[:, 0] += 1.
    labels = np.repeat(np.arange(num_classes), num_per_class)

    angles = rads[labels] + rate * np.exp(features[:, 0])
    rotations = np.stack([np.cos(angles), -np.sin(angles),
                          np.sin(angles), np.cos(angles)])
    rotations = np.reshape(rotations.T, (-1, 2, 2))

    # Rotate each point by its class-dependent angle and scale up
    return 10*np.einsum('ti,tij->tj', features, rotations), labels


# In[3]:

num_clusters = 5           # number of clusters in pinwheel data
samples_per_cluster = 200  # number of samples per cluster in pinwheel
K = 15                     # number of components in mixture model
N = 2                      # number of latent dimensions
P = 2                      # number of observation dimensions
batch_size = num_clusters * samples_per_cluster

data, label = make_pinwheel_data(0.3, 0.05, num_clusters, samples_per_cluster, 0.25)

plt.figure(figsize=(10, 10))
pylab.scatter(data[:, 0], data[:, 1], c=label)


# In[4]:

def vae_loss(y_true, y_pred):
    # Reconstruction term ("cross-entropy" between input and output):
    # Gaussian negative log-likelihood of the input under N(y_mean, exp(y_log_var))
    xent_loss = backend.sum(backend.square(y_true - y_mean) / backend.exp(y_log_var)
                            + backend.log(np.pi * 2) + y_log_var, axis=-1) * 0.5
    # Posterior term: negative log-density of the sampled z under the approximate
    # posterior N(z_mean, exp(z_log_var)); together with gmm_loss (the GMM prior
    # log-density) it gives a single-sample estimate of the KL between posterior and prior
    kl_loss = backend.sum(backend.square(z - z_mean) / backend.exp(z_log_var)
                          + backend.log(np.pi * 2) + z_log_var, axis=-1) * 0.5
    return backend.mean(xent_loss - kl_loss - gmm_loss)


def sampling(args):
    """Reparameterization trick: draw from N(mean, exp(log_var))."""
    mean, log_var = args
    epsilon = backend.random_normal(shape=backend.shape(mean), mean=0., stddev=1.)
    return mean + backend.exp(log_var / 2) * epsilon


# In[5]:

original_dim = data.shape[1]
intermediate_dim = 100
latent_dim = 2

x = Input(shape=(original_dim,))
gmm_loss = Input(shape=(1,))  # per-sample GMM log-density of z, fed in as an extra input

# Encoder
encoder = Dense(intermediate_dim)(x)
encoder = BatchNormalization()(encoder)
encoder = Activation('tanh')(encoder)
z_mean = Dense(latent_dim, activation='linear')(encoder)
z_log_var = Dense(latent_dim, activation='linear')(encoder)
z = Lambda(sampling, output_shape=(latent_dim,))([z_mean, z_log_var])

# Decoder
decoder = Dense(intermediate_dim)(z)
decoder = BatchNormalization()(decoder)
decoder = Activation('tanh')(decoder)
y_mean = Dense(original_dim, activation='linear')(decoder)
y_log_var = Dense(original_dim, activation='linear')(decoder)
y = Lambda(sampling, output_shape=(original_dim,))([y_mean, y_log_var])

VAE = Model(input=[x, gmm_loss], output=y)
VAE_latent = Model(input=x, output=z)
VAE_latent_mean = Model(input=x, output=z_mean)
VAE_latent_var = Model(input=x, output=z_log_var)
VAE_predict_mean = Model(input=x, output=y_mean)
VAE_predict_var = Model(input=x, output=y_log_var)
VAE.compile(optimizer='adam', loss=vae_loss)
VAE.summary()


# In[10]:

from scipy.stats import multivariate_normal

epoch = 500

# Initialize the GMM prior on the current latent codes
latent = VAE_latent.predict(data)
gmm = sklearn.mixture.GaussianMixture(n_components=num_clusters,
                                      covariance_type='full', max_iter=1)
gmm.fit(latent)
pi_i = gmm.predict_proba(latent)
mu_ = gmm.means_
sigma_ = gmm.covariances_
likelihood = np.zeros(data.shape[0])
for m, s, pi in zip(mu_, sigma_, pi_i.T):
    likelihood += pi*multivariate_normal.logpdf(latent, m, s)

for i in range(epoch):
    # One gradient step on the VAE with the current GMM prior term ...
    loss = VAE.train_on_batch([data, likelihood], data)
    # ... then refit the GMM on the new latent codes and recompute the
    # responsibility-weighted log-density used as the prior term
    latent = VAE_latent.predict(data)
    gmm.fit(latent)
    pi_i = gmm.predict_proba(latent)
    mu_ = gmm.means_
    sigma_ = gmm.covariances_
    likelihood = np.zeros(data.shape[0])
    for m, s, pi in zip(mu_, sigma_, pi_i.T):
        likelihood += pi*multivariate_normal.logpdf(latent, m, s)
    print(i, loss, adjusted_rand_score(gmm.predict(latent), label), likelihood.mean())


# In[11]:

import matplotlib as mpl

plt.figure(figsize=(10, 10))
ax = plt.axes()
# Draw a 2-sigma ellipse for every GMM component used in the latent space
for n in set(gmm.predict(latent)):
    covariances = gmm.covariances_[n][:2, :2]
    v, w = np.linalg.eigh(covariances)
    u = w[0] / np.linalg.norm(w[0])
    angle = np.arctan2(u[1], u[0])
    angle = 180 * angle / np.pi  # convert to degrees
    v = 2. * np.sqrt(2.) * np.sqrt(v)
    ell = mpl.patches.Ellipse(gmm.means_[n, :2], v[0], v[1], 180 + angle, color='k')
    ell.set_alpha(0.2)
    ax.add_patch(ell)
pylab.scatter(latent[:, 0], latent[:, 1], c=gmm.predict(latent))


# In[12]:

plt.figure(figsize=(10, 10))
reconstructed_mean = VAE_predict_mean.predict(data)
pylab.scatter(reconstructed_mean[:, 0], reconstructed_mean[:, 1], c=label)


# In[13]:

gmm.predict(latent)


# In[14]:

import pandas as pd

result = gmm.predict(latent)
df = pd.DataFrame({'result': result, 'label': label})

# Confusion counts between true pinwheel labels (rows) and GMM clusters (columns);
# the accuracy assigns each true class to its best-matching cluster
iremono = np.zeros((5, 5))
for i in range(5):
    acc = df[df['label'] == i]
    for j in range(5):
        iremono[i, j] = len(acc[acc['result'] == j])
acc = iremono.max(1).sum()/len(label)
print(acc)


# In[ ]:


# In[ ]:
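

# In[ ]:

# A minimal refactoring sketch, not part of the original notebook: the training
# cell above builds the per-sample GMM term twice with the same four lines.
# The hypothetical helper below reproduces that computation (responsibility-
# weighted component log-densities), assuming `gmm` has already been fitted
# and `codes` holds the current latent codes.
def gmm_log_density(gmm, codes):
    pi_i = gmm.predict_proba(codes)  # responsibilities, shape (n_samples, n_components)
    density = np.zeros(codes.shape[0])
    for m, s, pi in zip(gmm.means_, gmm.covariances_, pi_i.T):
        density += pi * multivariate_normal.logpdf(codes, m, s)
    return density

# Usage inside the training loop would then be:
#     likelihood = gmm_log_density(gmm, latent)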