CNN
CNN on text
2021/03/18 Happy-jihye
Reference : pytorch-sentiment-analysis/4 - Convolutional Sentiment Analysis
!apt install python3.7
!pip install torchtext==0.6.0
!python -m spacy download en
import torch
from torchtext import data
# TEXT tokenizes the reviews with spaCy. batch_first=True makes batches come out
# as [batch size, sentence length], the layout the CNN models in this notebook expect.
# The full model name is used instead of the 'en' shortcut so that training uses
# the same pipeline name as the spacy.load('en_core_web_sm') call used at
# inference time; shortcut names are also rejected by spaCy >= 3.0.
TEXT = data.Field(tokenize='spacy',
                  tokenizer_language='en_core_web_sm',
                  batch_first=True)
# Labels are stored as floats so they can be fed directly to the loss.
LABEL = data.LabelField(dtype=torch.float)  # pos -> 1 / neg -> 0
from torchtext import datasets
import random
import numpy as np
SEED = 1234

# Seed every random number generator involved so that runs are repeatable.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

train_data, test_data = datasets.IMDB.splits(TEXT, LABEL)

# random.seed() returns None, so passing its return value as random_state only
# worked through the re-seeding side effect. Re-seed explicitly and hand split()
# the resulting generator state, which is what random_state is meant to receive.
random.seed(SEED)
train_data, valid_data = train_data.split(random_state=random.getstate())

print(f'training examples 수 : {len(train_data)}')
print(f'validations examples 수 : {len(valid_data)}')
print(f'testing examples 수 : {len(test_data)}')
training examples 수 : 17500 validations examples 수 : 7500 testing examples 수 : 25000
# Upper bound on the number of tokens kept in the TEXT vocabulary.
MAX_VOCAB_SIZE = 25_000
# Build the vocabulary from the training split only and attach the
# "glove.6B.100d" pretrained vectors to it.
# NOTE(review): unk_init presumably initialises tokens that have no pretrained
# vector from a normal distribution -- confirm against the torchtext docs.
TEXT.build_vocab(train_data,
                 max_size = MAX_VOCAB_SIZE,
                 vectors = "glove.6B.100d",
                 unk_init = torch.Tensor.normal_)
