Here we will load data from Cichy et al. 2014 [1]. The data consist of fMRI responses from early visual cortex (EVC) and inferior temporal (IT) cortex and MEG responses at different timepoints in the form of representational dissimilarity matrices (RDMs) to 92 images. These images belong to different categories as shown in the Figure below.
RSA is a method to relate signals from different source spaces (such as behavior, neural responses, DNN activations) by abstracting signals from separate source spaces into a common similarity space. For this, in each source space, condition-specific responses are compared to each other for dissimilarity (e.g., by calculating Euclidean distances between signals), and the values are aggregated in so-called representational dissimilarity matrices (RDMs) indexed in rows and columns by the conditions compared. RDMs thus summarize the representational geometry of the source space signals. Different from source space signals themselves, RDMs from different sources spaces are directly comparable to each other for similarity and thus can relate signals from different spaces.
The figure below illustrates how RSA can be applied to different problems by comparing RDMs of different modalities/species.
In the cells below, we will download and visualize MEG and fMRI RDMs. Please refer Figure 1 in [1] to learn details about the image category order in RDMs.
# Imports
import os
import cv2
import glob
import h5py
import time
import random
import urllib
import pickle
import argparse
import numpy as np
import nibabel as nib
import scipy.io as sio
import matplotlib.pyplot as plt
from tqdm import tqdm
from PIL import Image
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA, IncrementalPCA
import torch
import torch.nn as nn
import torch.utils.model_zoo as model_zoo
from torch.autograd import Variable as V
from torchvision import transforms as trn
# @title Load mat files
def loadmat(matfile):
"""Function to load .mat files.
Parameters
----------
matfile : str
path to `matfile` containing fMRI data for a given trial.
Returns
-------
dict
dictionary containing data in key 'vol' for a given trial.
"""
try:
f = h5py.File(matfile)
except (IOError, OSError):
return sio.loadmat(matfile)
else:
return {name: np.transpose(f.get(name)) for name in f.keys()}
# @title Data download and unzip in `data/`
import os, zipfile, requests
fname = 'data.zip'
url = "https://osf.io/7vpyh/download"
if not os.path.isfile(fname):
try:
r = requests.get(url)
except requests.ConnectionError:
print("!!! Failed to download data !!!")
else:
if r.status_code != requests.codes.ok:
print("!!! Failed to download data !!!")
else:
print("Downlading data...")
with open(fname, "wb") as fid:
fid.write(r.content)
with zipfile.ZipFile(fname, 'r') as zip_ref:
zip_ref.extractall('data/')
print("Download completed!")
Downlading data... Download completed!
# Load MEG RDMs for each time point for all subjects all sessions
MEG_RDMs = loadmat("data/MEG_decoding_RDMs.mat")['MEG_decoding_RDMs']
print(MEG_RDMs.shape)
(16, 2, 1301, 92, 92)
Shape of RDM is num_subjects x num_sessions x num_timepoints x num_stimulus x num_stimulus
# average RDM across subjects and sessions
MEG_RDM_sub_averaged = np.mean(MEG_RDMs, axis=(0, 1))
del MEG_RDMs
# @title visualize subject averaged MEG RDMs
timepoint = 420 # @param {type:"slider", min:-100, max:600, step:20}
# Load RDM at a given timepoint
# +100 as the RDMs provided are from -100ms to 1000ms after the stimulus onset
RDM = np.array(MEG_RDM_sub_averaged[timepoint+100])
# Since the matrix is symmetric we set upper triangular values to NaN
RDM[np.triu_indices(RDM.shape[0], 1)] = np.nan
# plot the RDM at given timepoint
plt.imshow(RDM, cmap="bwr")
plt.title("MEG RDM at t = " + str(timepoint))
cbar = plt.colorbar()
plt.xlabel("Stimuli")
plt.ylabel("Stimuli")
cbar.ax.get_yaxis().labelpad = 15
cbar.ax.set_ylabel('Decoding Accuracy', rotation=270)
plt.show()
fMRI_file = 'data/92_Image_Set/target_fmri.mat' # path of fMRI RDM file
fMRI_RDMs = loadmat(fMRI_file) # load the fMRI RDMs
print(fMRI_RDMs.keys())
print(fMRI_RDMs['EVC_RDMs'].shape)
dict_keys(['EVC_RDMs', 'IT_RDMs']) (15, 92, 92)
fMRI_RDMs is a dictionary with keys 'EVC_RDMs'
and 'IT_RDMs'
corresponding to ROIs EVC and IT respectively. The shape of each RDM is num_subjects
$\times$ num_stimulus
$\times$ num_stimulus
.
# @title visualize subject averaged fMRI RDMs
ROI = 'EVC' # @param ["EVC", "IT"]
# Average the ROI RDM across subjects
RDM = np.array(fMRI_RDMs[ROI + '_RDMs'].mean(axis=0))
# Since the matrix is symmetric we set upper triangular values to NaN
RDM[np.triu_indices(RDM.shape[0], 1)] = np.nan
# plot the ROI RDM at given timepoint
plt.imshow(RDM, cmap="bwr")
plt.title(ROI + " RDM")
cbar = plt.colorbar()
plt.xlabel("Stimuli")
plt.ylabel("Stimuli")
cbar.ax.get_yaxis().labelpad = 15
cbar.ax.set_ylabel('1-Correlation', rotation=270)
plt.show()
Below we will perform two analyses:
In other words, the comparison will inform us about the sequential order of visual feature processing in the cortex.
# @title RDM Comparison functions
from scipy.stats import spearmanr
def RSA_spearman(rdm1,rdm2):
"""
computes and returns the spearman correlation between lower triangular
part of the input rdms. We only need to compare either lower or upper
triangular part of the matrix as RDM is symmetric
"""
# get lower triangular part of the RDM1
lt_rdm1 = get_lowertriangular(rdm1)
# get lower triangular part of the RDM1
lt_rdm2 = get_lowertriangular(rdm2)
# return Spearman's correlation between lower triangular part of rdm1 & rdm2
return spearmanr(lt_rdm1, lt_rdm2)[0]
def get_lowertriangular(rdm):
"""
returns lower triangular part of the matrix
"""
num_conditions = rdm.shape[0]
return rdm[np.tril_indices(num_conditions, -1)]
# @title Correlating MEG RDMs with fMRI RDMs
num_timepoints = MEG_RDM_sub_averaged.shape[0] #get number of timepoints
# initialize a dictionary to store MEG and ROI RDM correlation at each timepoint
MEG_correlation = {}
ROIs = ['EVC','IT']
for ROI in ROIs:
MEG_correlation[ROI] = []
# for loop that goes over MEG RDMs at all time points and correlate with ROI RDMs
for t in range(num_timepoints):
MEG_RDM_t = MEG_RDM_sub_averaged[t, :, :]
for ROI in ROIs:
ROI_RDM = np.mean(fMRI_RDMs[ROI + '_RDMs'], axis=0)
MEG_correlation[ROI].append(RSA_spearman(ROI_RDM, MEG_RDM_t))
# @title Plotting MEG-fMRI comparison
plt.rc('font', size=12)
fig, ax = plt.subplots(figsize=(10, 6))
time_range = range(-100, 1201)
ax.plot(time_range, MEG_correlation['IT'], color='tab:orange', label='IT')
ax.plot(time_range, MEG_correlation['EVC'], color='tab:blue', label='EVC')
# Same as above
ax.set_xlabel('Time')
ax.set_ylabel('Spearmans Correlation')
ax.set_title('MEG-fMRI fusion')
ax.grid(True)
ax.legend(loc='upper left')
fig.show()
# @title AlexNet Definition
__all__ = ['AlexNet', 'alexnet']
model_urls = {
'alexnet': 'https://download.pytorch.org/models/alexnet-owt-4df8aa71.pth',
}
# Here we redefine AlexNet differently from torchvision code for better understanding
class AlexNet(nn.Module):
def __init__(self, num_classes=1000):
super(AlexNet, self).__init__()
self.conv1 = nn.Sequential(
nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=3, stride=2),
)
self.conv2 = nn.Sequential(
nn.Conv2d(64, 192, kernel_size=5, padding=2),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=3, stride=2),
)
self.conv3 = nn.Sequential(
nn.Conv2d(192, 384, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
)
self.conv4 = nn.Sequential(
nn.Conv2d(384, 256, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
)
self.conv5 = nn.Sequential(
nn.Conv2d(256, 256, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=3, stride=2),
)
self.fc6 = nn.Sequential(
nn.Dropout(),
nn.Linear(256 * 6 * 6, 4096),
nn.ReLU(inplace=True),
)
self.fc7 =nn.Sequential(
nn.Dropout(),
nn.Linear(4096, 4096),
)
self.fc8 = nn.Sequential(
nn.ReLU(inplace=True),
nn.Linear(4096, num_classes),
)
def forward(self, x):
out1 = self.conv1(x)
out2 = self.conv2(out1)
out3 = self.conv3(out2)
out4 = self.conv4(out3)
out5 = self.conv5(out4)
out5_reshaped = out5.view(out5.size(0), 256 * 6 * 6)
out6= self.fc6(out5_reshaped)
out7= self.fc7(out6)
out8 = self.fc8(out7)
return out1, out2, out3,out4, out5, out6,out7,out8
def alexnet(pretrained=False, **kwargs):
"""AlexNet model architecture from the
`"One weird trick..." <https://arxiv.org/abs/1404.5997>`_ paper.
Args:
pretrained (bool): If True, returns a model pre-trained on ImageNet
"""
model = AlexNet(**kwargs)
if pretrained:
model.load_state_dict(model_zoo.load_url(model_urls['alexnet']))
return model
# @title Feature extraction code
def load_alexnet(model_checkpoints):
"""This function initializes an Alexnet and load
its weights from a pretrained model. Since we redefined model in a different
way we have to rename the weights that were in the pretrained checkpoint.
----------
model_checkpoints : str
model checkpoints location.
Returns
-------
model
pytorch model of alexnet
"""
model = alexnet()
# Load checkpoint
model_file = model_checkpoints
checkpoint = torch.load(model_file, map_location=lambda storage, loc: storage)
# Rename the checkpoint keys according to new definition
model_dict = ["conv1.0.weight", "conv1.0.bias",
"conv2.0.weight", "conv2.0.bias",
"conv3.0.weight", "conv3.0.bias",
"conv4.0.weight", "conv4.0.bias",
"conv5.0.weight", "conv5.0.bias",
"fc6.1.weight", "fc6.1.bias",
"fc7.1.weight", "fc7.1.bias",
"fc8.1.weight", "fc8.1.bias"]
state_dict = {}
i = 0
for k, v in checkpoint.items():
state_dict[model_dict[i]] = v
i += 1
# initialize model with pretrained weights
model.load_state_dict(state_dict)
if torch.cuda.is_available():
model.cuda()
model.eval()
return model
def get_activations_and_save(model, image_list, activations_dir):
"""This function generates Alexnet features and save them in a specified directory.
Parameters
----------
model :
pytorch model : alexnet.
image_list : list
the list contains path to all images.
activations_dir : str
save path for extracted features.
"""
resize_normalize = trn.Compose([
trn.Resize((224, 224)),
trn.ToTensor(),
trn.Normalize([0.485, 0.456, 0.406],
[0.229, 0.224, 0.225])
])
# for all images in the list generate and save activations
for image_file in tqdm(image_list):
# open image
img = Image.open(image_file)
image_file_name = os.path.split(image_file)[-1].split(".")[0]
# apply transformations before feeding to model
input_img = V(resize_normalize(img).unsqueeze(0))
if torch.cuda.is_available():
input_img=input_img.cuda()
x = model.forward(input_img)
activations = []
for i,feat in enumerate(x):
activations.append(feat.data.cpu().numpy().ravel())
for layer in range(len(activations)):
save_path = os.path.join(activations_dir,
f"{image_file_name}_{layer}_{layer + 1}.npy")
np.save(save_path, activations[layer])
# get the paths to all the images in the stimulus set
image_dir = 'data/92_Image_Set/92images'
image_list = glob.glob(image_dir + '/*.jpg')
image_list.sort()
print(f'Total Number of Images: {len(image_list)}')
save_dir = "activations_alexnet"
######### load Alexnet initialized with pretrained weights ###################
# Download pretrained Alexnet from:
# https://download.pytorch.org/models/alexnet-owt-4df8aa71.pth
# and save in the current directory
checkpoint_path = "alexnet.pth"
if not os.path.exists(checkpoint_path):
url = "https://download.pytorch.org/models/alexnet-owt-4df8aa71.pth"
urllib.request.urlretrieve(url, "alexnet.pth")
model = load_alexnet(checkpoint_path)
##############################################################################
######### get and save activations ################################
activations_dir = os.path.join(save_dir)
if not os.path.exists(activations_dir):
os.makedirs(activations_dir)
print("-------------Saving activations ----------------------------")
get_activations_and_save(model, image_list, activations_dir)
###################################################################
Total Number of Images: 92 -------------Saving activations ----------------------------
100%|██████████| 92/92 [00:05<00:00, 16.96it/s]
num_layers = 8 # number of layers in the model
layers = []
for i in range(num_layers):
layers.append(f"layer_{i + 1}")
model_RDMs = {}
# create RDM for each layer from activations
for j, layer in enumerate(layers):
activation_files = glob.glob(activations_dir + '/*' + f'{j}.npy')
activation_files.sort()
activations = []
# Load all activations
for activation_file in activation_files:
activations.append(np.load(activation_file))
activations = np.array(activations)
# calculate Pearson's distance for all pairwise comparisons
model_RDMs[layer] = 1 - np.corrcoef(activations)
/usr/local/lib/python3.7/dist-packages/numpy/lib/function_base.py:380: RuntimeWarning: Mean of empty slice. avg = a.mean(axis) /usr/local/lib/python3.7/dist-packages/numpy/core/_methods.py:182: RuntimeWarning: invalid value encountered in true_divide ret, rcount, out=ret, casting='unsafe', subok=False) /usr/local/lib/python3.7/dist-packages/numpy/lib/function_base.py:2683: RuntimeWarning: Degrees of freedom <= 0 for slice c = cov(x, y, rowvar, dtype=dtype) /usr/local/lib/python3.7/dist-packages/numpy/lib/function_base.py:2542: RuntimeWarning: divide by zero encountered in true_divide c *= np.true_divide(1, fact) /usr/local/lib/python3.7/dist-packages/numpy/lib/function_base.py:2542: RuntimeWarning: invalid value encountered in multiply c *= np.true_divide(1, fact)
# @title visualize model RDMs
layer = 'layer_8' # @param ['layer_1','layer_2','layer_3','layer_4','layer_5','layer_6','layer_7','layer_8']
# loading layer RDM
RDM = np.array(model_RDMs[layer])
# Since the matrix is symmetric we set upper triangular values to NaN
RDM[np.triu_indices(RDM.shape[0], 1)] = np.nan
# Visualize layer RDM
plt.imshow(RDM, cmap="bwr")
plt.title(layer + " RDM")
cbar = plt.colorbar()
plt.xlabel("Stimuli")
plt.ylabel("Stimuli")
cbar.ax.get_yaxis().labelpad = 15
cbar.ax.set_ylabel('1-Correlation', rotation=270)
plt.show()
# @title Correlating MEG RDMs with DNN RDMs
run = False
if run:
num_timepoints = MEG_RDM_sub_averaged.shape[0] # get number of timepoints
# initialize a dictionary to store MEG and DNN RDM correlation at each timepoint
for layer in layers:
MEG_correlation[layer] = []
# for loop that goes over MEG RDMs at all time points and correlate with DNN RDMs
for t in range(num_timepoints):
MEG_RDM_t = MEG_RDM_sub_averaged[t, :, :]
for layer in layers:
model_RDM = model_RDMs[layer]
MEG_correlation[layer].append(RSA_spearman(model_RDM, MEG_RDM_t))
# @title Plotting MEG-DNN comparison
if run:
plt.rc('font', size=12)
fig, ax = plt.subplots(figsize=(10, 6))
time_range = range(-100,1201)
ax.plot(time_range, MEG_correlation['layer_1'],
color='tab:orange', label='layer_1')
ax.plot(time_range, MEG_correlation['layer_7'],
color='tab:blue', label='layer_7')
# Same as above
ax.set_xlabel('Time')
ax.set_ylabel('Spearmans Correlation')
ax.set_title('MEG-model comparison')
ax.grid(True)
ax.legend(loc='upper left')
fig.show()