深度学习实现自动生成图片字幕
介绍
本次项目使用深度学习自动生成图像字幕。如上图,模型自动生成“The person is riding a surfboard in the ocean”字幕。我们具体该如何实现呢?
如图所示,我们需要分别使用CNN和RNN模型来实现。
CNN模型:
利用卷积网络对图像特征提取的强大能力,来提取特征信息。我们的CNN模型需要有强大的识别能力,因此该模型需要使用过大量,多类别的训练集进行训练,并且识别准确率较高。本次,我们利用迁移学习使用Inception模型实现此功能。
通过迁移学习实现OCT图像识别 文章中有迁移学习的相关介绍。
RNN模型:
对于文本序列数据,目前我们最好的选择依然是RNN模型。为了提升模型预测能力,我们使用注意力机制实现文本预测。
注意力机制实现机器翻译 文章中有注意力机制的相关介绍。
对模型的细节要求我们将在对应代码实现里进行介绍。
数据集介绍
我们使用MS-COCO数据集进行训练,为方便理解,简单介绍下数据格式。COCO数据有5种类型,分别是: object detection, keypoint detection, stuff segmentation, panoptic segmentation,image captioning。基础数据结构如下图所示:
具体样例(部分):
本次项目使用的是Image Captioning其中,每张照片不少于5个字幕:
数据下载处理
import tensorflow as tf # 开启eager模式 tf.enable_eager_execution() import matplotlib.pyplot as plt from sklearn.model_selection import train_test_split from sklearn.utils import shuffle import re import numpy as np import os import time import json from glob import glob from PIL import Image import pickle annotation_zip=tf.keras.utils.get_file( # cache_dir(默认值): `~/.keras` # cache_subdir: `datasets`, # ~/.keras/datasets/captions.zip fname='captions.zip', cache_subdir=os.path.abspath('.'), origin='http://images.cocodataset.org/annotations/annotations_trainval2014.zip', # 解压 extract=True ) # 返回文件夹名,实现:split(file)[0] annotation_file = os.path.dirname(annotation_zip)+'/annotations/captions_train2014.json' name_of_zip='train2014.zip' if not os.path.exists(os.path.abspath('.')+"/"+name_of_zip): image_zip=tf.keras.utils.get_file( fname=name_of_zip, cache_subdir=os.path.abspath('.'), origin='http://images.cocodataset.org/zips/train2014.zip', extract=True ) PATH=os.path.dirname(image_zip)+'train2014/' else: PATH=os.path.abspath('.')+'/train2014/'
读取字幕和图片:
# 读取注释json文件 with open(annotation_file,'r') as f: annotations=json.load(f) # 保存全部字幕 all_captions=[] # 保存全部图片 all_img_name_vecotr=[] # json格式参考COCO数据集官网 for annot in annotations['annotations']: # 添加开始和结束标记 caption='<start>'+annot['caption']+'<end>' # 获取图片名字 image_id=annot['image_id'] # 参考文章开始给出的“具体样例” full_coco_image_path=PATH+'COCO_train2014_'+'%012d.jpg'%(image_id) all_img_name_vecotr.append(full_coco_image_path) all_captions.append(caption) # random_state 随机种子,确保每次数据一致 train_captions,img_name_vector=shuffle( all_captions, all_img_name_vecotr, random_state=1 ) # 使用训练集前30000样本 num_examples=30000 train_captions=train_captions[:num_examples] img_name_vector=img_name_vector[:num_examples]
重训练InceptionV3:
简单介绍下InceptionV3模型:
- Inception模型结构中最重要的思想就是卷积核分解。通过上图可知,5x5的卷积可由2个3x3的卷积代替,3x3卷积可由一个3x1卷积和一个1x3卷积代替,代替的好处是减少了权重参数量,增加了网络非线性(层增多)。比如,一个5x5卷积的权重参数量和2个3x3卷积的权重参数量分别是(5x5):(3x3)x2。InceptionV3中就将7x7的卷积分解成7x1卷积和1x7卷积。
- 批标准化(BN)正式提出是在InceptionV2,BN通过将输入分布转变成均值为0,标准差为1的正态分布,将值域处于激活函数敏感范围从而防止梯度消失问题。正因为梯度消失问题的解决,我们可以使用更大的学习率进行训练从而加快模型收敛。由于BN有类似Dropout的正则化作用,因此在训练的时候不使用或少使用Dropout,并减轻L2正则。
- 使用非对称卷积,如:1x3卷积,3x1卷积(论文作者指出在feature map的大小12x12~20x20之间效果最好)。
- 使用Label Smoothing对损失修正。下图是新损失函数:
- 网络各层信息如下图所示:
# 使用inception V3 要求图片分辨率:299,299 # 输入值范围[-1,1] def load_image(image_path): img=tf.image.decode_jpeg(tf.read_file(image_path)) img_reshape=tf.image.resize_images(img,(299,299)) # 像素范围[-1,1] # (-255)/255 img_range=tf.keras.applications.inception_v3.preprocess_input(img_reshape) return img_range,image_path
使用迁移学习构建新模型:
# 最后一层卷积输入shape(8*8*2048),并将结果向量保存为dict image_model=tf.keras.applications.InceptionV3( # 不使用最后全连接层 include_top=False, # inception模型的训练集是imagenet weigths='imagenet' ) # shape:(batch_size,299,299,3) new_input=image_model.input # hidden_layer shape:(batch_size,8,8,2048) hidden_layer=image_model.layers[-1].output # 创建新模型 image_features_extract_model=tf.keras.Model( new_input, hidden_layer )
保存通过使用InceptionV3获得的特征:
encode_train=sorted(set(img_name_vector)) # map:可以并行处理数据,默认读取的文件具有确定性顺序 # 取消顺序可以加快数据读取 # 通过设置参数num_parallel_calls实现 image_dataset=tf.data.Dataset.from_tensor_slices(encode_train).map(load_image).batch(16) for img,path in image_dataset: # inception v3得到的feature batch_features=image_features_extract_model(img) batch_features=tf.reshape( # shape:(batch_size,8,8,2048) reshape:(batch_size,64,2048) batch_features,shape=(batch_features.shape[0],-1,batch_features[3]) ) # 保存 for bf,p in zip(batch_features,path): path_of_feature=p.numpy().decode('utf-8') # 文件后缀.npy np.save(path_of_feature,bf.numpy())
文本处理
文本处理方式还是老规矩,先将文本转成字典表示然后创建字符转ID,ID转字符,最后补长到预设长度。
# 计算最大长度 def calc_max_length(tensor): return max(len(t)for t in tensor)
top_k=5000 tokenizer=tf.keras.preprocessing.text.Tokenizer( num_words=top_k, # 字典中没有的字符用<unk>代替 oov_token='<unk>', # 需要过滤掉的特殊字符 filters='!"#$%&()*+.,-/:;=?@[\]^_`{|}~' ) # 要用以训练的文本列表 tokenizer.fit_on_texts(train_captions) # 转为序列列表向量 train_seqs=tokenizer.texts_to_sequences((train_captions)) tokenizer.word_index['<pad>']=0 # 如果没有指定最大长度,pad_sequences会自动计算最大长度 cap_vector=tf.keras.preprocessing.sequence.pad_sequences( sequences=train_seqs, # 后置补长 padding='post' ) max_length=calc_max_length(train_seqs)
模型训练参数
拆分训练集,验证集:
img_name_train,img_name_val,cap_trian,cap_val=train_test_split( img_name_vector, cap_vector, # 验证数据集占20% test_size=0.2, # 确保每次数据一致 random_state=0
# 最好是2的次幂,更适合GPU运算(加快二进制运算) BATCH_SIZE=64 # shuffle 缓冲区大小 BUFFER_SIZE=1000 # 词嵌入维度 embedding_dim=256 units=512 vocab_size=len(tokenizer.word_index) # 后面会将(8,8,2048)转为(64,2048) # 维度一定要一致 feature_shape=2048 attention_features_shape=64 # 加载保存的之前feature文件 def map_func(img_name,cap): img_tensor=np.load(img_name.decode('utf-8')+'.npy') return img_tensor,cap dataset=tf.data.Dataset.from_tensor_slices((img_name_train,cap_trian)) # num_parallel_calls 根据自己的CPU而定 dataset=dataset.map(lambda item1,item2:tf.py_func( map_func,[item1,item2],[tf.float32,tf.int32] ),num_parallel_calls=4) # prefetch 可以合理利用CPU准备数据,GPU计算数据之间的空闲时间,加快数据读取 dataset=dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE).prefetch(1)
创建模型
编码器模型:
# 一层使用relu的全连接层 class CNN_Encoder(tf.keras.Model): def __init__(self,embedding_dim): super(CNN_Encoder, self).__init__() # fc shape:(batch_size,64,embedding_dim) self.fc=tf.keras.layers.Dense(embedding_dim) def __call__(self,x): x=self.fc(x) x=tf.nn.relu(x) return x
注意力层:
详细介绍可以查看文章开始给出的链接,这里给出计算方程式:
class BahdanauAttention(tf.keras.Model): def __init__(self,units): super(BahdanauAttention, self).__init__() self.W1=tf.keras.layers.Dense(units) self.W2=tf.keras.layers.Dense(units) self.V=tf.keras.layers.Dense(1) def __call__(self, features,hidden): # 参考注意力机制计算的方程 # feature shape:(batch_size,64,embedding_dim) # hidden_state shape:(batch_size,hidden_size) hidden_with_time_axis=tf.expand_dims(hidden,1) # score shape:(batch_size,64,hidden_size) score=tf.nn.tanh(self.W1(features)+self.W2(hidden_with_time_axis)) # attention_weights shape:(batch_size,64,1) attention_weights=tf.nn.softmax(self.V(score),axis=1) context_vector=tf.reduce_sum(attention_weights*features,axis=1) return context_vector,attention_weights
解码器中的GRU:
# 相比LSTM因为减少了一个门,参数少,收敛快 def gru(units): if tf.test.is_gpu_available(): # 使用GPU加速计算 return tf.keras.layers.CuDNNGRU( units=units, return_state=True, return_sequences=True, # 循环核的初始化方法 # glorot_uniform是sqrt(2 / (fan_in + fan_out))的正态分布产生 # 其中fan_in和fan_out是权重张量的扇入扇出(即输入和输出单元数目) recurrent_initializer='glorot_uniform' ) else: return tf.keras.layers.GRU( return_sequences=True, return_state=True, # 默认:hard_sigmoid <= -1 输出0,>=1 输出1 ,中间为线性 recurrent_activation='sigmoid', recurrent_initializer='glorot_uniform' )
解码器模型:
# 使用注意力模型 class RNN_Decoder(tf.keras.Model): def __init__(self,embedding_dim,units,vocab_size): super(RNN_Decoder, self).__init__() self.units=units # 词嵌入将高维离散数据转为低维连续数据,并表现出数据之间的相似性(向量空间) self.embedding=tf.keras.layers.Embedding(input_shape=vocab_size,output_dim=embedding_dim) self.gru=gru(units) self.fc1=tf.keras.layers.Dense(self.units) self.fc2=tf.keras.layers.Dense(vocab_size) self.attention=BahdanauAttention(self.units) def __call__(self,x,features,hidden): # 获取注意力模型输出 context_vector,attention_weights=self.attention(features,hidden) # x shape:(batch_size,1,embedding_dim) x=self.embedding(x) # 注意力,当前输入合并 # 注意力shape:(batch_size,1,hidden) x shape:(batch_size,1,embedding_size) # x shape:(batch_size, 1, embedding_dim + hidden_size) x=tf.concat([tf.expand_dims(context_vector,1),x],axis=-1) output,state=self.gru(x) # x shape:(batch_size,max_length,hidden_size) x=self.fc1(output) # x shape:(batch_size*max_length,hidden_size) x=tf.reshape(x,shape=(-1,x.shape[2])) # x shape:(batch_size*max_length,vocab_size) x=self.fc2(x) return x,state,attention_weights def reset_state(self, batch_size): return tf.zeros((batch_size, self.units))
模型训练
实例化模型:
encoder = CNN_Encoder(embedding_dim) decoder = RNN_Decoder(embedding_dim, units, vocab_size)
损失函数,优化器设置:
# InceptionV3模型使用的不是Adam优化器 # 各种优化器以后放到一篇单独的文章详细介绍 optimizer=tf.train.AdamOptimizer(learning_rate=0.0001) def loss_function(real,pred): mask=1-np.equal(real,0) # 带mask的交叉熵损失 loss_=tf.nn.sparse_softmax_cross_entropy_with_logits( labels=real, logits=pred )*mask return tf.reduce_mean(loss_)
训练:
- 将使用InceptionV3模型提取的特征作为编码器输入
- 编码器输出,hidden_state,字幕文本作为解码器输入
- 解码器hidden_state作为下一次输入,预测值用于计算模型损失
- 使用标签文本作为解码器输入(teacher-forcing模式)
- 梯度计算及应用
loss_plot=[] EPOCHS=20 for epoch in range(EPOCHS): start=time.time() total_loss=0 for (batch,(img_tensor,target)) in enumerate(dataset): loss=0 # 每迭代一次batch后重置 hidden_state hidden=decoder.reset_states(batch_size=target.shape[0]) # input维度是3维 dec_input=tf.expand_dims([tokenizer.word_index['<start>']*BATCH_SIZE],1) # eager模式下记录梯度 with tf.GradientTape() as tape: # inception模式提取的特征 features=encoder(img_tensor) # 每张照片不止一个captions for i in range(1,target.shape[1]): # attention_weights此处暂不需要 predictions,hidden,_=decoder(dec_input,features,hidden) loss+=loss_function(target[:,i],predictions) # teacher forcing 使用标签数据作为输入替代hidden-output dec_input=tf.expand_dims(target[:,i],1) total_loss+=(loss/int(target.shape[1])) # 总训练参数 variables=encoder.variables+decoder.variables # 梯度计算及应用 gradients=tape.gradient(loss,variables) optimizer.apply_gradients(zip(gradients,variables)) if batch%100 == 0: print('epoch{},batch{},loss{:.4}'.format( epoch+1, batch, loss.numpy()/int(target.shape[1]) )) loss_plot.append(total_loss/len(cap_vector)) plt.plot(loss_plot) plt.xlabel('epochs') plt.ylabel('loss') plt.show()
模型预测
模型预测不使用Teacher forcing模式,当遇到预设的结束标记“<end>”时模型结束训练。
def evaluate(image): attention_plot = np.zeros((max_length, attention_features_shape)) # 初始化hidden-state hidden = decoder.reset_state(batch_size=1) # shape:(1,299,299,3) temp_input = tf.expand_dims(load_image(image)[0], 0) # 特征提取 img_tensor_val = image_features_extract_model(temp_input) # shape:(1,8,8,2048) reshape:(1,64,2048) img_tensor_val = tf.reshape(img_tensor_val, (img_tensor_val.shape[0], -1, img_tensor_val.shape[3])) # shape:(1,64,256) features = encoder(img_tensor_val) # 增加batchsize维度 dec_input = tf.expand_dims([tokenizer.word_index['<start>']], 0) result = [] for i in range(max_length): predictions, hidden, attention_weights = decoder(dec_input, features, hidden) attention_plot[i] = tf.reshape(attention_weights, (-1, )).numpy() # 我们使用softmax归一化结果,使用argmax查询最大值 # 对于分类数量大于2,softmax和sigmoid的区别是 # 类别之间有相互关系的使用sigmoid,反之使用softmax predicted_id = tf.argmax(predictions[0]).numpy() # ID转字符,获取文本结果 result.append(tokenizer.index_word[predicted_id]) # 判断是否是预设的结束标记 if tokenizer.index_word[predicted_id] == '<end>': return result, attention_plot # 将预测值作为输入,预测下一个结果(teacher-forcing在这里使用数据标签作为输入) dec_input = tf.expand_dims([predicted_id], 0) attention_plot = attention_plot[:len(result), :] return result, attention_plot
以下用于可视化注意力机制训练过程:
此处代码主要是图像展示就不做过多介绍了。
def plot_attention(image, result, attention_plot): temp_image = np.array(Image.open(image)) fig = plt.figure(figsize=(10, 10)) len_result = len(result) for l in range(len_result): temp_att = np.resize(attention_plot[l], (8, 8)) ax = fig.add_subplot(len_result//2, len_result//2, l+1) ax.set_title(result[l]) img = ax.imshow(temp_image) ax.imshow(temp_att, cmap='gray', alpha=0.6, extent=img.get_extent()) plt.tight_layout() plt.show() rid = np.random.randint(0, len(img_name_val)) image = img_name_val[rid] real_caption = ' '.join([tokenizer.index_word[i] for i in cap_val[rid] if i not in [0]]) result, attention_plot = evaluate(image) print ('Real Caption:', real_caption) print ('Prediction Caption:', ' '.join(result)) plot_attention(image, result, attention_plot) Image.open(img_name_val[rid])
总结
想要对图像生成字幕,首先需要提取图像特征,本文我们利用迁移学习使用Inception模型来提取特征,对于Inception模型,我们重点理解卷积核分解。至于文本预测部分与使用注意力机制实现机器翻译大体一致。有一点想说的是,类似这样的项目维度转换会比较多,也是很容易出错的地方,这一点需要格外留意。
本文代码内容来自 Yash Katariya在此表示感谢。