LABEL.build_vocab(train_data)
# The TEXT vocabulary reported here is larger than MAX_VOCAB_SIZE by two
# entries, consistent with the special <unk> and <pad> tokens.
print(f"Unique tokens in TEXT vocabulary: {len(TEXT.vocab)}")
print(f"Unique tokens in LABEL vocabulary: {len(LABEL.vocab)}")
Unique tokens in TEXT vocabulary: 25002 Unique tokens in LABEL vocabulary: 2
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
BATCH_SIZE = 64
# One iterator per split, all using the same batch size and target device.
train_iterator, valid_iterator, test_iterator = data.BucketIterator.splits(
    (train_data, valid_data, test_data),
    batch_size = BATCH_SIZE,
    device = device
)
보통 image는 2차원(2-dimension) 데이터이고, text는 단어가 순서대로 나열된 1차원(1-dimension) 데이터입니다. 하지만 tutorial-3처럼 각 단어를 embedding하면, 문장을 [단어 수 x emb_dim] 크기의 2차원 tensor로 생각할 수 있습니다.
filter는 [n x emb_dim]의 size를 가진 tensor이고, 이때 n은 filter가 한 번에 보는 연속된 단어의 수를 의미합니다.
filter는 문장을 따라 아래로 한 단어씩 이동하며, 각 위치에서 filter와 해당 n개 단어의 embedding vector를 원소별로 곱해 모두 더하면(내적), 그림의 빨간색에 해당하는 하나의 실수값을 얻을 수 있습니다.
| | |
import torch.nn as nn
import torch.nn.functional as F
class CNN(nn.Module):
    """Sentence classifier built from three parallel 2D convolutions.

    Each convolution slides a [filter_size x embedding_dim] kernel over the
    embedded sentence, so one filter looks at ``filter_size`` consecutive
    words at a time. The three feature maps are max-pooled over the sentence
    dimension, concatenated, passed through dropout and mapped to
    ``output_dim`` values by a linear layer.

    Args:
        vocab_size: number of rows in the embedding table.
        embedding_dim: size of each word vector.
        n_filters: number of output channels of every convolution.
        filter_sizes: exactly three kernel heights (words covered per filter).
        output_dim: number of outputs of the final linear layer.
        dropout: dropout probability applied to the pooled features.
        pad_idx: vocabulary index of the <pad> token.

    Raises:
        ValueError: if ``filter_sizes`` does not contain exactly three sizes.
    """

    def __init__(self, vocab_size, embedding_dim,
                 n_filters, filter_sizes,
                 output_dim,
                 dropout,
                 pad_idx  # <pad> token
                 ):
        super().__init__()

        # This variant hard-wires three convolutions while the linear layer is
        # sized from len(filter_sizes); fail early instead of hitting an
        # IndexError here or a shape mismatch later in forward().
        if len(filter_sizes) != 3:
            raise ValueError(
                f'this CNN expects exactly 3 filter sizes, got {len(filter_sizes)}'
            )

        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)

        # Every kernel has size [n x emb_dim].
        self.conv_0 = nn.Conv2d(in_channels=1,
                                out_channels=n_filters,
                                kernel_size=(filter_sizes[0], embedding_dim))
        self.conv_1 = nn.Conv2d(in_channels=1,
                                out_channels=n_filters,
                                kernel_size=(filter_sizes[1], embedding_dim))
        self.conv_2 = nn.Conv2d(in_channels=1,
                                out_channels=n_filters,
                                kernel_size=(filter_sizes[2], embedding_dim))

        self.fc = nn.Linear(len(filter_sizes) * n_filters, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        """Return the outputs for a batch of token indices.

        Args:
            text: tensor of shape [batch size, sentence length]. The sentence
                length must be at least the largest filter size.

        Returns:
            Tensor of shape [batch size, output_dim].
        """
        # text = [batch size, sentence length]
        # An RNN wants the batch dimension second ([sentence length, batch size]);
        # the CNN needs it first, which is why the field uses batch_first=True.

        # Insert a channel dimension of size 1 at position 1.
        embedded = self.embedding(text).unsqueeze(1)
        # before unsqueeze = [batch size, sentence length, embedding dim]
        # after unsqueeze  = [batch size, 1, sentence length, embedding dim]

        # conv output  = [batch size, n_filters, sentence length - filter_sizes[n] + 1, 1]
        # squeeze(3) removes the trailing dimension of size 1.
        conved_0 = F.relu(self.conv_0(embedded).squeeze(3))
        conved_1 = F.relu(self.conv_1(embedded).squeeze(3))
        conved_2 = F.relu(self.conv_2(embedded).squeeze(3))
        # conved_n = [batch size, n_filters, sentence length - filter_sizes[n] + 1]

        # Max-pool over the whole remaining sentence dimension.
        pooled_0 = F.max_pool1d(conved_0, conved_0.shape[2]).squeeze(2)
        pooled_1 = F.max_pool1d(conved_1, conved_1.shape[2]).squeeze(2)
        pooled_2 = F.max_pool1d(conved_2, conved_2.shape[2]).squeeze(2)
        # pooled_n = [batch size, n_filters]

        cat = self.dropout(torch.cat((pooled_0, pooled_1, pooled_2), dim=1))
        # cat = [batch size, n_filters * len(filter_sizes)]

        return self.fc(cat)
import torch.nn as nn
import torch.nn.functional as F
class CNN(nn.Module):
    """2D-convolution sentence classifier supporting any number of filter sizes.

    One ``nn.Conv2d`` with a [filter_size x embedding_dim] kernel is created
    per entry of ``filter_sizes``. Their max-pooled outputs are concatenated,
    regularised with dropout and projected to ``output_dim`` values.

    Args:
        vocab_size: number of rows in the embedding table.
        embedding_dim: size of each word vector.
        n_filters: number of output channels of every convolution.
        filter_sizes: kernel heights, i.e. how many consecutive words a filter covers.
        output_dim: number of outputs of the final linear layer.
        dropout: dropout probability applied to the pooled features.
        pad_idx: vocabulary index of the <pad> token.
    """

    def __init__(self, vocab_size, embedding_dim,
                 n_filters, filter_sizes,
                 output_dim,
                 dropout,
                 pad_idx):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)

        # Build one convolution per requested window height.
        conv_layers = []
        for window in filter_sizes:
            conv_layers.append(
                nn.Conv2d(in_channels=1,
                          out_channels=n_filters,
                          kernel_size=(window, embedding_dim))
            )
        self.convs = nn.ModuleList(conv_layers)

        pooled_width = len(filter_sizes) * n_filters
        self.fc = nn.Linear(pooled_width, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        """Map token indices [batch size, sentence length] to [batch size, output_dim]."""
        # The CNN consumes batch-first input, unlike an RNN which takes
        # [sentence length, batch size].
        word_vectors = self.embedding(text)
        # word_vectors = [batch size, sentence length, embedding dim]

        # Add a single input channel so Conv2d can treat the sentence as an image.
        image_like = word_vectors.unsqueeze(1)
        # image_like = [batch size, 1, sentence length, embedding dim]

        pooled = []
        for conv in self.convs:
            feature_map = F.relu(conv(image_like).squeeze(3))
            # feature_map = [batch size, n_filters, sentence length - filter size + 1]
            strongest = F.max_pool1d(feature_map, feature_map.shape[2]).squeeze(2)
            # strongest = [batch size, n_filters]
            pooled.append(strongest)

        features = torch.cat(pooled, dim=1)
        # features = [batch size, n_filters * len(filter_sizes)]

        return self.fc(self.dropout(features))
class CNN1d(nn.Module):
    """Sentence classifier using 1D convolutions over the word sequence.

    The embedding dimension acts as the input channels of each ``nn.Conv1d``,
    and every convolution slides along the sentence with its own kernel
    width. Outputs are max-pooled over the sentence, concatenated, passed
    through dropout and projected to ``output_dim`` values.

    Args:
        vocab_size: number of rows in the embedding table.
        embedding_dim: size of each word vector (Conv1d input channels).
        n_filters: number of output channels of every convolution.
        filter_sizes: kernel widths, i.e. consecutive words covered per filter.
        output_dim: number of outputs of the final linear layer.
        dropout: dropout probability applied to the pooled features.
        pad_idx: vocabulary index of the <pad> token.
    """

    def __init__(self, vocab_size, embedding_dim, n_filters, filter_sizes, output_dim,
                 dropout, pad_idx):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)

        per_width_convs = [
            nn.Conv1d(in_channels=embedding_dim, out_channels=n_filters, kernel_size=width)
            for width in filter_sizes
        ]
        self.convs = nn.ModuleList(per_width_convs)

        self.fc = nn.Linear(len(filter_sizes) * n_filters, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        """Map token indices [batch size, sent len] to [batch size, output_dim]."""
        # embedding output = [batch size, sent len, emb dim]; Conv1d wants the
        # channel (emb dim) axis before the sequence axis, so swap them.
        channels_first = self.embedding(text).transpose(1, 2)
        # channels_first = [batch size, emb dim, sent len]

        pooled = []
        for conv in self.convs:
            activated = F.relu(conv(channels_first))
            # activated = [batch size, n_filters, sent len - filter size + 1]
            pooled.append(F.max_pool1d(activated, activated.shape[2]).squeeze(2))
            # appended item = [batch size, n_filters]

        merged = torch.cat(pooled, dim=1)
        # merged = [batch size, n_filters * len(filter_sizes)]

        return self.fc(self.dropout(merged))
# Hyperparameters for the 2D-convolution model.
INPUT_DIM = len(TEXT.vocab)  # one embedding row per vocabulary entry
EMBEDDING_DIM = 100  # must equal the width of the pretrained vectors copied into the embedding
N_FILTERS = 100  # output channels per convolution
FILTER_SIZES = [3,4,5]  # number of consecutive words each filter covers
OUTPUT_DIM = 1  # a single output per review
DROPOUT = 0.5
PAD_IDX = TEXT.vocab.stoi[TEXT.pad_token]  # vocabulary index of the padding token
model = CNN(INPUT_DIM, EMBEDDING_DIM, N_FILTERS, FILTER_SIZES, OUTPUT_DIM, DROPOUT, PAD_IDX)
def count_parameters(model):
    """Return the total number of elements across the model's trainable parameters.

    Parameters whose ``requires_grad`` flag is False are not counted.
    """
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total
# Report the trainable-parameter count, formatted with thousands separators.
print(f'The model has {count_parameters(model):,} trainable parameters')
The model has 2,620,801 trainable parameters
# Overwrite the model's embedding weights in place with the vectors attached
# to the TEXT vocabulary.
pretrained_embeddings = TEXT.vocab.vectors
print(pretrained_embeddings.shape)
model.embedding.weight.data.copy_(pretrained_embeddings)
torch.Size([25002, 100])
tensor([[-1.1172e-01, -4.9659e-01, 1.6307e-01, ..., 1.2647e+00, -2.7527e-01, -1.3254e-01], [-8.5549e-01, -7.2081e-01, 1.3755e+00, ..., 8.2522e-02, -1.1314e+00, 3.9972e-01], [-3.8194e-02, -2.4487e-01, 7.2812e-01, ..., -1.4590e-01, 8.2780e-01, 2.7062e-01], ..., [-1.8866e-01, 7.5537e-01, -1.6210e-01, ..., 7.8887e-05, 2.2752e-01, -1.9435e-01], [ 8.1298e-02, 2.4855e-01, 4.3583e-01, ..., -3.6564e-01, 6.6258e-01, 5.0125e-01], [ 7.9125e-01, -2.5157e-03, 8.3469e-01, ..., 2.5457e-01, 8.3578e-01, 1.2827e+00]])
# Zero the embedding rows of the <unk> and <pad> tokens after the copy above
# has overwritten them.
# PAD_IDX = TEXT.vocab.stoi[TEXT.pad_token] : 1
UNK_IDX = TEXT.vocab.stoi[TEXT.unk_token] #0
model.embedding.weight.data[UNK_IDX] = torch.zeros(EMBEDDING_DIM)
model.embedding.weight.data[PAD_IDX] = torch.zeros(EMBEDDING_DIM)
print(model.embedding.weight.data)
tensor([[ 0.0000e+00, 0.0000e+00, 0.0000e+00, ..., 0.0000e+00, 0.0000e+00, 0.0000e+00], [ 0.0000e+00, 0.0000e+00, 0.0000e+00, ..., 0.0000e+00, 0.0000e+00, 0.0000e+00], [-3.8194e-02, -2.4487e-01, 7.2812e-01, ..., -1.4590e-01, 8.2780e-01, 2.7062e-01], ..., [-1.8866e-01, 7.5537e-01, -1.6210e-01, ..., 7.8887e-05, 2.2752e-01, -1.9435e-01], [ 8.1298e-02, 2.4855e-01, 4.3583e-01, ..., -3.6564e-01, 6.6258e-01, 5.0125e-01], [ 7.9125e-01, -2.5157e-03, 8.3469e-01, ..., 2.5457e-01, 8.3578e-01, 1.2827e+00]])
import torch.optim as optim
# Adam with its default settings over all model parameters.
optimizer =optim.Adam(model.parameters())
# The loss takes the model's raw outputs (logits); the sigmoid is applied
# separately wherever probabilities are needed.
criterion = nn.BCEWithLogitsLoss()
# Move the model and the loss to the selected device (GPU when available).
model = model.to(device)
criterion = criterion.to(device)
def binary_accuracy(preds, y):
    """Return the fraction of predictions that match the labels.

    Args:
        preds: raw model outputs (logits); they are passed through a sigmoid
            and rounded to obtain hard predictions.
        y: target labels to compare against.

    Returns:
        A tensor holding the number of matches divided by ``len`` of the
        comparison result.
    """
    hard_predictions = torch.sigmoid(preds).round()
    hits = (hard_predictions == y).float()
    return hits.sum() / len(hits)
def train(model, iterator, optimizer, criterion):
    """Train the model for one pass over ``iterator``.

    Args:
        model: module called on ``batch.text``; its output is squeezed on dim 1.
        iterator: yields batches exposing ``text`` and ``label`` attributes.
        optimizer: optimizer updating the model's parameters.
        criterion: loss function called as ``criterion(predictions, labels)``.

    Returns:
        Tuple ``(mean loss, mean accuracy)``, each averaged over the number of
        batches in ``iterator``.
    """
    model.train()

    running_loss = 0
    running_acc = 0

    for batch in iterator:
        # Clear the gradients accumulated during the previous batch.
        optimizer.zero_grad()

        # Forward pass on the batch of sentences.
        logits = model(batch.text).squeeze(1)

        # Loss and accuracy against the batch labels.
        batch_loss = criterion(logits, batch.label)
        batch_acc = binary_accuracy(logits, batch.label)

        # Backpropagate, then let the optimizer update the parameters.
        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item()
        running_acc += batch_acc.item()

    n_batches = len(iterator)
    return running_loss / n_batches, running_acc / n_batches
def evaluate(model, iterator, criterion):
    """Compute loss and accuracy over ``iterator`` without updating the model.

    Args:
        model: module called on ``batch.text``; its output is squeezed on dim 1.
        iterator: yields batches exposing ``text`` and ``label`` attributes.
        criterion: loss function called as ``criterion(predictions, labels)``.

    Returns:
        Tuple ``(mean loss, mean accuracy)``, each averaged over the number of
        batches in ``iterator``.
    """
    # Evaluation mode: switches off dropout and batch normalization updates.
    model.eval()

    running_loss = 0
    running_acc = 0

    # Gradient tracking is disabled to use less memory and run faster.
    with torch.no_grad():
        for batch in iterator:
            logits = model(batch.text).squeeze(1)
            batch_loss = criterion(logits, batch.label)
            batch_acc = binary_accuracy(logits, batch.label)
            running_loss += batch_loss.item()
            running_acc += batch_acc.item()

    n_batches = len(iterator)
    return running_loss / n_batches, running_acc / n_batches
import time
def epoch_time(start_time, end_time):
    """Split the time between two timestamps (in seconds) into minutes and seconds.

    Returns:
        Tuple ``(minutes, seconds)`` of ints, both truncated toward zero.
    """
    duration = end_time - start_time
    whole_minutes = int(duration / 60)
    leftover_seconds = int(duration - 60 * whole_minutes)
    return whole_minutes, leftover_seconds
N_EPOCHS = 5
# Lowest validation loss seen so far; decides when a checkpoint is written.
best_valid_loss = float('inf')
for epoch in range(N_EPOCHS):
    start_time = time.time()
    train_loss, train_acc = train(model, train_iterator, optimizer, criterion)
    valid_loss, valid_acc = evaluate(model, valid_iterator, criterion)
    end_time = time.time()
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    # Save the weights only when the validation loss improves, so the file
    # always holds the best model seen so far.
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'tut4-model.pt')
    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} | Val. Acc: {valid_acc*100:.2f}%')
