When computing gradients in a vanilla RNN, backpropagation multiplies by the same matrix over and over, which easily leads to vanishing or exploding gradients. The Long Short-Term Memory (LSTM) network addresses this problem.
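A quick numpy sketch of why this happens (my own illustration, not part of the assignment): repeatedly multiplying a vector by the same matrix shrinks it toward zero or blows it up, depending on whether the matrix's largest singular value is below or above 1.

```python
import numpy as np

np.random.seed(0)
H, T = 64, 50
grad = np.random.randn(H)

for scale in [0.5, 1.5]:
    # An orthogonal matrix scaled by `scale`, so all singular values equal `scale`
    W = scale * np.linalg.qr(np.random.randn(H, H))[0]
    g = grad.copy()
    for _ in range(T):        # "backprop" through T time steps
        g = W.T @ g
    print(scale, np.linalg.norm(g))
# With scale 0.5 the norm collapses toward 0 (vanishing gradient);
# with scale 1.5 it grows by roughly 1.5**50 (exploding gradient).
```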
As a sanity check, this deliberately overfit LSTM model should reach a loss below 0.5.
```python
np.random.seed(231)

# Use only 50 training examples
small_data = load_coco_data(max_train=50)

small_lstm_model = CaptioningRNN(
    cell_type='lstm',
    word_to_idx=data['word_to_idx'],
    input_dim=data['train_features'].shape[1],
    hidden_dim=512,
    wordvec_dim=256,
    dtype=np.float32,
)

# 50 examples per epoch with batch_size 25 gives 2 iterations per epoch;
# times 50 epochs, that is 100 iterations in total
small_lstm_solver = CaptioningSolver(small_lstm_model, small_data,
    update_rule='adam',
    num_epochs=50,
    batch_size=25,
    optim_config={
        'learning_rate': 5e-3,
    },
    lr_decay=0.995,
    verbose=True, print_every=10,
)

small_lstm_solver.train()

# Plot the training loss
plt.plot(small_lstm_solver.loss_history)
plt.xlabel('Iteration')
plt.ylabel('Loss')
plt.title('Training loss history')
plt.show()
```

Now use the trained model to generate captions for some images and compare them with the ground-truth captions.
```python
for split in ['train', 'val']:
    # Pick two images each from the training and validation sets
    minibatch = sample_coco_minibatch(small_data, split=split, batch_size=2)
    gt_captions, features, urls = minibatch

    # Convert the ground-truth captions from integer IDs back to words
    gt_captions = decode_captions(gt_captions, data['idx_to_word'])

    # Generate captions with the trained model
    sample_captions = small_lstm_model.sample(features)
    sample_captions = decode_captions(sample_captions, data['idx_to_word'])

    # Show each image together with its generated and ground-truth captions
    for gt_caption, sample_caption, url in zip(gt_captions, sample_captions, urls):
        plt.figure(figsize=(5, 0.5))
        # This should display the image directly via "plt.imshow(image_from_url(url))",
        # but that did not work for me, so the URL is printed instead
        print(url)
        plt.title('%s\n%s\nGT:%s' % (split, sample_caption, gt_caption))
        plt.axis('off')
        plt.show()
```

The captions generated for the validation images have nothing to do with the pictures... The model is scored with the unigram BLEU metric (there is an article worth reading on it); it lies between 0 and 1, and the closer to 1, the better.
```python
import nltk

def BLEU_score(gt_caption, sample_caption):
    """
    gt_caption: the ground-truth caption
    sample_caption: the generated caption
    Returns the unigram BLEU score.
    """
    # Split the captions into words; the start, end, and unknown tokens
    # are excluded from the score
    reference = [x for x in gt_caption.split(' ')
                 if ('<END>' not in x and '<START>' not in x and '<UNK>' not in x)]
    hypothesis = [x for x in sample_caption.split(' ')
                  if ('<END>' not in x and '<START>' not in x and '<UNK>' not in x)]
    # Only the unigram BLEU score is computed
    BLEUscore = nltk.translate.bleu_score.sentence_bleu([reference], hypothesis, weights=[1])
    return BLEUscore

def evaluate_model(model):
    """
    model: a trained model
    Samples 1000 training and 1000 validation images and prints
    the average unigram BLEU score for each split.
    """
    BLEUscores = {}
    for split in ['train', 'val']:
        # Sample images for evaluation
        minibatch = sample_coco_minibatch(med_data, split=split, batch_size=1000)
        gt_captions, features, urls = minibatch

        # Ground-truth captions
        gt_captions = decode_captions(gt_captions, data['idx_to_word'])

        # Generated captions
        sample_captions = model.sample(features)
        sample_captions = decode_captions(sample_captions, data['idx_to_word'])

        total_score = 0.0
        for gt_caption, sample_caption, url in zip(gt_captions, sample_captions, urls):
            total_score += BLEU_score(gt_caption, sample_caption)

        BLEUscores[split] = total_score / len(sample_captions)

    for split in BLEUscores:
        print('Average BLEU score for %s: %f' % (split, BLEUscores[split]))

med_data = small_data
evaluate_model(small_lstm_model)
# Average BLEU score for train: 1.000000
# Average BLEU score for val: 0.177189
```
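A tiny sanity check of BLEU_score on a made-up pair of captions (my own example, not from the notebook):

```python
# 4 of the 5 hypothesis words ('a' twice, 'man', 'bike') also appear in the reference,
# so the unigram BLEU score should come out as 4/5 = 0.8
print(BLEU_score('<START> a man riding a bike <END>', '<START> a man on a bike <END>'))
```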
Next, train your own model; a unigram BLEU score above 0.3 earns extra credit. That bar does not look very high: I casually changed a few numbers and already got 0.257 while using only a fortieth of the data, so with the full dataset it should do better.

```python
max_train = 10000
num_epochs = 5
batch_size = 100
# Average BLEU score for train: 0.261801
# Average BLEU score for val: 0.257333
```

It mistakes the man for a woman and the cabinet in front (probably) for a vase, but at least the captions now bear some relation to the pictures rather than being pure gibberish.
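Roughly, those values slot into the earlier setup as follows (a sketch only; the remaining hyperparameters are simply kept as in the small-data run):

```python
np.random.seed(231)

# evaluate_model() reads the global med_data, so define it here
med_data = load_coco_data(max_train=10000)

med_lstm_model = CaptioningRNN(
    cell_type='lstm',
    word_to_idx=data['word_to_idx'],
    input_dim=data['train_features'].shape[1],
    hidden_dim=512,
    wordvec_dim=256,
    dtype=np.float32,
)

med_lstm_solver = CaptioningSolver(med_lstm_model, med_data,
    update_rule='adam',
    num_epochs=5,
    batch_size=100,
    optim_config={
        'learning_rate': 5e-3,
    },
    lr_decay=0.995,
    verbose=True, print_every=10,
)

med_lstm_solver.train()
evaluate_model(med_lstm_model)
```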
The sigmoid function, written to avoid overflow: for very negative x, the naive 1 / (1 + exp(-x)) would evaluate exp(-x) on a huge positive argument and overflow, so the positive and negative cases are handled separately.
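The trick is the identity

$$
\sigma(x) = \frac{1}{1 + e^{-x}} = \frac{e^{x}}{1 + e^{x}},
$$

using the left form when $x \ge 0$ (where $e^{-x} \le 1$) and the right form when $x < 0$ (where $e^{x} < 1$), so only non-positive arguments are ever exponentiated.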
```python
def sigmoid(x):
    pos_mask = (x >= 0)
    neg_mask = (x < 0)
    z = np.zeros_like(x)
    z[pos_mask] = np.exp(-x[pos_mask])   # exp(-x) <= 1 when x >= 0
    z[neg_mask] = np.exp(x[neg_mask])    # exp(x) < 1 when x < 0
    top = np.ones_like(x)
    top[neg_mask] = z[neg_mask]
    return top / (1 + z)
```

Forward and backward passes for a single LSTM step. In the update below, the first equation is a matrix multiplication, while the last two (the cell-state and hidden-state updates) are elementwise products.
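Written out, the single-step update implemented by the code is ($\odot$ denotes elementwise multiplication and $\sigma$ the sigmoid above):

$$
\begin{aligned}
z &= x W_x + h_{t-1} W_h + b \in \mathbb{R}^{N \times 4H},\\
i &= \sigma(z_{[:,\,0:H]}), \quad f = \sigma(z_{[:,\,H:2H]}), \quad o = \sigma(z_{[:,\,2H:3H]}), \quad g = \tanh(z_{[:,\,3H:4H]}),\\
c_t &= f \odot c_{t-1} + i \odot g,\\
h_t &= o \odot \tanh(c_t).
\end{aligned}
$$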
```python
def lstm_step_forward(x, prev_h, prev_c, Wx, Wh, b):
    """
    Inputs:
    - x: input data, (N, D)
    - prev_h: previous hidden state, (N, H)
    - prev_c: previous cell state, (N, H)
    - Wx: input-to-hidden weights, (D, 4H)
    - Wh: hidden-to-hidden weights, (H, 4H)
    - b: biases, (4H,)

    Returns a tuple of:
    - next_h: next hidden state, (N, H)
    - next_c: next cell state, (N, H)
    - cache: values needed for the backward pass
    """
    next_h, next_c, cache = None, None, None
    H = prev_h.shape[1]

    z = prev_h @ Wh + x @ Wx + b      # (N, 4H)
    i = sigmoid(z[:, 0:H])            # input gate
    f = sigmoid(z[:, H:2*H])          # forget gate
    o = sigmoid(z[:, 2*H:3*H])        # output gate
    g = np.tanh(z[:, 3*H:4*H])        # candidate cell state

    next_c = f * prev_c + i * g
    next_h = o * np.tanh(next_c)
    cache = (i, f, o, g, x, prev_h, prev_c, next_c, Wx, Wh)

    return next_h, next_c, cache


def lstm_step_backward(dnext_h, dnext_c, cache):
    dx, dprev_h, dprev_c, dWx, dWh, db = None, None, None, None, None, None
    i, f, o, g, x, prev_h, prev_c, next_c, Wx, Wh = cache

    # next_h = o * np.tanh(next_c)
    dz_o = dnext_h * np.tanh(next_c) * o * (1 - o)   # dsigmoid(x) = sigmoid(x) * (1 - sigmoid(x))
    # do not use +=, to avoid modifying the caller's dnext_c array in place
    dnext_c = dnext_c + dnext_h * o * (1 - np.tanh(next_c)**2)   # dtanh(x) = 1 - tanh(x)**2

    # next_c = f * prev_c + i * g
    dz_f = dnext_c * prev_c * f * (1 - f)
    dprev_c = dnext_c * f
    dz_i = dnext_c * g * i * (1 - i)
    dz_g = dnext_c * i * (1 - g**2)

    # (N, 4H)
    dz = np.hstack((dz_i, dz_f, dz_o, dz_g))

    # z = prev_h @ Wh + x @ Wx + b
    dprev_h = dz @ Wh.T
    dWh = prev_h.T @ dz
    dx = dz @ Wx.T
    dWx = x.T @ dz
    db = np.sum(dz, axis=0)

    return dx, dprev_h, dprev_c, dWx, dWh, db
```
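A quick shape check for the step functions (random inputs with arbitrarily chosen sizes, just a sanity check):

```python
np.random.seed(231)
N, D, H = 3, 10, 4
x = np.random.randn(N, D)
prev_h = np.random.randn(N, H)
prev_c = np.random.randn(N, H)
Wx = np.random.randn(D, 4 * H)
Wh = np.random.randn(H, 4 * H)
b = np.random.randn(4 * H)

next_h, next_c, cache = lstm_step_forward(x, prev_h, prev_c, Wx, Wh, b)
dx, dprev_h, dprev_c, dWx, dWh, db = lstm_step_backward(
    np.random.randn(N, H), np.random.randn(N, H), cache)
print(next_h.shape, dx.shape, dWx.shape)   # (3, 4) (3, 10) (10, 16)
```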
Forward and backward passes for the multi-step LSTM, which loop over the single-step functions:

```python
def lstm_forward(x, h0, Wx, Wh, b):
    h, cache = None, None
    T = x.shape[1]
    N, H = h0.shape
    h = np.zeros((N, T, H))

    prev_h = h0
    # The initial cell state is zero
    prev_c = np.zeros_like(h0)
    cache = {}

    for t in range(T):
        # Run the single-step LSTM once per time step
        next_h, next_c, cache[t] = lstm_step_forward(x[:, t, :], prev_h, prev_c, Wx, Wh, b)
        h[:, t, :] = next_h
        prev_h = next_h
        prev_c = next_c

    return h, cache


def lstm_backward(dh, cache):
    dx, dh0, dWx, dWh, db = None, None, None, None, None
    _, _, _, _, x, _, _, _, _, _ = cache[0]
    N, T, H = dh.shape
    D = x.shape[1]

    dx = np.zeros((N, T, D))
    dWx = np.zeros((D, 4 * H))
    dWh = np.zeros((H, 4 * H))
    db = np.zeros(4 * H)

    # Prepend a slot for dh0 in front of dh
    dh = np.insert(dh, 0, 0, axis=1)
    dnext_c = np.zeros((N, H))

    for t in range(T - 1, -1, -1):
        dx[:, t, :], dht, dprev_c, dWxt, dWht, dbt = lstm_step_backward(dh[:, t+1, :], dnext_c, cache[t])
        dh[:, t, :] += dht
        dnext_c = dprev_c
        dWx += dWxt
        dWh += dWht
        db += dbt

    dh0 = dh[:, 0, :]
    return dx, dh0, dWx, dWh, db
```
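lstm_backward can be checked against numerical gradients; the sketch below assumes the assignment's helper cs231n.gradient_check.eval_numerical_gradient_array is available, as in the standard starter code:

```python
from cs231n.gradient_check import eval_numerical_gradient_array

np.random.seed(231)
N, D, T, H = 2, 3, 10, 6
x = np.random.randn(N, T, D)
h0 = np.random.randn(N, H)
Wx = np.random.randn(D, 4 * H)
Wh = np.random.randn(H, 4 * H)
b = np.random.randn(4 * H)

out, cache = lstm_forward(x, h0, Wx, Wh, b)
dout = np.random.randn(*out.shape)
dx, dh0, dWx, dWh, db = lstm_backward(dout, cache)

# Numerical gradient with respect to x; the other inputs can be checked the same way
fx = lambda x: lstm_forward(x, h0, Wx, Wh, b)[0]
dx_num = eval_numerical_gradient_array(fx, x, dout)
print('dx error:', np.max(np.abs(dx - dx_num)))   # should be tiny, around 1e-8 or smaller
```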
Initializing the CaptioningSolver:

```python
from __future__ import print_function, division
from builtins import range
from builtins import object

import numpy as np

from cs231n import optim
from cs231n.coco_utils import sample_coco_minibatch


class CaptioningSolver(object):
    """
    The CaptioningSolver class uses the gradient-descent update rules in optim.py.

    It accepts both training and validation data with their labels so it could monitor
    whether the model is overfitting (though it seems that part was never actually written...).

    After train(), model.params is supposed to contain the parameters that did best on the
    validation set (not implemented).

    In addition, solver.loss_history holds the training losses, and solver.train_acc_history /
    solver.val_acc_history would hold the per-epoch accuracies (these two are not implemented).

    Example usage of this class (never mind MyAwesomeModel; I know perfectly well what
    I actually wrote):

    data = load_coco_data()
    model = MyAwesomeModel(hidden_dim=100)
    solver = CaptioningSolver(model, data,
                    update_rule='sgd',
                    optim_config={
                      'learning_rate': 1e-3,
                    },
                    lr_decay=0.95,
                    num_epochs=10, batch_size=100,
                    print_every=100)
    solver.train()

    A model passed to CaptioningSolver must conform to the following API:

    - model.params must be a dictionary keyed by parameter names.

    - model.loss(features, captions) must compute the training-time loss and gradients,
      with the following inputs and outputs:

      Inputs:
      - features: minibatch of image features, (N, D)
      - captions: integer caption IDs, (N, T), each ID in the range (0, V]

      Returns:
      - loss: the loss
      - grads: the gradients, with the same keys as self.params
    """

    def __init__(self, model, data, **kwargs):
        """
        Required arguments:
        - model: a model instance conforming to the API above
        - data: the data returned by load_coco_data()

        Optional arguments:
        - update_rule: an update rule from optim.py; defaults to 'sgd'
        - optim_config: a dictionary of parameters for the update rule; each rule needs
          different parameters (see optim.py), but all of them require 'learning_rate'
        - lr_decay: learning-rate decay; the learning rate is multiplied by this value
          after each epoch
        - batch_size: minibatch size used during training
        - num_epochs: total number of epochs
        - print_every: integer; print the loss every print_every iterations
        - verbose: boolean; whether to print the loss during training
        """
        self.model = model
        self.data = data

        # Unpack the optional arguments, falling back to defaults
        self.update_rule = kwargs.pop('update_rule', 'sgd')
        self.optim_config = kwargs.pop('optim_config', {})
        self.lr_decay = kwargs.pop('lr_decay', 1.0)
        self.batch_size = kwargs.pop('batch_size', 100)
        self.num_epochs = kwargs.pop('num_epochs', 10)

        self.print_every = kwargs.pop('print_every', 10)
        self.verbose = kwargs.pop('verbose', True)

        # Raise an error if there are leftover keyword arguments
        if len(kwargs) > 0:
            extra = ', '.join('"%s"' % k for k in list(kwargs.keys()))
            raise ValueError('Unrecognized arguments %s' % extra)

        # Make sure the update rule exists...
        if not hasattr(optim, self.update_rule):
            raise ValueError('Invalid update_rule "%s"' % self.update_rule)
        # ...and then replace the string with the actual function
        self.update_rule = getattr(optim, self.update_rule)

        self._reset()
```

The remaining methods:
```python
    def _reset(self):
        """Set up some bookkeeping variables for optimization. Don't call this manually."""
        self.epoch = 0
        self.best_val_acc = 0
        self.best_params = {}
        self.loss_history = []
        self.train_acc_history = []
        self.val_acc_history = []

        # Make a deep copy of optim_config for each model parameter
        self.optim_configs = {}
        for p in self.model.params:
            d = {k: v for k, v in self.optim_config.items()}
            self.optim_configs[p] = d

    def _step(self):
        """Make a single gradient-descent step. Called by train(); don't call it manually."""
        # Sample a minibatch of training data
        minibatch = sample_coco_minibatch(self.data,
                      batch_size=self.batch_size,
                      split='train')
        captions, features, urls = minibatch

        # Compute the loss and gradients
        loss, grads = self.model.loss(features, captions)
        # Record the loss
        self.loss_history.append(loss)

        # Perform the parameter updates
        for p, w in self.model.params.items():
            dw = grads[p]
            config = self.optim_configs[p]
            next_w, next_config = self.update_rule(w, dw, config)
            self.model.params[p] = next_w
            self.optim_configs[p] = next_config

    # TODO: not implemented yet; might implement BLEU here
    def check_accuracy(self, X, y, num_samples=None, batch_size=100):
        return 0.0

    def train(self):
        num_train = self.data['train_captions'].shape[0]
        iterations_per_epoch = max(num_train // self.batch_size, 1)
        num_iterations = self.num_epochs * iterations_per_epoch

        for t in range(num_iterations):
            self._step()

            # Print the training loss
            if self.verbose and t % self.print_every == 0:
                print('(Iteration %d / %d) loss: %f' % (
                       t + 1, num_iterations, self.loss_history[-1]))

            # At the end of every epoch, increment the epoch counter and decay the learning rate
            epoch_end = (t + 1) % iterations_per_epoch == 0
            if epoch_end:
                self.epoch += 1
                for k in self.optim_configs:
                    self.optim_configs[k]['learning_rate'] *= self.lr_decay

            # TODO: periodically (on the first iteration, the last iteration, and at the end
            # of each epoch) check the training and validation accuracy

        # After training, keep the parameters that did best on the validation set
        # self.model.params = self.best_params
```