VGG模型作为卷积神经网络的经典处理流程,为了更好的理解,因此自写一遍VGG模型,完成训练和识别全过程。
函数: 对应VGG模型,其主要有卷积核、偏执核、滑动窗口、池化窗口,全连接参数。 卷积核:w=[filter_height, filter_width, in_channels, out_channels] 偏执核: b=[out_channels] 滑动窗口: strides=[ batch滑动步长, height方向滑动步长, width方向滑动步长, channels方向滑动步长] 池化窗口: ksize=[ batch大小, height大小, width大小, channels大小] 全连接权重:w=[in_length, out_ length] 全连接偏执:b=[ out_ length] 输出窗口计算公式:
输入图片大小 W×W Filter大小 F×F 步长 S padding的像素数 P N = (W − F + 2P )/S+1模型层: 模型: 路径格式: VGG |——/17flowers # 训练数据集图像 |——/image # 测试图像 |——/log # 可视化数据保存路径 |——/model # 训练参数保存路径 |——create_tfrecords.py # 生成数据集 |——My_test.py # 测试 |——train.py # 训练 |——train.tfrecords # 测试集 |——VGG.py # 模型
核心代码VGG.py:
#coding -uft-8 import numpy as np import tensorflow as tf import os flag = False # 是否加载已训练数据标志位 model_dir = "./model/" base_num = 0 # 加载已训练数据次数 if os.listdir(model_dir): flag = True base_num = int(tf.train.get_checkpoint_state(model_dir).model_checkpoint_path[19:]) # 获取已训练次数 reader = tf.train.NewCheckpointReader(tf.train.get_checkpoint_state(model_dir).model_checkpoint_path) # 获取已训练参数 def conv2d(x, w, padding='SAME', s=1): '''卷积层''' '''输入:x=[batch, in_height, in_width, in_channels]''' '''卷积核:w=[filter_height, filter_width, in_channels, out_channels]''' '''滑动步长大小:strides=[batch滑动步长, height方向滑动步长, width方向滑动步长, channels方向滑动步长]''' x = tf.nn.conv2d(x, w, strides=[1, s, s, 1], padding=padding) return x def maxPoolLayer(x, k=2, s=2): '''池化层''' '''池化窗口大小:ksize=[1,height,width,1]''' '''滑动步长大小:strides=[batch滑动步长, height方向滑动步长, width方向滑动步长, channels方向滑动步长]''' return tf.nn.max_pool(x, ksize=[1, k, k, 1], strides=[1, s, s, 1], padding='SAME') def con2d_layer(x, in_chs, out_chs, ksize, layer_name, fineturn=False): '''卷积层''' '''fineturn:False:初始化;True:加载已训练数据''' with tf.variable_scope(layer_name): if fineturn and flag: # 判断是否加载已训练值 w = tf.get_variable('weight', initializer=reader.get_tensor(layer_name + '/weight')) # 加载已训练权重值 tf.summary.histogram(layer_name + '/weight', w) # 权重值加入到tensorboard b = tf.get_variable('bias', initializer=reader.get_tensor(layer_name + '/bias')) # 加载已训练偏执值 tf.summary.histogram(layer_name + '/bias', b) # 偏执值加入到tensorboard else: '''卷积核:shape=[filter_height, filter_width, in_channels, out_channels]''' w = tf.get_variable('weight', shape=[ksize, ksize, in_chs, out_chs], initializer=tf.truncated_normal_initializer(stddev=0.1)) # 初始化权重值 # w = get_weight_variable([ksize, ksize, in_chs, out_chs]) tf.summary.histogram(layer_name + '/weight', w) # 权重值加入到tensorboard '''shape=[out_channels]''' b = tf.get_variable('bias', shape=[out_chs], initializer=tf.constant_initializer(0.0)) # 初始化偏执值 # b = get_bias_variable([out_chs]) tf.summary.histogram(layer_name + '/bias', b) # 偏执值加入到tensorboard y = tf.nn.relu(tf.nn.bias_add(conv2d(x, w, padding='SAME', s=1), b)) # 卷积运算 return y def fc_layer(x, in_kernels, out_kernels, layer_name, fineturn=False): '''全连接层''' '''fineturn:False:初始化;True:加载已训练数据''' with tf.variable_scope(layer_name): if fineturn and flag: # 判断是否加载已训练值 w = tf.get_variable('weight', initializer=reader.get_tensor(layer_name + '/weight')) tf.summary.histogram(layer_name + '/weight', w) b = tf.get_variable('bias', initializer=reader.get_tensor(layer_name + '/bias')) tf.summary.histogram(layer_name + '/bias', b) else: w = tf.get_variable('weight', shape=[in_kernels, out_kernels], initializer=tf.truncated_normal_initializer(stddev=0.1)) tf.summary.histogram(layer_name + '/weight', w) # 权重值加入到tensorboard b = tf.get_variable('bias', shape=[out_kernels], initializer=tf.truncated_normal_initializer(stddev=0.1)) tf.summary.histogram(layer_name + '/bias', b) # 偏执值加入到tensorboard y = tf.nn.relu(tf.nn.bias_add(tf.matmul(x, w), b)) return y def VGG(x, _dropout, n_cls): ''' ******输出窗口大小计算公式******* 输入图片大小 W×W Filter大小 F×F 步长 S padding的像素数 P N = (W − F + 2P )/S+1 ''' '''输入:224*224*3''' '''输出:112*112*4''' # tf.reset_default_graph() conv1_1 = con2d_layer(x, 3, 4, 3, 'conv1_1', fineturn=True) pool_1 = maxPoolLayer(conv1_1, k=2, s=2) print(pool_1) '''输入:112*112*4''' '''输出:14*14*8''' conv2_1 = con2d_layer(pool_1, 4, 8, 3, 'conv2_1', fineturn=True) pool_2 = maxPoolLayer(conv2_1, k=2, s=2) pool_3 = maxPoolLayer(pool_2, k=2, s=2) pool_4 = maxPoolLayer(pool_3, k=2, s=2) '''将pool_4变换为一维向量''' pool4_flatten_dims = int(np.prod(pool_4.get_shape().as_list()[1:])) pool4_flatten = tf.reshape(pool_4, [-1, pool4_flatten_dims]) '''(14*14*8) * 64全连接层''' fc_1 = fc_layer(pool4_flatten, pool4_flatten_dims, 64, 'fc_1', fineturn=True) dropout1 = tf.nn.dropout(fc_1, _dropout) '''64 * num全连接层''' fc_2 = fc_layer(dropout1, 64, n_cls, 'fc_2', fineturn=True) return fc_2, base_num训练代码train.py:
# coding=utf-8 import tensorflow as tf import numpy as np import pdb from datetime import datetime from VGG import * import cv2 as cv batch_size = 16 lr = 0.0001 # 学习率 n_cls = 3 # 识别种类 max_steps = 5000 # 训练次数 def read_and_decode(filename): # 根据文件名生成一个队列 filename_queue = tf.train.string_input_producer([filename]) reader = tf.TFRecordReader() _, serialized_example = reader.read(filename_queue) # 返回文件名和文件 features = tf.parse_single_example(serialized_example, features={ 'label': tf.FixedLenFeature([], tf.int64), 'img_raw': tf.FixedLenFeature([], tf.string), }) img = tf.decode_raw(features['img_raw'], tf.uint8) img = tf.reshape(img, [224, 224, 3]) # 转换为float32类型,并做归一化处理 img = tf.cast(img, tf.float32) # * (1. / 255) label = tf.cast(features['label'], tf.int64) return img, label def train(): x = tf.placeholder(dtype=tf.float32, shape=[None, 224, 224, 3], name='input') y = tf.placeholder(dtype=tf.float32, shape=[None, n_cls], name='label') keep_prob = tf.placeholder(tf.float32) output, base_num = VGG(x, keep_prob, n_cls) # probs = tf.nn.softmax(output) loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=output, labels=y)) tf.summary.scalar('loss', loss) # 损失值加入到tensorboard # train_step = tf.train.AdamOptimizer(learning_rate=0.1).minimize(loss) train_step = tf.train.GradientDescentOptimizer(learning_rate=lr).minimize(loss) accuracy = tf.reduce_mean(tf.cast(tf.equal(tf.argmax(output, 1), tf.argmax(y, 1)), tf.float32)) tf.summary.scalar('accuracy', accuracy) # 训练精度加入到tensorboard images, labels = read_and_decode('./train.tfrecords') img_batch, label_batch = tf.train.shuffle_batch([images, labels], batch_size=batch_size, capacity=392, min_after_dequeue=200) label_batch = tf.one_hot(label_batch, n_cls, 1, 0) init = tf.global_variables_initializer() saver = tf.train.Saver() with tf.Session() as sess: merged = tf.summary.merge_all() writer = tf.summary.FileWriter('./log', sess.graph) # 创建并启动summary.FileWriter对象,接收两个参数,一是数据流图磁盘存放路径;二是Session对象graph属性。 sess.run(init) coord = tf.train.Coordinator() threads = tf.train.start_queue_runners(sess=sess, coord=coord) for i in range(base_num + 1, max_steps + base_num + 1): batch_x, batch_y = sess.run([img_batch, label_batch]) # 获取batch数据集 _, loss_val = sess.run([train_step, loss], feed_dict={x: batch_x, y: batch_y, keep_prob: 0.8}) if i % 10 == 0: train_arr = accuracy.eval(feed_dict={x: batch_x, y: batch_y, keep_prob: 1.0}) print("%s: Step [%d] Loss : %f, training accuracy : %g" % (datetime.now(), i, loss_val, train_arr)) result = sess.run(merged, feed_dict={x: batch_x, y: batch_y, keep_prob: 1.0}) # i 就是记录的步数到tensorboard writer.add_summary(result, i) # 只指定了训练结束后保存模型,可以修改为每迭代多少次后保存模型 if (i + 1) % 100 == 0: saver.save(sess, './model/model.ckpt', global_step=i) # writer.close() # 关闭summary.FileWriter对象 coord.request_stop() coord.join(threads) writer.close() if __name__ == '__main__': train()数据集生成代码create_tfrecords.py:
# coding=utf-8 import os import tensorflow as tf from PIL import Image import sys def creat_tf(imgpath): cwd = os.getcwd() classes = os.listdir(cwd + imgpath) # 此处定义tfrecords文件存放 writer = tf.python_io.TFRecordWriter("train.tfrecords") for index, name in enumerate(classes): class_path = cwd + imgpath + name + "/" print(class_path) if os.path.isdir(class_path): for img_name in os.listdir(class_path): img_path = class_path + img_name img = Image.open(img_path) img = img.resize((224, 224)) img_raw = img.tobytes() example = tf.train.Example(features=tf.train.Features(feature={ 'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[int(name)])), 'img_raw': tf.train.Feature(bytes_list=tf.train.BytesList(value=[img_raw])) })) writer.write(example.SerializeToString()) print(img_name) writer.close() def read_example(): # 简单的读取例子: for serialized_example in tf.python_io.tf_record_iterator("train.tfrecords"): example = tf.train.Example() example.ParseFromString(serialized_example) # image = example.features.feature['img_raw'].bytes_list.value label = example.features.feature['label'].int64_list.value # 可以做一些预处理之类的 print(label) if __name__ == '__main__': imgpath = './17flowers/' creat_tf(imgpath) # read_example()训练测试结果: 测试代码My_test.py:
# coding=utf-8 import tensorflow as tf import numpy as np import pdb from datetime import datetime from VGG import * import cv2 import os model_dir = "./model/" def test(path): x = tf.placeholder(dtype=tf.float32, shape=[None, 224, 224, 3], name='input') keep_prob = tf.placeholder(tf.float32) output, _ = VGG(x, keep_prob, 3) score = tf.nn.softmax(output) f_cls = tf.argmax(score, 1) sess = tf.InteractiveSession() sess.run(tf.global_variables_initializer()) saver = tf.train.Saver() # 训练好的模型位置 # saver.restore(sess, './model/model.ckpt-1499') # 自定义测试模型参数 saver.restore(sess, tf.train.get_checkpoint_state(model_dir).model_checkpoint_path) for i in os.listdir(path): imgpath = os.path.join(path, i) print(imgpath) im = cv2.imread(imgpath) im = cv2.resize(im, (224, 224)) # * (1. / 255) im = np.expand_dims(im, axis=0) # 测试时,keep_prob设置为1.0 pred, _score = sess.run([f_cls, score], feed_dict={x: im, keep_prob: 1.0}) prob = round(np.max(_score), 4) print("{} flowers class is: {}, score: {}".format(i, int(pred), prob)) sess.close() if __name__ == '__main__': # 测试图片保存在文件夹中了,图片前面数字为所属类别 path = './image' test(path) # # # 加载预训练模型打印及保存 # model_dir = "./model/" # reader = tf.train.NewCheckpointReader(tf.train.get_checkpoint_state(model_dir).model_checkpoint_path) # data_dict = reader.get_variable_to_shape_map() # for key in data_dict: # # Print tensor name and values # print("tensor_name: ", key) # print(reader.get_tensor(key)) # # # save # f = open('./model/tensor.txt', 'a') # f.write(key) # f.write(str(reader.get_tensor(key))) # f.write('\r\n') # f.close()项目下载链接:https://download.csdn.net/download/OEMT_301/12566047 VGG16模型下载:https://download.csdn.net/download/OEMT_301/12566935