Epoch: 01 | Epoch Time: 0m 20s Train Loss: 0.648 | Train Acc: 61.42% Val. Loss: 0.497 | Val. Acc: 79.02% Epoch: 02 | Epoch Time: 0m 20s Train Loss: 0.419 | Train Acc: 81.02% Val. Loss: 0.341 | Val. Acc: 85.42% Epoch: 03 | Epoch Time: 0m 20s Train Loss: 0.305 | Train Acc: 87.28% Val. Loss: 0.306 | Val. Acc: 87.04% Epoch: 04 | Epoch Time: 0m 20s Train Loss: 0.224 | Train Acc: 91.17% Val. Loss: 0.326 | Val. Acc: 86.20% Epoch: 05 | Epoch Time: 0m 20s Train Loss: 0.158 | Train Acc: 94.07% Val. Loss: 0.317 | Val. Acc: 87.27%
# Restore the checkpoint with the lowest validation loss before measuring
# performance on the test split.
model.load_state_dict(torch.load('tut4-model.pt'))
test_loss, test_acc = evaluate(model, test_iterator, criterion)
print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')
Test Loss: 0.340 | Test Acc: 85.20%
import torch
# Reload the saved 2D-CNN weights before running the inference examples.
model.load_state_dict(torch.load('tut4-model.pt'))
<All keys matched successfully>
import spacy
# spaCy pipeline whose tokenizer is used to split raw sentences at inference time.
nlp = spacy.load('en_core_web_sm')
def predict_sentiment(model, sentence, min_len = 5):
    """Return the model's sigmoid score for a raw sentence.

    Args:
        model: trained classifier taking a [1, sentence length] index tensor.
        sentence: raw text; it is tokenized with the module-level ``nlp`` pipeline.
        min_len: minimum number of tokens fed to the model. Shorter inputs are
            padded up to this length because the convolutions cannot process a
            sentence shorter than their largest filter (5 for FILTER_SIZES
            = [3, 4, 5]).

    Returns:
        Float in [0, 1]; the sigmoid of the model's single output.
    """
    model.eval()
    tokenized = [tok.text for tok in nlp.tokenizer(sentence)]
    if len(tokenized) < min_len:
        # Pad with the field's own pad token (the one PAD_IDX is derived from)
        # rather than a hard-coded literal.
        tokenized += [TEXT.pad_token] * (min_len - len(tokenized))
    indexed = [TEXT.vocab.stoi[t] for t in tokenized]
    tensor = torch.LongTensor(indexed).to(device)
    # Add the batch dimension: [sentence length] -> [1, sentence length].
    tensor = tensor.unsqueeze(0)
    # Inference only: skip gradient tracking, as evaluate() does.
    with torch.no_grad():
        prediction = torch.sigmoid(model(tensor))
    return prediction.item()
