#!/usr/bin/env python # coding: utf-8 # In[18]: import numpy as np from scipy.io import loadmat from sklearn.linear_model import LinearRegression from sklearn.decomposition import PCA from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler from sklearn.metrics import mean_squared_error,explained_variance_score from lightgbm import LGBMRegressor from sklearn.ensemble import RandomForestRegressor from sklearn.model_selection import train_test_split from sklearn.neural_network import MLPRegressor from sklearn.datasets import load_boston from keras.wrappers.scikit_learn import KerasRegressor from keras.models import Sequential from keras.layers import Dense,Dropout import keras from mpl_toolkits.mplot3d import Axes3D import matplotlib.pyplot as plt # In[19]: scaler = StandardScaler() forest = RandomForestRegressor(n_estimators=100,n_jobs=-1) linreg = LinearRegression(normalize=True) boosting = LGBMRegressor() pca = PCA(3) # ## Цель вычислительного эксперимента: # Необходимо решить задачу регрессии с использованием моделей: линейная регрессия, линейная регрессия + метод главных компонент, простая нейросеть и критериями качества: квадратичная ошибка, число обусловленности. # ## Описание выборок: # Используется датасет ECoG (electrocorticographic data), уже разбитый на выборки обучение/контроль. # ## Блок загрузки и предобработки выборок: # In[20]: X_train = loadmat('ECoG_X_train.mat')['X_train'] y_train = loadmat('ECoG_Y_train.mat')['Y_train'] X_test = loadmat('ECoG_X_test.mat')['X_hold_out'] y_test = loadmat('ECoG_Y_test.mat')['Y_hold_out'] # In[21]: X_train.shape # In[22]: y_train.shape # In[23]: X_test.shape # In[24]: y_test.shape # Объектами X_train, X_test являются матрицы 32x27. Преобразуем их в одномерные векторы: # In[25]: X_train = np.array([i.flatten() for i in X_train]) X_test = np.array([i.flatten() for i in X_test]) # ## Анализ пропусков # In[26]: np.isnan(X_train).any() # In[27]: np.isnan(y_train).any() # Данная выборка не имеет пропусков. # ## Список моделей # Для решения поставленной задачи используются следующие модели: # * линейная регрессия # * PCA + линейная регрессия # * простая нейросеть # # ## Список критериев качетсва: # * mean squared error # * variance score # # ## Структурные параметры нейронной сети: # * число и состав признаков # * размерность скрытого пространства # * структура сети # # ## Способ разбиения выборки на обучение-контроль: # Данные уже разбиты на 2 выборки. # ## Линейная регрессия: # In[28]: model = linreg pipe = Pipeline(steps=[('model', model)]) # In[29]: pipe.fit(X_train,y_train) # In[30]: res = pipe.predict(X_test) mse = mean_squared_error(res,y_test) varience = explained_variance_score(y_test,res) print(mse,varience) # ## Линейная регрессия c PCA до 80 компонент # In[31]: model = linreg pca = PCA(n_components=80) pipe = Pipeline(steps=[('pca',pca),('model', model)]) # In[32]: pipe.fit(X_train,y_train) # In[33]: res = pipe.predict(X_test) mse = mean_squared_error(res,y_test) varience = explained_variance_score(y_test,res) print(mse,varience) # ## Нейронная сеть # In[34]: def big_model(): model = Sequential() model.add(Dense(512, input_dim=864, kernel_initializer='normal', activation='relu',kernel_regularizer=keras.regularizers.l2(0.01))) model.add(Dense(256, kernel_initializer='normal', activation='relu', kernel_regularizer=keras.regularizers.l2(0.01))) model.add(Dense(3, kernel_initializer='normal')) # Compile model model.compile(loss='mean_squared_error', optimizer='rmsprop') return model # In[35]: neural_net = KerasRegressor(big_model,epochs=100, batch_size=200,verbose=1) model = neural_net pipe = Pipeline(steps=[('scaler',scaler),('model', model)]) # In[36]: pipe.fit(X_train,y_train) # In[37]: res = pipe.predict(X_test) mse = mean_squared_error(res,y_test) varience = explained_variance_score(y_test,res) print(mse,varience) # ## Графики # ##### Зависимость функции ошибки от количества эпох # In[47]: results = np.zeros((5, 5)) N = 1218 for i, n_epoch in enumerate([1,5,10,20,50]): model = KerasRegressor(big_model,epochs=n_epoch, batch_size=100,verbose=0) pipe = Pipeline(steps=[('scaler',scaler),('model', model)]) pipe.fit(X_train,y_train) res = pipe.predict(X_test) for j in range(5): results[i][j] = mean_squared_error(y_test[N * j: N * (j + 1)], res[N * j: N * (j + 1)]) # In[48]: results # In[50]: x_ticks = [1,5,10,20,50] results_mean = np.mean(results, axis=1) results_std = np.std(results, axis=1) plt.plot(x_ticks, results_mean, 'k-') plt.plot(x_ticks, results_mean - results_std, 'b-') plt.plot(x_ticks, results_mean + results_std, 'b-') plt.fill_between(x_ticks, results_mean - results_std, results_mean + results_std) plt.grid() plt.title('Зависимость функции ошибки от количества эпох') plt.xlabel("Количество эпох обучения") plt.ylabel("Среднеквадратичное отклонение") plt.show() # ##### Зависимость функции ошибки от размера скрытого слоя # In[52]: def variable_model(): model = Sequential() model.add(Dense(64, input_dim=864, kernel_initializer='normal', activation='relu')) for i in range(n_l-1): model.add(Dense(64, kernel_initializer='normal', activation='relu')) model.add(Dense(3, kernel_initializer='normal')) # Compile model model.compile(loss='mean_squared_error', optimizer='rmsprop') return model results1 = np.zeros((3,5)) n_layers = [1,2,3] for i, n_l in enumerate(n_layers): model = KerasRegressor(variable_model,epochs=10, batch_size=100,verbose=0) pipe = Pipeline(steps=[('scaler',scaler),('model', model)]) pipe.fit(X_train,y_train) res = pipe.predict(X_test) for j in range(5): results1[i][j] = mean_squared_error(y_test[N * j: N * (j + 1)], res[N * j: N * (j + 1)]) # In[55]: results1 # In[54]: results1_mean = np.mean(results1, axis=1) results1_std = np.std(results1, axis=1) fig, ax = plt.subplots() ax.set_xlabel("Количество скрытых слоев") ax.set_ylabel("Среднеквадратичное отклонение") ax.set_title('Зависимость функции ошибки от размера скрытого слоя') ax.set_xticks(n_layers) plt.plot(n_layers, results1_mean, 'k-') plt.plot(n_layers, results1_mean - results1_std, 'b-') plt.plot(n_layers, results1_mean + results1_std, 'b-') plt.fill_between(n_layers, results1_mean - results1_std, results1_mean + results1_std) plt.grid() plt.show() # ##### Зависимость функции ошибки от размера обучающей выборки # In[58]: percents_of_data = [0.1,0.2,0.3,0.5,0.8,1.0] results3 = np.zeros((6, 5)) for i, pod in enumerate(percents_of_data): model = KerasRegressor(big_model,epochs=10, batch_size=100,verbose=0) pipe = Pipeline(steps=[('scaler',scaler),('model', model)]) pipe.fit(X_train[:int(len(X_train)*pod)],y_train[:int(len(X_train)*pod)]) res = pipe.predict(X_test) for j in range(5): results3[i][j] = mean_squared_error(y_test[N * j: N * (j + 1)], res[N * j: N * (j + 1)]) # In[59]: results3 # In[60]: results3_mean = np.mean(results3, axis=1) results3_std = np.std(results3, axis=1) fig, ax = plt.subplots() #ax.set_ylim(0,60) ax.set_xlabel("Процент от полной train выборки") ax.set_ylabel("Среднеквадратичное отклонение") ax.set_title('Зависимость функции ошибки от размера скрытого слоя') ax.set_xticks(percents_of_data) plt.plot(percents_of_data, results3_mean, 'k-') plt.plot(percents_of_data, results3_mean - results3_std, 'b-') plt.plot(percents_of_data, results3_mean + results3_std, 'b-') plt.fill_between(percents_of_data, results3_mean - results3_std, results3_mean + results3_std) plt.grid() plt.show() # ## Анализ результатов # # | | linreg | linreg + PCA | neural_net | # |--------------------|---------|--------------|------------| # | mean squared error | 1786.77 | 1634.90 | 2201.43 | # | variance score | 0.070 | 0.147 | -0.240 | # # ## Вывод # Можно сделать вывод, что наилучшие результаты показывает linear regression + 80-component PCA. # # Синтетическая выборка # В качестве синтетической выборки для задачи регрессии возьмём Boston dataset. # In[61]: dataset = load_boston() # In[62]: dataset.data.shape # In[78]: y = dataset.target X = dataset.data X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=228) # ## Линейная регрессия # In[64]: model = linreg pipe = Pipeline(steps=[('model', model)]) # In[65]: pipe.fit(X_train,y_train) # In[66]: res = pipe.predict(X_test) mse = mean_squared_error(res,y_test) varience = explained_variance_score(y_test,res) print(mse,varience) # ## Линейная регрессия c PCA до 8 компонент # In[67]: model = linreg pca = PCA(n_components=8) pipe = Pipeline(steps=[('pca',pca),('model', model)]) # In[68]: pipe.fit(X_train,y_train) # In[69]: res = pipe.predict(X_test) mse = mean_squared_error(res,y_test) varience = explained_variance_score(y_test,res) print(mse,varience) # ## Нейронная сеть # In[70]: def small_model(): model = Sequential() model.add(Dense(128, input_dim=13, kernel_initializer='normal', activation='relu')) model.add(Dense(64, kernel_initializer='normal', activation='relu')) model.add(Dense(1, kernel_initializer='normal')) # Compile model model.compile(loss='mean_squared_error', optimizer='rmsprop') return model # In[71]: neural_net = KerasRegressor(small_model,epochs=200, batch_size=100,verbose=0) model = neural_net pipe = Pipeline(steps=[('scaler',scaler),('model', model)]) # In[72]: pipe.fit(X_train,y_train) # In[73]: res = pipe.predict(X_test) mse = mean_squared_error(res,y_test) varience = explained_variance_score(y_test,res) print(mse,varience) # ## Графики # ##### Зависимость функции ошибки от количества эпох # In[81]: results = np.zeros((8, 5)) N = 26 for i, n_epoch in enumerate([1,5,10,20,50,100,200,500]): model = KerasRegressor(small_model,epochs=n_epoch, batch_size=100,verbose=0) pipe = Pipeline(steps=[('scaler',scaler),('model', model)]) pipe.fit(X_train,y_train) res = pipe.predict(X_test) for j in range(5): results[i][j] = mean_squared_error(y_test[N * j: N * (j + 1)], res[N * j: N * (j + 1)]) # In[82]: results # In[83]: results_mean = np.mean(results, axis=1) results_std = np.std(results, axis=1) x_ticks = [1,5,10,20,50,100,200,500] fig, ax = plt.subplots() plt.plot(x_ticks, results_mean, 'k-') plt.plot(x_ticks, results_mean - results_std, 'b-') plt.plot(x_ticks, results_mean + results_std, 'b-') plt.fill_between(x_ticks, results_mean - results_std, results_mean + results_std) plt.grid() ax.set_yscale('log') ax.set_xscale('log') ax.set_xticks(x_ticks) ax.set_xticklabels(x_ticks) ax.set_title('Зависимость функции ошибки от количества эпох') ax.set_xlabel("Количество эпох обучения") ax.set_ylabel("Среднеквадратичное отклонение") plt.show() # ##### Зависимость функции ошибки от размера скрытого слоя # In[85]: def variable_model(): model = Sequential() model.add(Dense(64, input_dim=13, kernel_initializer='normal', activation='relu')) for i in range(n_l-1): model.add(Dense(64, kernel_initializer='normal', activation='relu')) model.add(Dense(1, kernel_initializer='normal')) # Compile model model.compile(loss='mean_squared_error', optimizer='rmsprop') return model results1 = np.zeros((5, 5)) n_layers = [1,2,3,5,10] for i, n_l in enumerate(n_layers): model = KerasRegressor(variable_model,epochs=200, batch_size=100,verbose=0) pipe = Pipeline(steps=[('scaler',scaler),('model', model)]) pipe.fit(X_train,y_train) res = pipe.predict(X_test) for j in range(5): results1[i][j] = mean_squared_error(y_test[N * j: N * (j + 1)], res[N * j: N * (j + 1)]) # In[88]: results1 # In[87]: results1_mean = np.mean(results1, axis=1) results1_std = np.std(results1, axis=1) fig, ax = plt.subplots() ax.set_ylim(0,30) ax.set_xlabel("Количество скрытых слоев") ax.set_ylabel("Среднеквадратичное отклонение") ax.set_title('Зависимость функции ошибки от размера скрытого слоя') ax.set_xticks(n_layers) plt.plot(n_layers, results1_mean, 'k-') plt.plot(n_layers, results1_mean - results1_std, 'b-') plt.plot(n_layers, results1_mean + results1_std, 'b-') plt.fill_between(n_layers, results1_mean - results1_std, results1_mean + results1_std) plt.grid() plt.show() # ##### Зависимость функции ошибки от размера обучающей выборки # In[89]: percents_of_data = [0.1,0.2,0.3,0.5,0.8,1.0] results3 = np.zeros((6, 5)) for i, pod in enumerate(percents_of_data): model = KerasRegressor(small_model,epochs=100, batch_size=100,verbose=0) pipe = Pipeline(steps=[('scaler',scaler),('model', model)]) pipe.fit(X_train[:int(len(X_train)*pod)],y_train[:int(len(X_train)*pod)]) res = pipe.predict(X_test) error = mean_squared_error(res,y_test) for j in range(5): results3[i][j] = mean_squared_error(y_test[N * j: N * (j + 1)], res[N * j: N * (j + 1)]) # In[90]: results3 # In[91]: results3_mean = np.mean(results3, axis=1) results3_std = np.std(results3, axis=1) fig, ax = plt.subplots() ax.set_ylim(0,60) ax.set_xlabel("Процент от полной train выборки") ax.set_ylabel("Среднеквадратичное отклонение") ax.set_title('Зависимость функции ошибки от размера скрытого слоя') ax.set_xticks(percents_of_data) plt.plot(percents_of_data, results3_mean, 'k-') plt.plot(percents_of_data, results3_mean - results3_std, 'b-') plt.plot(percents_of_data, results3_mean + results3_std, 'b-') plt.fill_between(percents_of_data, results3_mean - results3_std, results3_mean + results3_std) plt.grid() plt.show() # In[ ]: