我们回到 tensorflow 中来, 之前我们使用 tensorflow 实现了比较简单的 dnn 和 cnn。 了解了 tf 大概的玩法。 我们在做这些 demo 的时候使用的是比较简单的数据, dnn 中我们是使用 numpy 创造的多维数组来做演示的。 cnn 中我们使用了官方的 mnist 来演示, tensorflow 中针对 minist 写了一个比较好的库,能帮助我们实现 mini batch,多轮迭代等等。但是实际场景中我们的逻辑会复杂的多。 也没有这种专门针对 mnist 做的数据读取的库, 那么我们怎么来应对这些问题呢? 接来我们来演示一下 tf 中最新的数据流 API, Dataset
dataset 是 tf 目前比较推荐的数据处理管道 API。例如,图片模型的管道可能会汇聚分布式文件系统中的文件中的数据、对每个图片应用随机扰动,并将随机选择的图片合并成用于训练的批次。文本模型的管道可能包括从原始文本数据中提取符号、根据对照表将其转换为嵌入标识符,以及将不同长度的序列组合成批次数据。使用 tf.data API 可以轻松处理大量数据、不同的数据格式以及复杂的转换。
接下来我们从一个例子中看一下,我们如何创建一个 dataset 并使用它。
table = get_table('test/transaction.table')
df = table.to_pandas()
# 将数据转换成dataset
data = df[["tran_no","card_no", "pos_entry_mode"]].as_matrix().reshape(-1, 3)
label = df[["label"]].as_matrix().reshape(-1, 1)
dataset = tf.data.Dataset.from_tensor_slices((data, label))
我把如果获取一个数据的部分先隐藏掉。 现在我们直接从 get_table 方法中拿到了一份数据,并转换为 pandas 的 dataframe 了 (如果不了解 pandas 的同学可以 google 一下)。 那么通过之前的学习我们知道我们是不能够直接使用这样一份数据的。因为在训练过程中我们要训练不同的轮数, 要做随机打乱顺序, 要做 mini batch 等等。 所以我们不能可能直接使用一份没有经过处理的数据。 所以我们在使用 pd 把训练样本和 label 都抽取出来以后。 使用 tf.data.Dataset.from_tensor_slices 方法把这两份数据都加入到了 dataset 中管理。 tf.data.Dataset 对象上的方法调用将其转换为新的 Dataset。 我们可以把数组或者已经存在的 tensor 都交给它处理。
熟悉函数式编程的人一定对 map 方法很熟悉。 我之前写 spark 的教程的时候也写过关于 map reduce 这些操作。 那么 dataset 也有类似的操作,也叫 map。 我们可以看看下面的应用。
def deal_data(data, label):
data = tf.cast(data, dtype=tf.float32)
label = tf.cast(label, dtype=tf.float32)
return data, label
dataset = dataset.repeat(10).map(deal_data).batch(
32).shuffle(buffer_size=5)
看上面我们这个比较简单的做法。 我们现实定义了一个 deal_data 的方法用来做数据处理, 传递 data 和 label 两个数据进来。 刚才我们把 df 中的数据放入 dataset 的时候是把训练样本和 label 一起了进来。 这个方法里面做的事情很简单。 就是把之前的数据类型都使用 tf.cast 方法做类型转换,变成 float32. 这样我们之后的训练过程才能够做统一处理。 然后在下面的使用 dataset 对数据做处理的方法中,我们看到了 map 这个调用。 关于其他的方法大家可以暂时先不去看。 就先看 map 的调用。 这个 map 跟 python 中的 map 还有 js 中的还有 spark 中的 map 都是一样的。 都是使用函数式编程的方式, 针对数据集合中的每一个数据进行处理。map 中我们传递的是一个函数, 就是我们上面的针对每个数据做类型转换的方法。
在上面的例子中我们说明了 map 是做什么用的。但是我们还有一些其他的数据流处理方式来达到在训练过程中对数据的需求。比如说处理多个周期。
在机器学习中,我们经常能听到训练多少轮。 也有人叫迭代轮数,迭代次数。我们之前讲逻辑回归的原理的时候说过。 在这种算法中, 公式是 y= w1x1 + w2x2 + .... wnxn +b 。 其中 x 是特征,w 是权重参数。 我们机器学习,学的就是其中的 w 和 b。通过名字叫梯度下降的算法更新 w 和 b 的值。 我们在训练数据上执行一次全量的计算, 就是一轮迭代。 就会产生一轮 w 和 b 的值。 然后我们可以根据这个 w 和 b 的值计算出预测值和真实值的误差,我们叫损失函数。 之后我们可以使用同样的数据在上一次计算好的 w 和 b 的值的基础上,在进行一轮计算。 这里我们就叫做再训练一轮。 然后会计算出新的 w 和 b 的值。 再根据损失函数继续计算出误差。 这样我们可以在一份数据上这样一轮一轮的计算迭代下去。 这就是我们的迭代计算。 所以我们一般在做模型训练的时候,都会有一个超参数叫训练轮数。 这就需要我们的算发在训练数据中执行多次。 或者说,在 tf 中我们需要通过 dataset 的 API 去获取数据, 但是我们不希望有复杂的针对这些数据的处理逻辑。 我们希望的就是在训练函数中,每一次都向 dataset 取固定数量的数据。 我不关心它怎么取的。 那怎么做呢, 我们就用到了 dataset 中的 repeat 这个函数。
上面我们看到,在数据处理流中最先调用的就是 dataset.repeat(10)。 可以认为我们取数据的时候把这份数据重复了 10 次。 这样在之后我们的训练函数去用 for 循环读取数据做训练的时候。 就能拿到 10 份这样的数据。 也就是实现了我们训练 10 轮的目的了。 我们把这种行为叫做训练轮数,迭代次数。 也有人叫 epoch(虽然在吴恩达的视频中管 mini batch 才叫 epoch)
之前讲逻辑回归的时候也说过为了增加迭代速度和节省资源开销。 我们不会一下子把所有的数据都读入内存中做整体的训练过程,而是走 mini batch 的过程。先读取一小部分数据,走一次迭代训练,并更新 w 和 b 的参数。然后再读取另一小部分根据之前训练的参数的基础上继续走梯度下降的算法更新参数。 这样的过程就叫做 mini batch。 那么在 tf 中的 dataset 中,也是可以很容易的实现这个操作的。还是上面的例子,再经过了 repeat 和 map 后。我们调用的就是 batch 函数,这个函数跟他的名字一样,就是为了 mini batch 而生的。 它规定了之后没读取一份数据的时候,都只能读取 32 条数据 (因为我们参数写的是 32)。
shuffle 意思为洗牌, 随机打乱之前的数据的顺序。我们在训练中肯定希望数据是随机分布的,所以用 shuffle 这个函数可以很方面的实现这一点。
消耗 dataset 中的数据的方式我们也很熟悉。 就是迭代器 (Iterator)。 跟我们数值的迭代器一样, 他会消耗掉 dataset 中一个 mini batch 的数据 (之前设置的是 32, 所以取 32 条数据), 我们可以通过下面的方式取得一个 batch 的数据
dataset = dataset.repeat(10).map(deal_data).batch(
32).shuffle(buffer_size=5)
iterator = dataset.make_initializable_iterator()
batch_features_op, batch_label_op = iterator.get_next()
可以看到上面的 demo, 从 dataset 中初始化一个迭代器, 然后使用 get_next 函数就可以取出一份 batch,也就是 32 条数据了。 包含了 label 和 data。
# 定义每一层神经网络的参数
w1 = tf.Variable(tf.random_normal([3,3], stddev=1, seed=1))
w2 = tf.Variable(tf.random_normal([3,1], stddev=1, seed=1))
b1 = tf.Variable(tf.zeros([3]), dtype=tf.float32)
b2 = tf.Variable(tf.zeros([1]), dtype=tf.float32)
a = tf.nn.relu(tf.matmul(batch_features_op,w1) + b1)
y = tf.matmul(a, w2) + b2
# print(y.shape)
# print(label_next_element.shape)
# print(train_next_element.shape)
cost = tf.nn.sigmoid_cross_entropy_with_logits(logits=y, labels=batch_label_op, name=None)
train_op = tf.train.GradientDescentOptimizer(0.01).minimize(cost)
with tf.Session() as sess:
sess.run(iterator.initializer)
init = tf.initialize_all_variables()
sess.run(init)
while True:
try:
sess.run(train_op)
except tf.errors.OutOfRangeError:
break
训练代码跟以前的一样。就是一个简单的单隐藏层的神经网络。 使用 sigmoid 作为输出层的激活函数来处理 2 分类的问题。 需要注意的就是我们在运行的时候传递的是从 dataset 中取得的一个 batch 的 data 和 label 在做。 而且在训练过程中我们使用的是一个死循环而不是之前的在 for 训练中指定训练的次数。 这时候我们把控制权全放在了 dataset 中。 只要通过迭代器能从 dataset 中取得数据,我们会一直训练下去。 因为之前我们在 dataset 中使用 repeat 来重复了 10 次数据,间接的达到了训练 10 轮的目的, 通过 batch 函数让每次只取 32 条数据来达到 mini batch 的目的 。 所以我们在训练的时候就不需要像以前一样自己写训练轮数和 mini batch 的逻辑了。 当迭代器把 dataset 中的数据消耗干净之后, 就会抛出 outofRageError。 所以我们捕获一下就可以了。 这样我们就完成了对模型训练中针对训练轮数和 mini batch 的操控。
dataset 其实还有一些比较细节的 API。 像是迭代器有几种不同的迭代器的用法。 dataset 的创建方式可以从我演示的从内存中来, 也可以从文本文件中来, 也可以从 TFRecord 文件中来, 我没有详细说明。 之后慢慢讲