142 lines
4.6 KiB
Python
142 lines
4.6 KiB
Python
import torch
|
|
import torch.nn as nn
|
|
import torch.nn.functional as F
|
|
import torch.optim as optim
|
|
from torchvision import datasets, transforms
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
import cv2
|
|
from mnist import Net
|
|
from pykuwahara import kuwahara
|
|
|
|
epsilons = np.arange(0.05,0.35,0.05)
|
|
pretrained_model = "data/lenet_mnist_model.pth"
|
|
use_cuda=False
|
|
|
|
torch.manual_seed(69)
|
|
|
|
|
|
test_loader = torch.utils.data.DataLoader(
|
|
datasets.MNIST('data/', train=False, download=True, transform=transforms.Compose([
|
|
transforms.ToTensor(),
|
|
transforms.Normalize((0.1307,), (0.3081,)),
|
|
])),
|
|
batch_size=1, shuffle=True)
|
|
|
|
print("CUDA Available: ", torch.cuda.is_available())
|
|
device = torch.device("cuda" if use_cuda and torch.cuda.is_available() else "cpu")
|
|
|
|
model = Net().to(device)
|
|
|
|
model.load_state_dict(torch.load(pretrained_model, map_location=device))
|
|
|
|
model.eval()
|
|
|
|
def fgsm_attack(image, epsilon, data_grad):
|
|
sign_data_grad = data_grad.sign()
|
|
perturbed_image = image + epsilon*sign_data_grad
|
|
perturbed_image = torch.clamp(perturbed_image, 0, 1)
|
|
return perturbed_image
|
|
|
|
def denorm(batch, mean=[0.1307], std=[0.3081]):
|
|
"""
|
|
Convert a batch of tensors to their original scale.
|
|
|
|
Args:
|
|
batch (torch.Tensor): Batch of normalized tensors.
|
|
mean (torch.Tensor or list): Man used for normalization.
|
|
std (torch.Tensor or list): Standard deviation used for normalization.
|
|
|
|
Returns:
|
|
torch.Tensor: batch of tensors without normalization applied to them.
|
|
"""
|
|
|
|
if isinstance(mean, list):
|
|
mean = torch.tensor(mean).to(device)
|
|
if isinstance(std, list):
|
|
std = torch.tensor(std).to(device)
|
|
|
|
return batch * std.view(1, -1, 1, 1) + mean.view(1, -1, 1, 1)
|
|
|
|
def test(model, device, test_loader, epsilon):
|
|
orig_correct = 0
|
|
attacked_correct = 0
|
|
filtered_correct = 0
|
|
adv_examples = []
|
|
incorrect=0
|
|
|
|
for data, target in test_loader:
|
|
data, target = data.to(device), target.to(device)
|
|
data.requires_grad = True
|
|
|
|
output_orig = model(data)
|
|
orig_pred = output_orig.max(1, keepdim=True)[1]
|
|
|
|
loss = F.nll_loss(output_orig, target)
|
|
|
|
model.zero_grad()
|
|
|
|
loss.backward()
|
|
data_grad = data.grad.data
|
|
|
|
data_denorm = denorm(data)
|
|
|
|
perturbed_data = fgsm_attack(data_denorm, epsilon, data_grad)
|
|
|
|
perturbed_data_normalized = transforms.Normalize((0.1307,), (0.3081,))(perturbed_data)
|
|
|
|
# convert the perturbed data tensor to a cv2 readable image
|
|
image = perturbed_data_normalized.detach().numpy().transpose(0,2,3,1)
|
|
|
|
# apply the Kuwahara filter
|
|
filtered_image = np.ndarray((1,28,28,1))
|
|
filtered_image[0] = kuwahara(image[0], method='gaussian', radius=3, image_2d=image[0])
|
|
|
|
# convert the cv2 image back to a torch tensor
|
|
filtered_image = filtered_image.transpose(0,3,1,2)
|
|
perturbed_data_filtered = torch.tensor(filtered_image).float()
|
|
|
|
# evaluate the model on the attacked and filtered images
|
|
output_attacked = model(perturbed_data_normalized)
|
|
output_filtered = model(perturbed_data_filtered)
|
|
|
|
attacked_pred = output_attacked.max(1, keepdim=True)[1]
|
|
filtered_pred = output_filtered.max(1, keepdim=True)[1]
|
|
|
|
if orig_pred.item() == target.item():
|
|
orig_correct += 1
|
|
else:
|
|
incorrect += 1
|
|
|
|
if attacked_pred.item() == target.item():
|
|
attacked_correct += 1
|
|
|
|
if epsilon == 0 and len(adv_examples) < 5:
|
|
adv_ex = perturbed_data.squeeze().detach().cpu().numpy()
|
|
adv_examples.append( (orig_pred.item(), attacked_pred.item(), adv_ex) )
|
|
|
|
if filtered_pred.item() == target.item():
|
|
filtered_correct += 1
|
|
|
|
if epsilon == 0 and len(adv_examples) < 5:
|
|
adv_ex = perturbed_data.squeeze().detach().cpu().numpy()
|
|
adv_examples.append( (orig_pred.item(), filtered_pred.item(), adv_ex) )
|
|
|
|
orig_acc = orig_correct/float(len(test_loader))
|
|
attacked_acc = attacked_correct/float(len(test_loader))
|
|
filtered_acc = filtered_correct/float(len(test_loader))
|
|
print(f"Epsilon: {epsilon}")
|
|
print(f"Original Accuracy = {orig_correct} / {len(test_loader)} = {orig_acc}")
|
|
print(f"Attacked Accuracy = {attacked_correct} / {len(test_loader)} = {attacked_acc}")
|
|
print(f"Filtered Accuracy = {filtered_correct} / {len(test_loader)} = {filtered_acc}")
|
|
|
|
return attacked_acc, adv_examples
|
|
|
|
accuracies = []
|
|
examples = []
|
|
|
|
for eps in epsilons:
|
|
acc, ex = test(model, device, test_loader, eps)
|
|
accuracies.append(acc)
|
|
examples.append(ex)
|