import numpy as np
import sys
import numpy as np
from time import sleep
import sys
import keras.backend as K
from keras.models import Model, Sequential
import matplotlib.pyplot as plt
import seaborn as sns
Using Theano backend. WARNING (theano.sandbox.cuda): The cuda backend is deprecated and will be removed in the next release (v0.10). Please switch to the gpuarray backend. You can get more information about how to switch at this URL: https://github.com/Theano/Theano/wiki/Converting-to-the-new-gpu-back-end%28gpuarray%29 Using gpu device 0: GeForce GTX 980 Ti (CNMeM is disabled, cuDNN 6021) /home/hiranumn/anaconda2/lib/python2.7/site-packages/theano/sandbox/cuda/__init__.py:631: UserWarning: Your cuDNN version is more recent than the one Theano officially supports. If you see any problems, try updating Theano or downgrading cuDNN to version 5.1. warnings.warn(warn)
# IRIS DATA
_X = np.array([[float(j) for j in i.rstrip().split(",")[:-1]] for i in open("iris.data").readlines()][:-1])
_Y = np.array([0 for i in range(100)] + [1 for i in range(50)])
# MNIST DATA
from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets("MNIST_data/", one_hot=True)
X = mnist.train._images.reshape(55000,28, 28)
Y = mnist.train._labels
Extracting MNIST_data/train-images-idx3-ubyte.gz Extracting MNIST_data/train-labels-idx1-ubyte.gz Extracting MNIST_data/t10k-images-idx3-ubyte.gz Extracting MNIST_data/t10k-labels-idx1-ubyte.gz
'''
Input: numpy array of a sample
Optional inputs:
- reference: reference values (defaulted to 0s).
- steps: # steps from reference values to the actual sample.
Output: list of numpy arrays to integrated over.
'''
def linearly_interpolate(sample, reference=None, num_steps=50):
# Use default reference values if reference is not specified
if reference is None: reference = np.zeros(sample.shape);
# Reference and sample shape needs to match exactly
assert sample.shape == reference.shape
# Calcuated stepwise difference from reference to the actual sample.
ret = np.zeros(tuple([num_steps] +[i for i in sample.shape]))
for s in range(num_steps):
ret[s] = reference+(sample-reference)*(s*1.0/num_steps)
return ret, num_steps, (sample-reference)*(1.0/num_steps)
# Interpolate to 0
samples, num_steps, step_sizes = linearly_interpolate(X[0].reshape(28,28), num_steps=100)
image = np.zeros((int(np.ceil(num_steps/10)*28), 10*28))
count = 0
for i in range(10):
for j in range(int(np.ceil(num_steps/10))):
s = samples[count]
count += 1
image[j*28:(j+1)*28, i*28:(i+1)*28] = s
plt.figure(figsize=(int(np.ceil(num_steps/10)*0.25), 10*0.25))
plt.imshow(image)
plt.xticks([],[])
plt.yticks([],[])
plt.show()
# Interpolate to 1
samples, num_steps, step_sizes = linearly_interpolate(X[0].reshape(28,28), np.ones((28,28)), num_steps=100)
image = np.zeros((int(np.ceil(num_steps/10)*28), 10*28))
count = 0
for i in range(10):
for j in range(int(np.ceil(num_steps/10))):
s = samples[count]
count += 1
image[j*28:(j+1)*28, i*28:(i+1)*28] = s
plt.figure(figsize=(int(np.ceil(num_steps/10)*0.25), 10*0.25))
plt.imshow(image)
plt.xticks([],[])
plt.yticks([],[])
plt.show()
################################################################
# Implemented by Naozumi Hiranuma (hiranumn@uw.edu) #
# #
# Kears-compatible implmentation of Integrated Gradients #
# proposed in "Axiomatic attribution for deep neuron networks" #
# (https://arxiv.org/abs/1703.01365). #
# #
# Keywords: Shapley values, interpretable machine learning #
################################################################
import numpy as np
from time import sleep
import sys
import keras.backend as K
from keras.models import Model, Sequential
'''
Integrated gradients approximates Shapley values by integrating partial
gradients with respect to input features from reference input to the
actual input. The following class implements this concept.
'''
class integrated_gradients:
# model: Keras model that you wish to explain.
# outchannels: In case the model are multi tasking, you can specify which channels you want.
def __init__(self, model, outchannels=[], verbose=1):
# Bacnend: either tensorflow or theano)
self.backend = K.backend()
#load model supports keras.Model and keras.Sequential
if isinstance(model, Sequential):
self.model = model.model
elif isinstance(model, Model):
self.model = model
else:
print "Invalid input model"
return -1
#load input tensors
self.input_tensors = []
for i in self.model.inputs:
self.input_tensors.append(i)
# The learning phase flag is a bool tensor (0 = test, 1 = train)
# to be passed as input to any Keras function that uses
# a different behavior at train time and test time.
self.input_tensors.append(K.learning_phase())
#If outputchanel is specified, use it.
#Otherwise evalueate all outputs.
self.outchannels = outchannels
if len(self.outchannels) == 0:
if verbose: print "Evaluated output channel (0-based index): All"
if K.backend() == "tensorflow":
self.outchannels = range(self.model.output.shape[1]._value)
elif K.backend() == "theano":
self.outchannels = range(model1.output._keras_shape[1])
else:
if verbose:
print "Evaluated output channels (0-based index):",
for i in self.outchannels: print i
print
#Build gradient functions for desired output channels.
self.get_gradients = {}
if verbose: print "Building gradient functions"
# Evaluate over all channels.
for c in self.outchannels:
# Get tensor that calcuates gradient
if K.backend() == "tensorflow":
gradients = self.model.optimizer.get_gradients(self.model.output[:, c], self.model.input)
if K.backend() == "theano":
gradients = self.model.optimizer.get_gradients(self.model.output[:, c].sum(), self.model.input)
# Build computational graph that calculates the tensfor given inputs
self.get_gradients[c] = K.function(inputs=self.input_tensors, outputs=gradients)
# This takes a lot of time for a big model with many tasks.
# So lets pring the progress.
if verbose:
sys.stdout.write('\r')
sys.stdout.write("Progress: "+str(int((c+1)*1.0/len(self.outchannels)*1000)*1.0/10)+"%")
sys.stdout.flush()
# Done
if verbose: print "\nDone."
'''
Input: sample to explain, channel to explain
Optional inputs:
- reference: reference values (defaulted to 0s).
- steps: # steps from reference values to the actual sample.
Output: list of numpy arrays to integrated over.
'''
def explain(self, sample, outc=0, reference=False, num_steps=50, verbose=0):
# Each element for each input stream.
samples = []
numsteps = []
step_sizes = []
# If multiple inputs are present, feed them as list of np arrays.
if isinstance(sample, list):
#If reference is present, reference and sample size need to be equal.
if reference != False:
assert len(sample) == len(reference)
for i in range(len(sample)):
if reference == False:
_output = integrated_gradients.linearly_interpolate(sample[i], False, num_steps)
else:
_output = integrated_gradients.linearly_interpolate(sample[i], False, num_steps)
samples.append(_output[0])
numsteps.append(_output[1])
step_sizes.append(_output[2])
# Or you can feed just a single numpy arrray.
elif isinstance(sample, np.ndarray):
_output = integrated_gradients.linearly_interpolate(sample, reference, num_steps)
samples.append(_output[0])
numsteps.append(_output[1])
step_sizes.append(_output[2])
# Desired channel must be in the list of outputchannels
assert outc in self.outchannels
if verbose: print "Explaning the "+str(self.outchannels[outc])+"th output."
# For tensorflow backend
_input = []
for s in samples:
_input.append(s)
_input.append(0)
if K.backend() == "tensorflow":
gradients = self.get_gradients[outc](_input)
elif K.backend() == "theano":
gradients = self.get_gradients[outc](_input)
if len(self.model.inputs) == 1:
gradients = [gradients]
explanation = []
for i in range(len(gradients)):
_temp = np.sum(gradients[i], axis=0)
explanation.append(np.multiply(_temp, step_sizes[i]))
if isinstance(sample, list):
return explanation
elif isinstance(sample, np.ndarray):
return explanation[0]
return -1
'''
Input: numpy array of a sample
Optional inputs:
- reference: reference values (defaulted to 0s).
- steps: # steps from reference values to the actual sample.
Output: list of numpy arrays to integrated over.
'''
@staticmethod
def linearly_interpolate(sample, reference=False, num_steps=50):
# Use default reference values if reference is not specified
if reference is False: reference = np.zeros(sample.shape);
# Reference and sample shape needs to match exactly
assert sample.shape == reference.shape
# Calcuated stepwise difference from reference to the actual sample.
ret = np.zeros(tuple([num_steps] +[i for i in sample.shape]))
for s in range(num_steps):
ret[s] = reference+(sample-reference)*(s*1.0/num_steps)
return ret, num_steps, (sample-reference)*(1.0/num_steps)
from keras.layers import Input, Dense
from keras.models import Model
_input = Input(shape=[4])
probs = Dense(1, activation='sigmoid')(_input)
model1 = Model(inputs=_input, outputs=probs)
model1.compile(optimizer='sgd', loss='binary_crossentropy')
model1.fit(_X, _Y, epochs=10000, batch_size=128, verbose=0)
<keras.callbacks.History at 0x7fa542503910>
ig = integrated_gradients(model1)
Evaluated output channel (0-based index): All Building gradient functions Progress: 100.0% Done.
ig.explain(_X[0], outc=0, num_steps=1000)
array([-0.13235252, -0.07901507, 0.05334638, 0.00674734])
import keras
from keras.layers import Input, Dense, Flatten
from keras.models import Model
_input1 = Input(shape=[2])
probs1 = Dense(2, activation='tanh')(_input1)
_input2 = Input(shape=[2])
probs2 = Dense(2, activation='tanh')(_input2)
x = keras.layers.concatenate([probs1, probs2])
x_out = Dense(1, activation='sigmoid')(x)
model3 = Model(inputs=[_input1, _input2], outputs=x_out)
model3.compile(optimizer='sgd', loss='binary_crossentropy')
model3.fit([_X[:, :2], _X[:, 2:]], _Y, epochs=1000, batch_size=128, verbose=0)
<keras.callbacks.History at 0x7fa542ccf050>
ig = integrated_gradients(model3)
Evaluated output channel (0-based index): All Building gradient functions Progress: 100.0% Done.
ig.explain([_X[0, :2], _X[0, 2:]], outc=0, num_steps=1000)
[array([-0.15271009, 0.03400158]), array([ 0.06474359, 0.00295305])]
from keras.models import Sequential
from keras.layers import Dense
from keras.layers.core import Activation
model2 = Sequential([
Dense(1, input_dim=4),
Activation('sigmoid'),
])
model2.compile(optimizer='sgd', loss='binary_crossentropy')
model2.fit(_X, _Y, epochs=10000, batch_size=128, verbose=0)
<keras.callbacks.History at 0x7fa5432e01d0>
ig = integrated_gradients(model2)
ig.explain(_X[0], outc=0, num_steps=1000)
Evaluated output channel (0-based index): All Building gradient functions Progress: 100.0% Done.
array([-0.14350999, -0.09356359, 0.0575112 , 0.00818367])