Это продолжение предыдущего поста, где я делаю полное пошаговое руководство по созданию автономного симулятора грузовика с использованием fast.ai, но в конечном итоге эти методы могут работать в любом случае, когда вам нужно настроить предварительно обученные модели или разработать модели, которые позволяют прогнозировать ограничивающие рамки и классы вместе.
Теперь моя цель - пройтись по некоторым техническим аспектам процессов обучения и логического вывода и объяснить детали того, как они реализованы в PyTorch. Вы также можете сослаться на кодовую базу в этом репозитории Github.
Напомним из прошлого поста, что здесь работают две нейронные сети.
- DNN для прогнозирования направления поворота.
- DNN для прогнозирования ограничивающих рамок и классов автомобилей, людей и т. Д.
Тонкая настройка модели направлений поворота
Обе сети начинаются с предварительно обученной сети resnet34 и настраиваются на соответствующую задачу.
Предварительно обученный реснет34 можно получить из torchvision.models
import torchvision.models as models arch = models.resnet34(pretrained=True)
Все предварительно обученные модели были предварительно обучены на наборе данных Imagenet с классом 1000.
Чтобы настроить предварительно обученную сеть, мы, по сути, просто начинаем с набора весов, в которые уже встроен большой объем информации о наборе данных Imagenet. Итак, мы можем сделать это одним из двух способов. Один из способов - заморозить все ранние слои, установив requires_grad=False
, а затем оставить только requires_grad=True
для последних слоев. Другой способ - просто использовать все веса в качестве инициализации и продолжить обучение на наших новых обучающих данных.
Для варианта 1, где мы замораживаем ранние слои и обучаем только последние слои, мы можем установить requires_grad=False
для всех слоев, а затем удалить и заменить последние слои (всякий раз, когда вы назначаете слой сети, он автоматически устанавливает для атрибута requires_grad
значение Верно).
class Flatten(nn.Module): def __init__(self): super(Flatten, self).__init__() def forward(self, x): x = x.view(x.size(0), -1) return x class normalize(nn.Module): def __init__(self): super(normalize, self).__init__() def forward(self, x): x = F.normalize(x, p=2, dim=1) return x layer_list = list(arch.children())[-2:] arch = nn.Sequential(*list(arch.children())[:-2]) arch.avgpool = nn.AdaptiveAvgPool2d(output_size=(1,1)) arch.fc = nn.Sequential( Flatten(), nn.Linear(in_features=layer_list[1].in_features, out_features=3, bias=True), normalize() ) arch = arch.to(device)
Если вы посмотрите на архитектуру resnet34, вы увидите, что за последним блоком conv следует AdaptiveAvgPool2d
и Linear
слой.
(2): BasicBlock( (conv1): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) (bn1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=nn.Sequential, track_running_stats=True) (relu): ReLU(inplace=True) (conv2): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) (bn2): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) ) ) (avgpool): AdaptiveAvgPool2d(output_size=(1, 1)) (fc): Linear(in_features=512, out_features=1000, bias=True) )
Мы можем удалить два последних слоя с помощью nn.Sequential(*list(arch.children())[:-2])
, а затем снова прикрепить их к концу с помощью arch.avgpool = nn.AdaptiveAvgPool2d(output_size=(1,1))
и еще nn.Sequential
со слоями aFlatten
, Linear
и normalize
. В конечном итоге мы хотим предсказать 3 класса: левый, правый, прямой, поэтому наш out_features
будет 3.
Теперь мы создадим наш набор данных и загрузчик данных для модели направлений. Поскольку наши данные - это просто изображения и классы [слева, справа, прямо], мы могли бы просто использовать встроенный класс набора данных torch, но мне нравится использовать настраиваемый класс независимо от того, потому что я могу точно увидеть, как данные извлекаются легче.
class DirectionsDataset(Dataset): """Directions dataset.""" def __init__(self, csv_file, root_dir, transform=None): """ Args: csv_file (string): Path to the csv file with labels. root_dir (string): Directory with all the images. transform (callable, optional): Optional transform """ self.label = pd.read_csv(csv_file) self.root_dir = root_dir self.transform = transform def __len__(self): return len(self.label) def __getitem__(self, idx): img_name = os.path.join(self.root_dir, self.label.iloc[idx, 0]) image = io.imread(img_name+'.jpg') sample = image label = self.label.iloc[idx, 1] if self.transform: sample = self.transform(sample) return sample, label
Имена моих изображений в CSV-файле не имеют расширений, поэтому img_name+’.jpg’
.
tensor_dataset = DirectionsDataset(csv_file='data/labels_directions.csv', root_dir='data/train3/', transform=transforms.Compose([ transforms.ToTensor(), transforms.Normalize( (0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])) dataloader = DataLoader(tensor_dataset, batch_size=16, shuffle=True)
Итак, мы готовы приступить к обучению модели.
def train_model(model, criterion, optimizer, scheduler, dataloader, num_epochs=25): since = time.time() FT_losses = [] best_model_wts = copy.deepcopy(model.state_dict()) best_acc = 0.0 iters = 0 for epoch in range(num_epochs): print('Epoch {}/{}'.format(epoch, num_epochs - 1)) print('-' * 10) scheduler.step() model.train() # Set model to training mode running_loss = 0.0 running_corrects = 0 # Iterate over data. for i, (inputs, labels) in enumerate(dataloader): #set_trace() inputs = inputs.to(device) labels = labels.to(device) # zero the parameter gradients optimizer.zero_grad() # forward # track history if only in train model.eval() # Set model to evaluate mode with torch.no_grad(): outputs = model(inputs) #set_trace() _, preds = torch.max(outputs, 1) outputs = model(inputs) loss = criterion(outputs, labels) # backward + optimize only if in training phase loss.backward() optimizer.step() FT_losses.append(loss.item()) # statistics running_loss += loss.item() * inputs.size(0) running_corrects += torch.sum(preds == labels.data) #set_trace() iters += 1 if iters % 2 == 0: print('Prev Loss: {:.4f} Prev Acc: {:.4f}'.format( loss.item(), torch.sum(preds == labels.data) / inputs.size(0))) epoch_loss = running_loss / dataset_size epoch_acc = running_corrects.double() / dataset_size print('Loss: {:.4f} Acc: {:.4f}'.format( epoch_loss, epoch_acc)) # deep copy the model if epoch_acc > best_acc: best_acc = epoch_acc best_model_wts = copy.deepcopy(model.state_dict()) time_elapsed = time.time() - since print('Training complete in {:.0f}m {:.0f}s'.format( time_elapsed // 60, time_elapsed % 60)) print('Best val Acc: {:4f}'.format(best_acc)) # load best model weights model.load_state_dict(best_model_wts) return model, FT_losses
В этом цикле обучения мы можем отслеживать лучшие веса модели, если точность по эпохе на данный момент является наилучшей. Мы также можем отслеживать потери на каждой итерации и каждой эпохе и возвращать это в конце, чтобы построить график и посмотреть, как это выглядит для отладки или презентации.
Имейте в виду, что модель обучается на каждой итерации, и если вы остановите цикл обучения, она сохранит эти веса, и обучение можно продолжить снова, просто запустив команду train_model()
еще раз. Чтобы начать с самого начала, снова вернитесь и повторно инициализируйте веса с предварительно обученной архитектурой.
criterion = nn.CrossEntropyLoss() # Observe that all parameters are being optimized optimizer_ft = optim.SGD(arch.parameters(), lr=1e-2, momentum=0.9) # Decay LR by a factor of *gamma* every *step_size* epochs exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1) arch, FT_losses = train_model(arch, criterion, optimizer_ft, exp_lr_scheduler, dataloader, num_epochs=5)
Тонкая настройка модели ограничивающего прямоугольника
Опять же, мы будем использовать предварительно обученную архитектуру resnet34. Однако на этот раз нам придется отредактировать его более существенно, чтобы вывести как прогнозы класса, так и значения ограничивающей рамки. Кроме того, это проблема прогнозирования нескольких классов, поэтому может быть 1 ограничивающая рамка или может быть 15 - также 1 или 15 классов.
Мы создадим настраиваемый заголовок для архитектуры аналогично тому, как мы заменили слои в модели направлений.
class StdConv(nn.Module): def __init__(self, nin, nout, stride=2, drop=0.1): super().__init__() self.conv = nn.Conv2d(nin, nout, 3, stride=stride, padding=1) self.bn = nn.BatchNorm2d(nout) self.drop = nn.Dropout(drop) def forward(self, x): return self.drop(self.bn(F.relu(self.conv(x)))) def flatten_conv(x,k): bs,nf,gx,gy = x.size() x = x.permute(0,2,3,1).contiguous() return x.view(bs,-1,nf//k) class OutConv(nn.Module): def __init__(self, k, nin, bias): super().__init__() self.k = k self.oconv1 = nn.Conv2d(nin, (len(id2cat)+1)*k, 3, padding=1) self.oconv2 = nn.Conv2d(nin, 4*k, 3, padding=1) self.oconv1.bias.data.zero_().add_(bias) def forward(self, x): return [flatten_conv(self.oconv1(x), self.k), flatten_conv(self.oconv2(x), self.k)] drop=0.4 class SSD_MultiHead(nn.Module): def __init__(self, k, bias): super().__init__() self.drop = nn.Dropout(drop) self.sconv0 = StdConv(512,256, stride=1, drop=drop) self.sconv1 = StdConv(256,256, drop=drop) self.sconv2 = StdConv(256,256, drop=drop) self.sconv3 = StdConv(256,256, drop=drop) self.out0 = OutConv(k, 256, bias) self.out1 = OutConv(k, 256, bias) self.out2 = OutConv(k, 256, bias) self.out3 = OutConv(k, 256, bias) def forward(self, x): x = self.drop(F.relu(x)) x = self.sconv0(x) x = self.sconv1(x) o1c,o1l = self.out1(x) x = self.sconv2(x) o2c,o2l = self.out2(x) x = self.sconv3(x) o3c,o3l = self.out3(x) return [torch.cat([o1c,o2c,o3c], dim=1), torch.cat([o1l,o2l,o3l], dim=1)]
Итак, теперь мы хотим подключить эту настраиваемую головку к архитектуре resnet34, и у нас есть удобная функция, которая делает это.
class ConvnetBuilder(): def __init__(self, f, c, is_multi, is_reg, ps=None, xtra_fc=None, xtra_cut=0, custom_head=None,pretrained=True): self.f,self.c,self.is_multi,self.is_reg,self.xtra_cut = f,c,is_multi,is_reg,xtra_cut xtra_fc = [512] ps = [0.25]*len(xtra_fc) + [0.5] self.ps,self.xtra_fc = ps,xtra_fc cut,self.lr_cut = [8,6] # specific to resnet_34 arch cut-=xtra_cut layers = cut_model(f(pretrained), cut) self.nf = num_features(layers)*2 self.top_model = nn.Sequential(*layers) n_fc = len(self.xtra_fc)+1 self.ps = [self.ps]*n_fc fc_layers = [custom_head] self.n_fc = len(fc_layers) self.fc_model = nn.Sequential(*fc_layers).to(device) self.model = nn.Sequential(*(layers+fc_layers)).to(device) def cut_model(m, cut): return list(m.children())[:cut] if cut else [m] def num_features(m): c=children(m) if len(c)==0: return None for l in reversed(c): if hasattr(l, 'num_features'): return l.num_features res = num_features(l) if res is not None: return res def children(m): return m if isinstance(m, (list, tuple)) else list(m.children())
Используя этот ConvnetBuilder
класс, мы можем объединить настраиваемую головку и архитектуру resnet34.
k = len(anchor_scales) head_reg4 = SSD_MultiHead(k, -4.) f_model = models.resnet34 modelss = ConvnetBuilder(f_model, 0, 0, 0, custom_head=head_reg4)
k
is 9
Теперь мы можем получить доступ к модели через атрибут model
на modelss
.
Функция потерь должна принимать как классификации (классы), так и непрерывные значения (ограничивающие прямоугольники) и выводить единственное значение потерь.
def ssd_loss(pred,targ,print_it=False): lcs,lls = 0.,0. for b_c,b_bb,bbox,clas in zip(*pred,*targ): loc_loss,clas_loss = ssdtorchvision.models
loss(b_c,b_bb,bbox,clas,print_it) lls += loc_loss lcs += clas_loss if print_it: print(f'loc: {lls.data.item()}, clas: {lcs.data.item()}') return lls+lcs def ssdtorchvision.models
loss(b_c,b_bb,bbox,clas,print_it=False): bbox,clas = get_y(bbox,clas) a_ic = actn_to_bb(b_bb, anchors) overlaps = jaccard(bbox.data, anchor_cnr.data) gt_overlap,gt_idx = map_to_ground_truth(overlaps,print_it) gt_clas = clas[gt_idx] pos = gt_overlap > 0.4 pos_idx = torch.nonzero(pos)[:,0] gt_clas[1-pos] = len(id2cat) gt_bbox = bbox[gt_idx] loc_loss = ((a_ic[pos_idx] - gt_bbox[pos_idx]).abs()).mean() clas_loss = loss_f(b_c, gt_clas) return loc_loss, clas_loss def one_hot_embedding(labels, num_classes): return torch.eye(num_classes)[labels.data.long().cpu()] class BCE_Loss(nn.Module): def __init__(self, num_classes): super().__init__() self.num_classes = num_classes def forward(self, pred, targ): t = one_hot_embedding(targ, self.num_classes+1) t = V(t[:,:-1].contiguous()).cpu() x = pred[:,:-1] w = self.get_weight(x,t) return F.binary_cross_entropy_with_logits(x, t, w, size_average=False)/self.num_classes def get_weight(self,x,t): return None loss_f = BCE_Loss(len(id2cat)) def get_y(bbox,clas): bbox = bbox.view(-1,4)/sz bb_keep = ((bbox[:,2]-bbox[:,0])>0).nonzero()[:,0] return bbox[bb_keep],clas[bb_keep] def actn_to_bb(actn, anchors): actn_bbs = torch.tanh(actn) actn_centers = (actn_bbs[:,:2]/2 * grid_sizes) + anchors[:,:2] actn_hw = (actn_bbs[:,2:]/2+1) * anchors[:,2:] return hw2corners(actn_centers, actn_hw) def intersect(box_a, box_b): max_xy = torch.min(box_a[:, None, 2:], box_b[None, :, 2:]) min_xy = torch.max(box_a[:, None, :2], box_b[None, :, :2]) inter = torch.clamp((max_xy - min_xy), min=0) return inter[:, :, 0] * inter[:, :, 1] def box_sz(b): return ((b[:, 2]-b[:, 0]) * (b[:, 3]-b[:, 1])) def jaccard(box_a, box_b): inter = intersect(box_a, box_b) union = box_sz(box_a).unsqueeze(1) + box_sz(box_b).unsqueeze(0) - inter return inter / union
Мы можем протестировать функцию потерь на пакетном выходе из нашей модели bbox, как только мы настроим наш набор данных и загрузчик данных.
Здесь нам действительно нужен класс настраиваемого набора данных для работы с этими типами данных.
class BboxDataset(Dataset): """Bbox dataset.""" def __init__(self, csv_file, root_dir, transform=None): """ Args: csv_file (string): Path to csv file with bounding boxes. root_dir (string): Directory with all the images. transform (callable, optional): Optional transform. """ self.label = pd.read_csv(csv_file) self.root_dir = root_dir self.transform = transform self.sz = 224 def __len__(self): return len(self.label) def __getitem__(self, idx): img_name = os.path.join(self.root_dir, self.label.iloc[idx, 0]) image = io.imread(img_name) sample = image h, w = sample.shape[:2]; new_h, new_w = (224,224) bb = np.array([float(x) for x in self.label.iloc[idx, 1].split(' ')], dtype=np.float32) bb = np.reshape(bb, (int(bb.shape[0]/2),2)) bb = bb * [new_h / h, new_w / w] bb = bb.flatten() bb = T(np.concatenate((np.zeros((189*4) - len(bb)), bb), axis=None)) # 189 is 21 * 9 where 9 = k if self.transform: sample = self.transform(sample) return sample, bb
Этот класс настраиваемого набора данных имеет дело с ограничивающими прямоугольниками, но нам нужен класс набора данных, который будет иметь дело как с классами, так и с ограничивающими прямоугольниками.
bb_dataset = BboxDataset(csv_file='data/pascal/tmp/mbb.csv', root_dir='data/pascal/VOCdevkit2/VOC2007/JPEGImages/', transform=transforms.Compose([ transforms.ToPILImage(), transforms.Resize((224,224)), transforms.ToTensor(), transforms.Normalize( (0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])) bb_dataloader = DataLoader(bb_dataset, batch_size=16, shuffle=True)
Здесь мы можем объединить два класса наборов данных, чтобы с каждым изображением возвращались классы и ограничивающие рамки.
class ConcatLblDataset(Dataset): def __init__(self, ds, y2): self.ds,self.y2 = ds,y2 self.sz = ds.sz def __len__(self): return len(self.ds) def __getitem__(self, i): self.y2[i] = np.concatenate((np.zeros(189 - len(self.y2[i])), self.y2[i]), axis=None) x,y = self.ds[i] return (x, (y,self.y2[i])) trn_ds2 = ConcatLblDataset(bb_dataset, mcs)
Где mcs
- это множество массивов с классами каждого обучающего образа.
PATH_pascal = Path('data/pascal') trn_j = json.load((PATH_pascal / 'pascal_train2007.json').open()) cats = dict((o['id'], o['name']) for o in trn_j['categories']) mc = [[cats[p[1]] for p in trn_anno[o]] for o in trn_ids] id2cat = list(cats.values()) cat2id = {v:k for k,v in enumerate(id2cat)} mcs = np.array([np.array([cat2id[p] for p in o]) for o in mc])
Теперь мы можем проверить нашу индивидуальную потерю.
sz=224 x,y = next(iter(bb_dataloader2)) batch = modelss.model(x) ssd_loss(batch, y, True) tensor([0.6254]) tensor([0.6821, 0.7257, 0.4922]) tensor([0.9563]) tensor([0.6522, 0.5276, 0.6226]) tensor([0.6811, 0.3338]) tensor([0.7008]) tensor([0.5316, 0.2926]) tensor([0.9422]) tensor([0.5487, 0.7187, 0.3620, 0.1578]) tensor([0.6546, 0.3753, 0.4231, 0.4663, 0.2125, 0.0729]) tensor([0.3756, 0.5085]) tensor([0.2304, 0.1390, 0.0853]) tensor([0.2484]) tensor([0.6419]) tensor([0.5954, 0.5375, 0.5552]) tensor([0.2383]) loc: 1.844399333000183, clas: 79.79206085205078
Выход [1024]:
tensor(81.6365, grad_fn=<AddBackward0>)
Теперь обучим модель ssd.
beta1 = 0.5 optimizer = optim.Adam(modelss.model.parameters(), lr=1e-3, betas=(beta1, 0.99)) # Decay LR by a factor of *gamma* every *step_size* epochs exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
Мы можем использовать по существу ту же функцию train_model()
, что и раньше, но на этот раз мы передаем список ограничивающих прямоугольников и классов функции потерь ssd_loss()
.
Теперь у нас есть обе наши модели, обученные на наших новых наборах данных для обучения, и мы готовы использовать их для вывода в нашей игре-симуляторе грузовика.
Я рекомендую вам ознакомиться с этим репозиторием Github, чтобы увидеть полную реализацию, в которой вы можете обучать модели, записывать данные обучения и тестировать реализацию в видеоигре.
Развлекайся!