# Negative example: with pos -> 1 / neg -> 0 labels, a score near 0 means negative.
predict_sentiment(model, "This film is terrible")
0.21040275692939758
# Positive example: with pos -> 1 / neg -> 0 labels, a score near 1 means positive.
predict_sentiment(model, "This film is great")
0.9589695334434509
# Another positive example, phrased with different words.
predict_sentiment(model, "This movie is fantastic")
0.9532867670059204
실험 결과, 1차원 convolution(nn.Conv1d)을 사용한 모델은 학습 속도가 더 빠름을 확인할 수 있습니다(epoch당 약 9초, 2차원 convolution 모델은 약 20초). 성능은 2차원의 convolutional layer를 사용한 모델과 비슷합니다.
# Same hyperparameters as the 2D model, now used to build the Conv1d variant.
INPUT_DIM = len(TEXT.vocab)  # one embedding row per vocabulary entry
EMBEDDING_DIM = 100  # must equal the width of the pretrained vectors copied into the embedding
N_FILTERS = 100  # output channels per convolution
FILTER_SIZES = [3,4,5]  # number of consecutive words each filter covers
OUTPUT_DIM = 1  # a single output per review
DROPOUT = 0.5
PAD_IDX = TEXT.vocab.stoi[TEXT.pad_token]  # vocabulary index of the padding token
model = CNN1d(INPUT_DIM, EMBEDDING_DIM, N_FILTERS, FILTER_SIZES, OUTPUT_DIM, DROPOUT, PAD_IDX)
def count_parameters(model):
    """Return the total number of elements across the model's trainable parameters.

    Parameters whose ``requires_grad`` flag is False are not counted.
    """
    trainable = 0
    for tensor in model.parameters():
        if not tensor.requires_grad:
            continue
        trainable += tensor.numel()
    return trainable
