184 lines
7.2 KiB
Python
184 lines
7.2 KiB
Python
from __future__ import print_function
|
|
import argparse
|
|
|
|
import torch
|
|
import torch.nn as nn
|
|
import torch.nn.functional as F
|
|
import torch.optim as optim
|
|
from torchvision import datasets, transforms
|
|
from torch.optim.lr_scheduler import StepLR
|
|
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
|
|
import cv2
|
|
from pykuwahara import kuwahara
|
|
|
|
|
|
|
|
class Net(nn.Module):
|
|
def __init__(self):
|
|
super(Net, self).__init__()
|
|
self.conv1 = nn.Conv2d(1, 32, 3, 1)
|
|
self.conv2 = nn.Conv2d(32, 64, 3, 1)
|
|
self.dropout1 = nn.Dropout(0.25)
|
|
self.dropout2 = nn.Dropout(0.5)
|
|
self.fc1 = nn.Linear(9216, 128)
|
|
self.fc2 = nn.Linear(128, 10)
|
|
|
|
def forward(self, x):
|
|
x = self.conv1(x)
|
|
x = F.relu(x)
|
|
x = self.conv2(x)
|
|
x = F.relu(x)
|
|
x = F.max_pool2d(x,2)
|
|
x = self.dropout1(x)
|
|
x = torch.flatten(x,1)
|
|
x = self.fc1(x)
|
|
x = F.relu(x)
|
|
x = self.dropout2(x)
|
|
x = self.fc2(x)
|
|
output = F.log_softmax(x, dim=1)
|
|
return output
|
|
|
|
def train(args, model, device, train_loader, optimizer, epoch):
|
|
model.train()
|
|
for batch_idx, (data, target) in enumerate(train_loader):
|
|
data, target = data.to(device), target.to(device)
|
|
|
|
# Apply Kuwahara filter to training data on a batch-by-batch basis
|
|
if args.filter != 'none':
|
|
data = filtered(data, len(data), args.filter)
|
|
|
|
optimizer.zero_grad()
|
|
output = model(data)
|
|
loss = F.nll_loss(output, target)
|
|
loss.backward()
|
|
optimizer.step()
|
|
|
|
if batch_idx % args.log_interval == 0:
|
|
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(epoch, batch_idx*len(data), len(train_loader.dataset), 100.*batch_idx/len(train_loader), loss.item()))
|
|
if args.dry_run:
|
|
break
|
|
|
|
def test(args, model, device, test_loader):
|
|
model.eval()
|
|
test_loss = 0
|
|
correct = 0
|
|
with torch.no_grad():
|
|
for data, target in test_loader:
|
|
data, target = data.to(device), target.to(device)
|
|
|
|
# Apply Kuwahara filter to test data on a batch-by-batch basis
|
|
if args.filter != 'none':
|
|
data = filtered(data, len(data), args.filter)
|
|
|
|
output = model(data)
|
|
test_loss += F.nll_loss(output, target, reduction='sum').item() # sum up batch loss
|
|
pred = output.argmax(dim=1, keepdim=True) # get the index of the max log-probability
|
|
correct += pred.eq(target.view_as(pred)).sum().item()
|
|
|
|
test_loss /= len(test_loader.dataset)
|
|
|
|
print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
|
|
test_loss, correct, len(test_loader.dataset),
|
|
100. * correct / len(test_loader.dataset)))
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
|
|
parser.add_argument('--batch-size', type=int, default=64, metavar='N',
|
|
help='input batch size for training (default: 64)')
|
|
parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
|
|
help='input batch size for testing (default: 1000)')
|
|
parser.add_argument('--epochs', type=int, default=14, metavar='N',
|
|
help='number of epochs to train (default: 14)')
|
|
parser.add_argument('--lr', type=float, default=1.0, metavar='LR',
|
|
help='learning rate (default: 1.0)')
|
|
parser.add_argument('--gamma', type=float, default=0.7, metavar='M',
|
|
help='Learning rate step gamma (default: 0.7)')
|
|
parser.add_argument('--no-cuda', action='store_true', default=False,
|
|
help='disables CUDA training')
|
|
parser.add_argument('--no-mps', action='store_true', default=False,
|
|
help='disables macOS GPU training')
|
|
parser.add_argument('--dry-run', action='store_true', default=False,
|
|
help='quickly check a single pass')
|
|
parser.add_argument('--seed', type=int, default=1, metavar='S',
|
|
help='random seed (default: 1)')
|
|
parser.add_argument('--log-interval', type=int, default=10, metavar='N',
|
|
help='how many batches to wait before logging training status')
|
|
parser.add_argument('--save-model', action='store_true', default=False,
|
|
help='For Saving the current Model')
|
|
parser.add_argument('--filter', type=str, metavar='S', default='none',
|
|
help='Apply a filter at runtime')
|
|
args = parser.parse_args()
|
|
|
|
train_kwargs = {'batch_size': args.batch_size}
|
|
test_kwargs = {'batch_size': args.test_batch_size}
|
|
|
|
torch.manual_seed(args.seed)
|
|
|
|
device = torch.device("cpu")
|
|
|
|
transform = transforms.Compose( [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))] )
|
|
|
|
dataset1 = datasets.MNIST('../data', train=True, download=True, transform=transform)
|
|
dataset2 = datasets.MNIST('../data', train=False, transform=transform)
|
|
|
|
train_loader = torch.utils.data.DataLoader(dataset1, **train_kwargs)
|
|
test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)
|
|
|
|
print(f'Filter Type: {args.filter}')
|
|
|
|
model = Net().to(device)
|
|
optimizer = optim.Adadelta(model.parameters(), lr=args.lr)
|
|
|
|
scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)
|
|
for epoch in range(1, args.epochs + 1):
|
|
print(f"===== EPOCH {epoch}/{args.epochs} =====")
|
|
train(args, model, device, train_loader, optimizer, epoch)
|
|
test(args, model, device, test_loader)
|
|
scheduler.step()
|
|
|
|
if args.save_model:
|
|
if args.filter is None:
|
|
torch.save(model.state_dict(), "mnist_cnn_unfiltered.pt")
|
|
else:
|
|
torch.save(model.state_dict(), f"mnist_cnn_{args.filter}.pt")
|
|
|
|
|
|
def filtered(data, batch_size=64, filter="kuwahara"):
|
|
# Turn the tensor into an image
|
|
images = data.numpy().transpose(0,2,3,1)
|
|
|
|
# Apply the Kuwahara filter
|
|
filtered_images = np.ndarray((batch_size,28,28,1))
|
|
|
|
if filter == "kuwahara":
|
|
for i in range(batch_size):
|
|
filtered_images[i] = kuwahara(images[i], method='gaussian', radius=5, image_2d=images[i])
|
|
elif filter == "aniso_diff":
|
|
for i in range(batch_size):
|
|
img_3ch = np.zeros((np.array(images[i]), np.array(images[i]).shape[1], 3))
|
|
img_3ch[:,:,0] = images[i]
|
|
img_3ch[:,:,1] = images[i]
|
|
img_3ch[:,:,2] = images[i]
|
|
img_3ch_filtered = cv2.ximgproc.anisotropicDiffusion(img2, alpha=0.2, K=0.5, niters=5)
|
|
filtered_images[i] = cv2.cvtColor(img_3ch_filtered, cv2.COLOR_RGB2GRAY)
|
|
plt.imshow(filtered_images[i])
|
|
plt.show()
|
|
elif filter == "noise":
|
|
pass
|
|
elif filter == "gaussian_blur":
|
|
for i in range(batch_size):
|
|
filtered_images[i] = cv2.GaussianBlur(images[i], ksize=(5,5), sigmaX=0).reshape(filtered_images[i].shape)
|
|
elif filter == "bilateral":
|
|
for i in range(batch_size):
|
|
filtered_images[i] = cv2.bilateralFilter(images[i], 5, 50, 50).reshape(filtered_images[i].shape)
|
|
|
|
# Modify the data with the filtered image
|
|
filtered_images = filtered_images.transpose(0,3,1,2)
|
|
return torch.tensor(filtered_images).float()
|
|
|
|
if __name__ == "__main__":
|
|
main()
|