ResNet은 'Deep Residual Learning for Image Recognition'이라는 논문에서 소개된 모델로
skip connection을 사용하여 2015년 ImageNet 대회에서 1위를 차지한 모델이다.
본 포스트에서는 Deep Residual Learning for Image Recognition이라는 논문을 참고하여 ResNet을 구현해볼 것이다.
그런데 논문에서는 ImageNet 데이터셋에 사용된 모델을 중심으로 설명하고 있지만, ImageNet 데이터를 사용해서 모델이 제대로 학습되었나 확인하기에는 너무 많은 시간이 소요되기에 포스트에서는 CIFAR10 데이터셋에 사용되는 ResNet-32모델을 중심으로 설명할 것이다.
먼저 ResNet의 전체적인 구조는 다음과 같다.
ResNet은 위와 같이 처음 한 개의 Convolution Layer 이후로 여러 종류의 Residual Block들의 순차적인 집합으로 이루어져 있으며, Residual Block은 두 개의 Convolution Layer들로 이루어져있고, block의 input은 skip connection에 의해서 output과 합쳐진다.
그리고 각 Residual Block의 Convolution Layer 이후에는 Batch Normalization과 Relu Activation이 순차적으로 나타나며 skip connection은 마지막 Activation 직전에 더해져서 최종적인 Block의 출력 값을 만들어낸다.
하지만 각 residual block의 집합이 처음 시작될 때는 stride의 값을 2로 설정되어 있어 feature의 크기를 절반으로 축소하며 채널의 수도 2배 증가한다. 따라서 residual block 집합의 중간에 위치한 skip connection은 residual block의 입력값을 그대로 출력 값과 더해줄 수 있지만, residual block 집합의 맨 처음 나오는 residual block의 skip connection에는 다른 방법이 필요하다. ResNet의 논문에서는 이를 위해 2가지 방법을 제시한다.
A) zero padding으로 채널을 증가시키거나, B) 1 x 1 Convolution Layer를 통해서 채널을 증가시킨 후에 각각 stride를 2로 설정하여 feature의 크기를 줄이는 방법등을 제시하는데, ImageNet 데이터에 대한 ResNet에서는 B의 방식을 사용하며, 이번 포스팅에서 설명할 CIFAR10 데이터에 대한 ResNet에서는 A의 방식을 사용한다.
이제 위에 서술한 정보들로 Residual Block을 구현하면 다음과 같다.
class IdentityShortcut(nn.Module):
"""
A class to make Identity Mapping Shortcut
Attributes:
pooling : A max pooling layer
extra_cahnnel : The difference between input an output channels
"""
def __init__(self, in_channels, out_channels, stride):
"""
Initialize the Identity Shortcut Class
Args:
in_channels: number of input channels
out_channels: number of output channels
stride: size of stride
Returns:
None
"""
super().__init__()
self.pooling = nn.MaxPool2d(1, stride=stride)
self.extra_channel = out_channels - in_channels
def forward(self, x):
x = F.pad(x, (0, 0, 0, 0, 0, self.extra_channel))
x = self.pooling(x)
return x
먼저 Residual Block 내부에서 사용되는 타입 A의 skip connection 클래스이다. 위에 서술한대로 zero padding을 이용하여 채널을 증가시킨 다음, 2만큼의 stride를 가진 max pooling을 적용하여 feature의 크기를 절반으로 감소시키도록 구성된 것을 볼 수 있다.
class ResidualBlock(nn.Module):
"""
A class about residual block
When stride == 1, just add input value to output of residual block
When stride == 2, process the input value according to shortcut type and add it to output of residual block
Attributes:
residual_blcok: A sequential container to sequentially configure the layers for residual learning
shortcut: A shortcut connection of residual block
relu: A relu activation layer
"""
def __init__(self, in_channels, out_channels, kernel_size, stride, shortcut_type = None):
"""
Initialize the residual block
Args:
in_channels: The number of input channels
out_channels: The number of output channels
kernel_size: The number of kernnel size in the convolution layers
stride: The size of stride
shortcut_type: The type of shortcut connection when stride is not 1
Returns:
None
"""
super().__init__()
self.residual_block = nn.Sequential(
nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding=1, bias=False),
nn.BatchNorm2d(out_channels),
nn.ReLU(),
nn.Conv2d(out_channels, out_channels, kernel_size, padding = 1, bias = False),
nn.BatchNorm2d(out_channels)
)
self.shortcut = nn.Sequential()
self.relu = nn.ReLU()
if stride != 1:
if shortcut_type == 'A':
self.shortcut = IdentityShortcut(in_channels, out_channels, stride)
elif shortcut_type == 'B':
self.shortcut = nn.Sequential(
nn.Conv2d(in_channels, out_channels, 1, stride),
nn.BatchNorm2d(out_channels)
)
def forward(self, x):
x = self.residual_block(x) + self.shortcut(x)
x = self.relu(x)
return x
이어서 Residual Block이다. 앞서 서술한 것처럼 각각의 Convolution Layer 이후에는 Batch Normalization과 ReLU Activation이 적용되며, shortcut의 경우 stride가 1인 경우에는 단순히 block의 입력과 출력을 더해주지만, stride가 2인 경우에는 입력의 크기를 절반으로 줄이고, 채널을 2배 늘리는 shortcut을 적용하는 것을 볼 수 있다.
Residual Blcok이 구현이 완료되었으니 이제는 ResNet을 구현할 차례이다. 전체적인 구성은 첫번째 이미지의 구조도와 표와 같이 되어 있으나, 그건 전적으로 ImageNet 데이터에 대한 ResNet으로 CIFAR10 데이터에 대한 ResNet은 다음과 같이 약간의 변형이 이루어져 있다.
CIFAR10 데이터에 대한 ResNet은 먼저 첫번째 Convolution Layer가 7x7 크기가 아닌 3x3 크기이다. 또한 Residual Block의 집합도 4개가 아닌 3개로 이루어져 있다. 그리고 각 집합에서의 채널의 수가 ImageNet 모델 대비 1/4라는 점을 유념해야 한다. 이 정보들을 토대로 모델을 구현하면 다음과 같다.
class ResNet(nn.Module):
"""
A class about residual network
Attributes:
conv1_x: First set of layers
conv2_x: Second set of residual blocks
conv3_x: Third set of residual blocks
conv4_x: Fourth set of residual blocks
avg_pool: A average pooling layer of residual network
flatten: A flatten layer for the fully connected layer
fc: A fully connected layer to get probability of each class about given image
"""
def __init__(self, conv_num, num_classes):
"""
Initialize the residual network
Args:
conv_num: The number of residual blocks in each set of residual block
num_classes: The number of classes to predict
Returns:
None
"""
super().__init__()
self.conv1_x = nn.Sequential(
nn.Conv2d(3, 16, kernel_size = 3, stride = 1, padding=1, bias=False),
nn.BatchNorm2d(16),
nn.ReLU(),
#nn.MaxPool2d(kernel_size = 3, stride = 1, padding = 1)
)
self.conv2_x = nn.Sequential(*[ResidualBlock(16, 16, 3, 1) for _ in range(conv_num[0])])
self.conv3_x = nn.Sequential(ResidualBlock(16, 32, 3, 2, 'A'), *[ResidualBlock(32, 32, 3, 1) for _ in range(conv_num[1] - 1)])
self.conv4_x = nn.Sequential(ResidualBlock(32, 64, 3, 2, 'A'), *[ResidualBlock(64, 64, 3, 1) for _ in range(conv_num[2] - 1)])
#self.conv5_x = nn.Sequential(ResidualBlock(256, 128, 3, 2), *[ResidualBlock(512, 512, 3, 1) for _ in range(conv_num[3])])
#self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
self.avg_pool = nn.AvgPool2d(8, stride=1)
self.flatten = nn.Flatten()
self.fc = nn.Linear(in_features=64,out_features=num_classes)
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity = 'relu')
if m.bias is not None:
nn.init.constant_(m.bias, 0)
elif isinstance(m, nn.BatchNorm2d):
nn.init.constant_(m.weight, 1)
nn.init.constant_(m.bias, 0)
elif isinstance(m, nn.Linear):
nn.init.normal_(m.weight, 0, 0.01)
nn.init.constant_(m.bias, 0)
def forward(self, x):
x = self.conv1_x(x)
x = self.conv2_x(x)
x = self.conv3_x(x)
x = self.conv4_x(x)
#x = self.conv5_x(x)
x = self.avg_pool(x)
x = self.flatten(x)
x = self.fc(x)
return x
위 코드에서는 모델 마지막의 average pooling을 정적인 크기를 가지는 AvgPool2d 모듈을 사용하였지만, 모델을 사용할 때, 입력 이미지의 크기가 동적이라면 AdaptiveAvgPool2d 모듈을 사용하면 된다. 참고로 Pytorch에서 제공하는 ResNet의 average pooling 방식은 후자의 것이라고 한다. 그리고 ResNet의 생성할 때, 각 레이어의 weight을 초기화하는데 이는 논문이 참고한 방식대로 초기화하였다.
summary 메소드로 위의 코드로 구현한 ResNet32 모델의 정보를 조회한 결과 논문에서 선보인 것과 동일한 0.46M의 parameter를 가져 올바르게 구현한 것을 확인할 수 있었다.
이제 모델의 구현이 끝났으며 모델을 학습할 때이다.
논문에서는 모델을 학습할 때 사용된 Data Augmentation 기법을 그대로 따라서 무작위로 이미지를 잘라내고, 수평방향으로 뒤집는 방식을 사용하였다.
train_transforms = transforms.Compose([
#transforms.Resize(224),
transforms.RandomCrop(32, padding=4),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize(train_mean, train_std),
])
val_transforms = transforms.Compose([
transforms.ToTensor(),
#transforms.Resize(224),
transforms.Normalize(train_mean, train_std)
])
train_cifar10.transform = train_transforms
val_cifar10.transform = train_transforms
train_dl = DataLoader(train_cifar10, batch_size=256, shuffle=True, num_workers=4)
val_dl = DataLoader(val_cifar10, batch_size=128, shuffle=True, num_workers=4)
그리고 optimizer의 설정과 Learning Rate Scheduler의 설정을 그대로 따라주었다. 하지만 정말로 64k의 epoch를 진행해볼 수는 없으니 임의로 100 epoch만 학습하였다.
writer = SummaryWriter('resnet_logs')
optimizer = torch.optim.SGD(model.parameters(), lr=0.1,
momentum=0.9, weight_decay=1e-4)
device = 'cuda'
model.to(device)
epochs = 100
loss_func = nn.CrossEntropyLoss()
decay_epoch = [32000, 48000]
lr_scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=decay_epoch, gamma=0.1)
best_model = None
best_accs = -1
for _ in tqdm(range(epochs)):
global_loss = 0
corrects = 0
model.train()
for batch_idx, (data, target) in enumerate(train_dl):
data, target = data.to(device), target.to(device)
output = model(data)
loss = loss_func(output, target)
global_loss = global_loss + loss.item()
optimizer.zero_grad()
loss.backward()
optimizer.step()
lr_scheduler.step()
corrects += count_correct(output, target)
train_losses.append(global_loss / (batch_idx + 1))
train_accs.append(corrects / len(train_cifar10) * 100)
model.eval()
corrects = 0
global_loss = 0
for batch_idx, (data, target) in enumerate(val_dl):
data, target = data.to(device), target.to(device)
with torch.no_grad():
output = model(data)
loss = loss_func(output, target)
global_loss = global_loss + loss.item()
corrects += count_correct(output, target)
val_losses.append(global_loss / (batch_idx + 1))
val_accs.append(corrects / len(val_cifar10) * 100)
writer.add_scalar('resnet_log/train_error', 100 - train_accs[-1], _ + 1)
writer.add_scalar('resnet_log/validation_error', 100 - val_accs[-1], _ + 1)
if (_ + 1) % 10 == 0:
print("Epoch %d | train_loss = %.2f | train_acc = %.2f | val_loss = %.2f | val_acc = %.2f" % (_ + 1, train_losses[-1], train_accs[-1], val_losses[-1], val_accs[-1]))
100 Epoch 학습 결과 학습이 원활하게 이루어지는 것을 확인할 수 있었다.
구현한 전체 소스코드는 다음 링크의 resnet.py와 resnet.ipynb 파일에서 확인할 수 있다.
ResNet은 이것으로 마무리되었고, 다음에는 Attention 개념의 alignment를 도입한 'Neural_Machine_Translation_by_Jointly_Learning_to_Align_and_Translate' 논문의 모델을 구현해볼 것이다.
개인적으로 이번 주 내로 마무리하고 싶지만 NLP 쪽이 익숙하지 않아서 언제 마무리될지는 모르겠다.