# Report the trainable-parameter count, formatted with thousands separators.
print(f'The model has {count_parameters(model):,} trainable parameters')
The model has 2,620,801 trainable parameters
# Overwrite the new model's embedding weights in place with the vectors
# attached to the TEXT vocabulary.
pretrained_embeddings = TEXT.vocab.vectors
model.embedding.weight.data.copy_(pretrained_embeddings)
tensor([[-1.1172e-01, -4.9659e-01, 1.6307e-01, ..., 1.2647e+00, -2.7527e-01, -1.3254e-01], [-8.5549e-01, -7.2081e-01, 1.3755e+00, ..., 8.2522e-02, -1.1314e+00, 3.9972e-01], [-3.8194e-02, -2.4487e-01, 7.2812e-01, ..., -1.4590e-01, 8.2780e-01, 2.7062e-01], ..., [-1.8866e-01, 7.5537e-01, -1.6210e-01, ..., 7.8887e-05, 2.2752e-01, -1.9435e-01], [ 8.1298e-02, 2.4855e-01, 4.3583e-01, ..., -3.6564e-01, 6.6258e-01, 5.0125e-01], [ 7.9125e-01, -2.5157e-03, 8.3469e-01, ..., 2.5457e-01, 8.3578e-01, 1.2827e+00]])
# Zero the embedding rows of the <unk> and <pad> tokens after the copy above
# has overwritten them.
UNK_IDX = TEXT.vocab.stoi[TEXT.unk_token]
model.embedding.weight.data[UNK_IDX] = torch.zeros(EMBEDDING_DIM)
model.embedding.weight.data[PAD_IDX] = torch.zeros(EMBEDDING_DIM)
import torch.optim as optim
# A fresh optimizer is required because `model` now refers to the new CNN1d
# instance and its parameters.
optimizer = optim.Adam(model.parameters())
criterion = nn.BCEWithLogitsLoss()
# Move the model and the loss to the selected device (GPU when available).
model = model.to(device)
criterion = criterion.to(device)
N_EPOCHS = 5
# Lowest validation loss seen so far; decides when a checkpoint is written.
best_valid_loss = float('inf')
for epoch in range(N_EPOCHS):
    start_time = time.time()
    train_loss, train_acc = train(model, train_iterator, optimizer, criterion)
    valid_loss, valid_acc = evaluate(model, valid_iterator, criterion)
    end_time = time.time()
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    # Save to a separate file so the 2D model's checkpoint is not overwritten.
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'tut4-model2.pt')
    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} | Val. Acc: {valid_acc*100:.2f}%')
Epoch: 01 | Epoch Time: 0m 9s Train Loss: 0.650 | Train Acc: 61.42% Val. Loss: 0.505 | Val. Acc: 77.79% Epoch: 02 | Epoch Time: 0m 9s Train Loss: 0.431 | Train Acc: 80.11% Val. Loss: 0.372 | Val. Acc: 83.54% Epoch: 03 | Epoch Time: 0m 9s Train Loss: 0.304 | Train Acc: 87.29% Val. Loss: 0.310 | Val. Acc: 86.78% Epoch: 04 | Epoch Time: 0m 9s Train Loss: 0.223 | Train Acc: 91.35% Val. Loss: 0.294 | Val. Acc: 87.63% Epoch: 05 | Epoch Time: 0m 9s Train Loss: 0.162 | Train Acc: 93.96% Val. Loss: 0.299 | Val. Acc: 87.73%
# Restore the Conv1d checkpoint with the lowest validation loss before
# measuring performance on the test split.
model.load_state_dict(torch.load('tut4-model2.pt'))
test_loss, test_acc = evaluate(model, test_iterator, criterion)
print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')
Test Loss: 0.336 | Test Acc: 85.69%