# flake8: noqa """ResNet in PyTorch. ImageNet-Style ResNet [1] Kaiming He, Xiangyu Zhang, Shaoqing Ren, Jian Sun Deep Residual Learning for Image Recognition. arXiv:1512.03385 Adapted from: https://github.com/bearpaw/pytorch-classification """ import torch import torch.nn as nn import torch.nn.functional as F from geowatch.tasks.rutgers_material_seg.models.tex_refine import TeRN class ASPP(nn.Module): def __init__(self, C, depth, num_classes, conv=nn.Conv2d, norm=nn.BatchNorm2d, momentum=0.0003, mult=1): super(ASPP, self).__init__() self._C = C self._depth = depth self._num_classes = num_classes self.global_pooling = nn.AdaptiveAvgPool2d(1) self.relu = nn.ReLU(inplace=True) self.aspp1 = conv(C, depth, kernel_size=1, stride=1, bias=False) self.aspp2 = conv(C, depth, kernel_size=3, stride=1, dilation=int(6 * mult), padding=int(6 * mult), bias=False) self.aspp3 = conv(C, depth, kernel_size=3, stride=1, dilation=int(12 * mult), padding=int(12 * mult), bias=False) self.aspp4 = conv(C, depth, kernel_size=3, stride=1, dilation=int(18 * mult), padding=int(18 * mult), bias=False) self.aspp5 = conv(C, depth, kernel_size=1, stride=1, bias=False) self.aspp1_bn = norm(depth, momentum) self.aspp2_bn = norm(depth, momentum) self.aspp3_bn = norm(depth, momentum) self.aspp4_bn = norm(depth, momentum) self.aspp5_bn = norm(depth, momentum) self.conv2 = conv(depth * 5, depth, kernel_size=1, stride=1, bias=False) self.bn2 = norm(depth, momentum) self.conv3 = nn.Conv2d(depth, num_classes, kernel_size=1, stride=1) def forward(self, x): x1 = self.aspp1(x) x1 = self.aspp1_bn(x1) x1 = self.relu(x1) x2 = self.aspp2(x) x2 = self.aspp2_bn(x2) x2 = self.relu(x2) x3 = self.aspp3(x) x3 = self.aspp3_bn(x3) x3 = self.relu(x3) x4 = self.aspp4(x) x4 = self.aspp4_bn(x4) x4 = self.relu(x4) x5 = self.global_pooling(x) x5 = self.aspp5(x5) x5 = self.aspp5_bn(x5) x5 = self.relu(x5) x5 = nn.Upsample((x.shape[2], x.shape[3]), mode='bilinear', align_corners=True)(x5) x = torch.cat((x1, x2, x3, x4, x5), 1) x = self.conv2(x) x = self.bn2(x) x = self.relu(x) x = self.conv3(x) return x class DoubleConv(nn.Module): """(convolution => [BN] => ReLU) * 2""" def __init__(self, in_channels, out_channels): super().__init__() self.double_conv = nn.Sequential( nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1), nn.GroupNorm(32, out_channels), nn.LeakyReLU(0.2), nn.Dropout(0.5), nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1), nn.GroupNorm(32, out_channels), nn.LeakyReLU(0.2), ) def forward(self, x): return self.double_conv(x) class Up(nn.Module): """Upscaling then double conv""" def __init__(self, in_channels, out_channels, bilinear=True): super().__init__() # if bilinear, use the normal convolutions to reduce the number of # channels if bilinear: self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True) else: self.up = nn.ConvTranspose2d(in_channels // 2, in_channels // 2, kernel_size=2, stride=2) self.conv = DoubleConv(in_channels, out_channels) # Given transposed=1, weight of size [48, 48, 2, 2], 48 -> 32+64//2, instead, # expected input[4, 64, 128, 128] to have 48 channels, but got 64 # channels instead def forward(self, x1, x2): # print(x1.shape) x1 = self.up(x1) # input is CHW diffY = x2.size()[2] - x1.size()[2] diffX = x2.size()[3] - x1.size()[3] x1 = F.pad(x1, [diffX // 2, diffX - diffX // 2, diffY // 2, diffY - diffY // 2]) x = torch.cat([x2, x1], dim=1) return self.conv(x) class BasicBlock(nn.Module): expansion = 1 def __init__(self, in_planes, planes, stride=1, is_last=False): super(BasicBlock, self).__init__() self.is_last = is_last self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False) self.bn1 = nn.BatchNorm2d(planes) self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False) self.bn2 = nn.BatchNorm2d(planes) self.shortcut = nn.Sequential() if stride != 1 or in_planes != self.expansion * planes: self.shortcut = nn.Sequential( nn.Conv2d(in_planes, self.expansion * planes, kernel_size=1, stride=stride, bias=False), nn.BatchNorm2d(self.expansion * planes) ) def forward(self, x): out = F.relu(self.bn1(self.conv1(x))) out = self.bn2(self.conv2(out)) out += self.shortcut(x) preact = out out = F.relu(out) if self.is_last: return out, preact else: return out class Bottleneck(nn.Module): expansion = 4 def __init__(self, in_planes, planes, stride=1, is_last=False): super(Bottleneck, self).__init__() self.is_last = is_last self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False) self.bn1 = nn.BatchNorm2d(planes) self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False) self.bn2 = nn.BatchNorm2d(planes) self.conv3 = nn.Conv2d(planes, self.expansion * planes, kernel_size=1, bias=False) self.bn3 = nn.BatchNorm2d(self.expansion * planes) self.shortcut = nn.Sequential() if stride != 1 or in_planes != self.expansion * planes: self.shortcut = nn.Sequential( nn.Conv2d(in_planes, self.expansion * planes, kernel_size=1, stride=stride, bias=False), nn.BatchNorm2d(self.expansion * planes) ) def forward(self, x): out = F.relu(self.bn1(self.conv1(x))) out = F.relu(self.bn2(self.conv2(out))) out = self.bn3(self.conv3(out)) out += self.shortcut(x) preact = out out = F.relu(out) if self.is_last: return out, preact else: return out class ResNet(nn.Module): def __init__(self, block, num_blocks, num_channels=3, zero_init_residual=False, pretrained=False, num_classes=None, beta=False, weight_std=False, num_groups=32, out_dim=128, feats=[64, 128, 256, 512, 256]): super(ResNet, self).__init__() self.in_planes = 64 self.conv1_diff = nn.Conv2d(num_channels, 64, kernel_size=3, stride=1, padding=1, bias=False) self.conv1_cat = nn.Conv2d(num_channels, 64, kernel_size=3, stride=1, padding=1, bias=False) self.bn1 = nn.BatchNorm2d(64) self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1) self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2) self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2) self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2) self.drop1 = nn.Dropout(0.35) self.drop2 = nn.Dropout(0.2) self.drop3 = nn.Dropout(0.35) self.avgpool = nn.AdaptiveAvgPool2d((1, 1)) # self.fc = nn.Linear(512 * block.expansion, num_classes) # def _norm(planes, momentum=0.05): # return nn.BatchNorm2d(planes, momentum=momentum) # self.norm = _norm # self.conv = nn.Conv2d self.aspp = ASPP(feats[3], 256, 256) self._aff = TeRN(num_iter=10, dilations=[1, 1, 2, 4, 6, 8]) self.up1 = Up(feats[4] + feats[3], feats[3], bilinear=True) self.up2 = Up(feats[2] + feats[3], feats[2], bilinear=True) self.up3 = Up(feats[1] + feats[2], feats[1], bilinear=True) self.up4 = Up(feats[0] + feats[1], feats[0], bilinear=True) self.outconv = nn.Conv2d(feats[0], num_classes, kernel_size=1, stride=1, bias=False) for m in self.modules(): if isinstance(m, nn.Conv2d): nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu') elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)): nn.init.constant_(m.weight, 1) nn.init.constant_(m.bias, 0) # Zero-initialize the last BN in each residual branch, # so that the residual branch starts with zeros, and each residual block behaves # like an identity. This improves the model by 0.2~0.3% according to: # https://arxiv.org/abs/1706.02677 if zero_init_residual: for m in self.modules(): if isinstance(m, Bottleneck): nn.init.constant_(m.bn3.weight, 0) elif isinstance(m, BasicBlock): nn.init.constant_(m.bn2.weight, 0) def _make_layer(self, block, planes, num_blocks, stride): strides = [stride] + [1] * (num_blocks - 1) layers = [] for i in range(num_blocks): stride = strides[i] layers.append(block(self.in_planes, planes, stride)) self.in_planes = planes * block.expansion return nn.Sequential(*layers) def forward(self, img1, img2, layer=100): x1_1 = F.relu(self.bn1(self.conv1_diff(img1))) x1_2 = F.relu(self.bn1(self.conv1_diff(img2))) # x1_cont = torch.abs(x1_1 - x1_2) x11_1 = self.layer1(x1_1) x11_2 = self.layer1(x1_2) x1_cont = torch.abs(x11_1 - x11_2) # x2 = self.layer2(x1) x2_1 = self.layer2(x11_1) x2_2 = self.layer2(x11_2) x2_cont = torch.abs(x2_1 - x2_2) x3_1 = self.layer3(x2_1) x3_2 = self.layer3(x2_2) x3_cont = torch.abs(x3_1 - x3_2) x4_1 = self.layer4(x3_1) x4_2 = self.layer4(x3_2) x4_cont = torch.abs(x4_1 - x4_2) x4_cont = self.avgpool(x4_cont) x_feats = torch.flatten(x4_cont, 1) x_aspp = self.aspp(x4_cont) x = self.up1(x_aspp, x4_cont) x = self.drop1(x) x = self.up2(x, x3_cont) # x = self.drop2(x) x = self.up3(x, x2_cont) # x = self.drop3(x) x = self.up4(x, x1_cont) x = self.outconv(x) # classifer = self.fc(x) return x # , x_feats # def forward(self, x, layer=100): # # x1 = F.relu(self.bn1(self.conv1(x))) # x1 = F.relu(self.bn1(self.conv1_cat(x))) # # x1 = self._aff(x, x1) # x1 = self.layer1(x1) # x2 = self.layer2(x1) # x3 = self.layer3(x2) # x4 = self.layer4(x3) # x4 = self.avgpool(x4) # x_feats = torch.flatten(x4, 1) # x_aspp = self.aspp(x4) # x = self.up1(x_aspp, x4) # x = self.up2(x, x3) # x = self.up3(x, x2) # x = self.up4(x, x1) # x = self.outconv(x) # # classifer = self.fc(x) # return x, x_feats def resnet18(**kwargs): return ResNet(BasicBlock, [2, 2, 2, 2], **kwargs) def resnet34(pretrained=False, **kwargs): """Constructs a ResNet-34 model. Args: pretrained (bool): If True, returns a model pre-trained on ImageNet """ model = ResNet(BasicBlock, [3, 4, 6, 3], **kwargs) # if pretrained: # model.load_state_dict(model_zoo.load_url(model_urls['resnet50'])) if pretrained: model_dict = model.state_dict() # /home/native/projects/data/smart_watch/models/experiments_onera/tasks_experiments_onera_2021-10-18-13:27/experiments_epoch_0_loss_11.28138166103723_valmF1_0.6866047574166068_valChangeF1_0.49019877611815305_time_2021-10-18-14:15:27.pth # # pretrained_path = "/home/native/projects/data/smart_watch/models/experiments_onera/tasks_experiments_onera_2021-10-07-10:23/experiments_epoch_8_loss_3394.9326448260613_valmIoU_0.5388350590429163_time_2021-10-07-22:05:00.pth" pretrained_path = "/home/native/projects/data/smart_watch/models/experiments_onera/tasks_experiments_onera_2021-10-18-13:27/experiments_epoch_0_loss_11.28138166103723_valmF1_0.6866047574166068_valChangeF1_0.49019877611815305_time_2021-10-18-14:15:27.pth" pretrained_dict = torch.load(pretrained_path)['model'] # pretrained_dict = model_zoo.load_url(model_urls['resnet50']) overlap_dict = {k[7:]: v for k, v in pretrained_dict.items() if k[7:] in model_dict} # for k, v in overlap_dict.items(): # v.requires_grad=False model_dict.update(overlap_dict) model.load_state_dict(model_dict) print(f"loaded {len(overlap_dict)}/{len(pretrained_dict)} layers") return model def resnet50(**kwargs): return ResNet(Bottleneck, [3, 4, 6, 3], **kwargs) def resnet101(**kwargs): return ResNet(Bottleneck, [3, 4, 23, 3], **kwargs) model_dict = { 'resnet18': [resnet18, 512], 'resnet34': [resnet34, 512], 'resnet50': [resnet50, 2048], 'resnet101': [resnet101, 2048], }