TensorFlow Study Notes (4): tf.data.Dataset
Datasets and Estimators are two of the most important modules in TensorFlow:
- Dataset is the recommended way to build input pipelines;
- Estimator is a high-level, pre-packaged API for building TensorFlow models. It ships with pre-made models and can also be used to build your own custom models.
Within the TensorFlow architecture, Dataset and Estimator sit as shown below. By combining the two, we can easily build a TensorFlow model and feed data into it. This post covers Dataset; the next one covers Estimator.
(Figure: TensorFlow architecture)
Introduction to Dataset
Since the release of TensorFlow 1.4, Datasets has been the new way to build input pipelines for TensorFlow models. This API performs better than feed_dict or queue-based pipelines, and it is also easier to use.
The Datasets module consists of the following five classes:
- Dataset: the base class, representing a sequence of elements, in which each element contains one or more Tensor objects. For example, in an image pipeline, an element might be a single training example, consisting of a pair of tensors representing the image data and a label. The class provides methods to create and transform datasets, and also lets you initialize a dataset from in-memory data.
- TextLineDataset: reads lines from text files.
- TFRecordDataset: reads records from TFRecord files.
- FixedLengthRecordDataset: reads fixed-length records from binary files.
- Iterator: provides the main way to extract elements from a dataset. The operation returned by Iterator.get_next() yields the next element of the Dataset when executed, and serves as the interface between the input pipeline and the model.
A dataset consists of elements that all share the same structure. An element contains one or more tf.Tensor objects, called components, and each component has:
- a tf.DType, representing the type of the values in the tensor, exposed as Dataset.output_types;
- a tf.TensorShape, representing the static shape of each element, exposed as Dataset.output_shapes.
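A minimal sketch (not from the original post) showing these two properties; the random tensors here just stand in for real data:
import tensorflow as tf

dataset = tf.data.Dataset.from_tensor_slices(
    (tf.random_uniform([4, 10]),
     tf.random_uniform([4], maxval=2, dtype=tf.int32)))
print(dataset.output_types)   # (tf.float32, tf.int32)
print(dataset.output_shapes)  # (TensorShape([Dimension(10)]), TensorShape([]))
Both are graph-time properties, so no Session is needed to inspect them.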
Creating a Dataset
@staticmethod
from_tensor_slices(tensors)

import numpy as np
import tensorflow as tf

# Create the Dataset
a = np.random.uniform(size=(100, 2))
dataset = tf.data.Dataset.from_tensor_slices(a)
iterator = dataset.make_one_shot_iterator()
one_element = iterator.get_next()
with tf.Session() as sess:
    for i in range(10):
        print(sess.run([one_element]))
******** Output ********
[array([0.11397362, 0.68389881])]
[array([0.33010397, 0.01920068])]
[array([0.99258612, 0.30668152])]
[array([0.62999354, 0.96661998])]
[array([0.26922582, 0.29277836])]
[array([0.70142808, 0.82017049])]
[array([0.08068107, 0.37464286])]
[array([0.70070917, 0.62077841])]
[array([0.36669648, 0.8481603 ])]
[array([0.45951399, 0.79220773])]
The tensors argument must be a tensor (or a nested structure of tensors) whose size in the 0th dimension is the same across all components. What tf.data.Dataset.from_tensor_slices actually does is slice the input tensor along its first dimension to produce the dataset. In the example above we pass in a (100, 2) matrix; from_tensor_slices slices along the first dimension, so the resulting dataset contains 100 elements, each of shape (2,), i.e. each element is one row of the matrix, as shown above.
Handling dict inputs
Dict inputs are also supported. For example, in an image-recognition problem an element could take the form {"image": image_tensor, "label": label_tensor}, where image_tensor holds the data for multiple images and label_tensor holds their labels. tf.data.Dataset.from_tensor_slices slices such input so that each resulting element is a dict, as shown below:
b = {"a":np.array([1.0,2.0,3.0,4.0,5.0]),
"b": np.random.uniform(size=(5,2))}
dataset = tf.data.Dataset.from_tensor_slices(b)
iterator = dataset.make_one_shot_iterator()
one_element = iterator.get_next()
with tf.Session() as sess:
for i in range(5):
print(sess.run([one_element]))
******** Output ********
[{'a': 1.0, 'b': array([0.17629646, 0.98159967])}]
[{'a': 2.0, 'b': array([0.62656944, 0.41537445])}]
[{'a': 3.0, 'b': array([0.94459501, 0.09661302])}]
[{'a': 4.0, 'b': array([0.66029436, 0.40497688])}]
[{'a': 5.0, 'b': array([0.67671157, 0.95346658])}]
Here the function slices the values under "a" and the values under "b" separately, so each element of the final dataset has the form {"a": 1.0, "b": [0.9, 0.1]}, as shown above.
Handling tuple inputs
Tuple inputs are supported as well; every component of the tuple must have the same size in its first dimension. Processing the input below yields the 5 elements shown, each of which is a tuple.
dataset = tf.data.Dataset.from_tensor_slices(
    (np.array([1.0, 2.0, 3.0, 4.0, 5.0]), np.random.uniform(size=(5, 2)))
)
iterator = dataset.make_one_shot_iterator()
one_element = iterator.get_next()
with tf.Session() as sess:
    for i in range(5):
        print(sess.run([one_element]))
******** Output ********
[(1.0, array([0.31577073, 0.21829554]))]
[(2.0, array([0.1872871 , 0.56726053]))]
[(3.0, array([0.32354807, 0.2709601 ]))]
[(4.0, array([0.61253432, 0.55664856]))]
[(5.0, array([0.75801247, 0.34546886]))]
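These structures can also be nested: from_tensor_slices accepts, for example, a (features_dict, labels) tuple, which is exactly the structure the Estimator input function in the experiment below returns. A small sketch (the feature names and values here are hypothetical, chosen to mirror the Iris example later in this post):
features = {"SepalLength": np.array([5.1, 4.9]),  # hypothetical feature values
            "SepalWidth": np.array([3.5, 3.0])}
labels = np.array([0, 0])
dataset = tf.data.Dataset.from_tensor_slices((features, labels))
# Each element is a ({feature_name: scalar}, label) pair:
print(dataset.output_types)  # ({'SepalLength': tf.float64, 'SepalWidth': tf.float64}, tf.int64)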
Creating an Iterator
Once you have built a Dataset to represent your input data, the next step is to create an Iterator to access its elements. The Dataset API currently supports four kinds of iterators, in increasing order of sophistication (the first two are demonstrated in detail below; a sketch of a reinitializable iterator follows the list):
- one-shot
- initializable
- reinitializable
- feedable
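A reinitializable iterator can be initialized from multiple Dataset objects that share the same structure, which is handy for switching between training and validation data. A minimal sketch, following the pattern from the TensorFlow programmer's guide (the two range datasets stand in for real data):
training_dataset = tf.data.Dataset.range(100)
validation_dataset = tf.data.Dataset.range(50)

# Build the iterator from the structure common to both datasets.
iterator = tf.data.Iterator.from_structure(training_dataset.output_types,
                                           training_dataset.output_shapes)
next_element = iterator.get_next()

training_init_op = iterator.make_initializer(training_dataset)
validation_init_op = iterator.make_initializer(validation_dataset)

with tf.Session() as sess:
    sess.run(training_init_op)    # iterate over the training data
    print(sess.run(next_element))
    sess.run(validation_init_op)  # re-initialize over the validation data
    print(sess.run(next_element))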
Dataset.make_one_shot_iterator()
The one-shot iterator is the simplest: it supports iterating over a dataset exactly once and needs no explicit initialization. One-shot iterators handle almost every case that the existing queue-based input pipelines support, but they do not support parameterization.
a = np.random.uniform(size=(100, 2))
dataset = tf.data.Dataset.from_tensor_slices(a)
iterator = dataset.make_one_shot_iterator()  # one-shot
one_element = iterator.get_next()
with tf.Session() as sess:
    for i in range(10):
        print(sess.run([one_element]))
******** Output ********
[array([0.11397362, 0.68389881])]
[array([0.33010397, 0.01920068])]
[array([0.99258612, 0.30668152])]
[array([0.62999354, 0.96661998])]
[array([0.26922582, 0.29277836])]
[array([0.70142808, 0.82017049])]
[array([0.08068107, 0.37464286])]
[array([0.70070917, 0.62077841])]
[array([0.36669648, 0.8481603 ])]
[array([0.45951399, 0.79220773])]
Dataset.make_initializable_iterator()
An initializable iterator lets you parameterize the definition of the dataset using one or more tensors defined with tf.placeholder(), but you must run an explicit iterator.initializer operation before using it, as shown below:
max_value = tf.placeholder(tf.int64, shape=[])
dataset = tf.data.Dataset.range(max_value)
iterator = dataset.make_initializable_iterator()
next_element = iterator.get_next()

sess = tf.Session()

# Initialize an iterator over a dataset with 10 elements.
sess.run(iterator.initializer, feed_dict={max_value: 10})
for i in range(10):
    value = sess.run(next_element)
    assert i == value
    print(value)

# Initialize the same iterator over a dataset with 100 elements.
sess.run(iterator.initializer, feed_dict={max_value: 100})
for i in range(100):
    value = sess.run(next_element)
    assert i == value
    print(value)
Here the dataset is parameterized by max_value, a tensor defined with tf.placeholder(), and the iterator is initialized accordingly.
Transformation
Dataset supports transformations: applying a transformation to a dataset produces a new dataset. Transformations typically cover data conversion, shuffling, batching, and generating epochs.
Commonly used transformations include:
- map
- batch
- shuffle
- repeat
map
map takes a function; every element of the Dataset is passed to that function as input, and the function's return values form the new Dataset. For example, below we add 1 to the "a" component of each dict element:
b = {"a":np.array([1.0,2.0,3.0,4.0,5.0]),
"b": np.random.uniform(size=(5,2))}
# 创建dataset
dataset = tf.data.Dataset.from_tensor_slices(b)
dataset = dataset.map(lambda x:{'a':x['a']+1,'b':x['b']})
# 创建Iterator读取数据
iterator = dataset.make_one_shot_iterator()
one_element = iterator.get_next()
with tf.Session() as sess:
for i in range(5):
print(sess.run([one_element]))
************* Output *************
[{'a': 2.0, 'b': array([0.70024193, 0.72232312])}]
[{'a': 3.0, 'b': array([0.64863353, 0.42703828])}]
[{'a': 4.0, 'b': array([0.02262048, 0.19707233])}]
[{'a': 5.0, 'b': array([0.82844146, 0.15107684])}]
[{'a': 6.0, 'b': array([0.60242078, 0.44079629])}]
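One practical note, based on the TF 1.4 API rather than the example above: the function passed to map is traced into the TensorFlow graph, so it must be built from TF ops, and map also accepts a num_parallel_calls argument for parallel preprocessing. parse_fn here is a hypothetical element-wise parsing function:
# parse_fn is a hypothetical function built from TF ops.
dataset = dataset.map(parse_fn, num_parallel_calls=4)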
batch
batch comes from the notion of batch gradient descent (BGD) in machine learning: each gradient step is computed by averaging over batch_size examples. For example, if we set the batch size to 2, each call to the iterator returns 2 elements, as shown below:
b = {"a":np.array([1.0,2.0,3.0,4.0,5.0]),
"b": np.random.uniform(size=(5,2))}
# 创建dataset
dataset = tf.data.Dataset.from_tensor_slices(b)
# dataset = dataset.map(lambda x:{'a':x['a']+1,'b':x['b']})
dataset = dataset.batch(32)
# 创建Iterator读取数据
iterator = dataset.make_one_shot_iterator()
one_element = iterator.get_next()
with tf.Session() as sess:
for i in range(5):
print(sess.run([one_element]))
************* Output *************
[{'a': array([1., 2.]), 'b': array([[0.70024193, 0.72232312],
[0.64863353, 0.42703828]])}]
[{'a': array([3., 4.]), 'b': array([[0.02262048, 0.19707233],
[0.82844146, 0.15107684]])}]
[{'a': array([5.]), 'b': array([[0.60242078, 0.44079629]])}]
---------------------------------------------------------------------------
OutOfRangeError Traceback (most recent call last)
<ipython-input-65-ccf04cdeb41d> in <module>()
8 with tf.Session() as sess:
9 for i in range(5):
---> 10 print(sess.run([one_element]))
The dataset has 5 elements; with a batch size of 2 it yields only 3 batches (the last one partial), so the fourth call to sess.run raises an OutOfRangeError.
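Rather than hard-coding the number of iterations, the idiomatic way to drain a dataset is to catch this exception; a small sketch, reusing one_element from the snippet above:
with tf.Session() as sess:
    while True:
        try:
            print(sess.run(one_element))
        except tf.errors.OutOfRangeError:
            break  # the dataset is exhausted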
shuffle
shuffle randomizes the order of the elements in a dataset. It maintains a fixed-size buffer and picks the next element uniformly at random from that buffer (so a buffer at least as large as the dataset gives a fully uniform shuffle).
shuffle(
    buffer_size,                   # size of the buffer from which the next element is sampled
    seed=None,                     # random seed
    reshuffle_each_iteration=None  # boolean; whether to reshuffle each time the dataset is iterated over
)
b = {"a":np.array([1.0,2.0,3.0,4.0,5.0]),
"b": np.random.uniform(size=(5,2))}
# 创建dataset
dataset = tf.data.Dataset.from_tensor_slices(b)
# dataset = dataset.batch(2)
dataset = dataset.shuffle(4)
# 创建Iterator读取数据
iterator = dataset.make_one_shot_iterator()
one_element = iterator.get_next()
with tf.Session() as sess:
for i in range(5):
print(sess.run([one_element]))
************* Output *************
[{'a': 4.0, 'b': array([0.82844146, 0.15107684])}]
[{'a': 2.0, 'b': array([0.64863353, 0.42703828])}]
[{'a': 1.0, 'b': array([0.70024193, 0.72232312])}]
[{'a': 5.0, 'b': array([0.60242078, 0.44079629])}]
[{'a': 3.0, 'b': array([0.02262048, 0.19707233])}]
repeat
repeat replicates the whole sequence multiple times; it is mainly used to implement epochs in machine learning. If the original data constitutes one epoch, repeat(5) turns it into 5 epochs (and repeat() with no argument repeats indefinitely).
b = {"a":np.array([1.0,2.0,3.0,4.0,5.0]),
"b": np.random.uniform(size=(5,2))}
# 创建dataset
dataset = tf.data.Dataset.from_tensor_slices(b)
dataset = dataset.repeat(5)
dataset = dataset.shuffle(4)
# 创建Iterator读取数据
iterator = dataset.make_one_shot_iterator()
one_element = iterator.get_next()
with tf.Session() as sess:
for i in range(5):
print(sess.run([one_element]))
************* Output *************
[{'a': 4.0, 'b': array([0.82844146, 0.15107684])}]
[{'a': 1.0, 'b': array([0.70024193, 0.72232312])}]
[{'a': 2.0, 'b': array([0.64863353, 0.42703828])}]
[{'a': 5.0, 'b': array([0.60242078, 0.44079629])}]
[{'a': 3.0, 'b': array([0.02262048, 0.19707233])}]
[{'a': 3.0, 'b': array([0.02262048, 0.19707233])}]
[{'a': 5.0, 'b': array([0.60242078, 0.44079629])}]
[{'a': 1.0, 'b': array([0.70024193, 0.72232312])}]
[{'a': 4.0, 'b': array([0.82844146, 0.15107684])}]
[{'a': 2.0, 'b': array([0.64863353, 0.42703828])}]
[{'a': 3.0, 'b': array([0.02262048, 0.19707233])}]
[{'a': 2.0, 'b': array([0.64863353, 0.42703828])}]
[{'a': 1.0, 'b': array([0.70024193, 0.72232312])}]
[{'a': 5.0, 'b': array([0.60242078, 0.44079629])}]
[{'a': 4.0, 'b': array([0.82844146, 0.15107684])}]
[{'a': 4.0, 'b': array([0.82844146, 0.15107684])}]
[{'a': 1.0, 'b': array([0.70024193, 0.72232312])}]
[{'a': 3.0, 'b': array([0.02262048, 0.19707233])}]
[{'a': 2.0, 'b': array([0.64863353, 0.42703828])}]
[{'a': 5.0, 'b': array([0.60242078, 0.44079629])}]
[{'a': 4.0, 'b': array([0.82844146, 0.15107684])}]
[{'a': 2.0, 'b': array([0.64863353, 0.42703828])}]
[{'a': 5.0, 'b': array([0.60242078, 0.44079629])}]
[{'a': 1.0, 'b': array([0.70024193, 0.72232312])}]
[{'a': 3.0, 'b': array([0.02262048, 0.19707233])}]
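In practice these transformations are chained. A sketch (not from the original post) of a typical ordering, shuffling before repeating and batching:
dataset = (tf.data.Dataset.from_tensor_slices(b)
           .shuffle(buffer_size=5)  # a buffer >= dataset size gives a uniform shuffle
           .repeat(5)               # 5 epochs; repeat() with no argument loops forever
           .batch(2))               # combine every 2 elements into one batch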
Experiment
This experiment comes from the Iris flower classification problem introduced in [1]. The dataset lives in CSV files, from which we read the data; each row contains five values, where the first four are the features and the last is the flower's class.
The label values are:
- 0 for Iris Setosa
- 1 for Versicolor
- 2 for Virginica
Sample data:
6.4 2.8 5.6 2.2 2
5 2.3 3.3 1 1
4.9 2.5 4.5 1.7 2
4.9 3.1 1.5 0.1 0
5.7 3.8 1.7 0.3 0
4.4 3.2 1.3 0.2 0
5.4 3.4 1.5 0.4 0
Our data-handling code is shown below:
import os

import six.moves.urllib.request as request
import tensorflow as tf

# Check that we have the correct TensorFlow version installed
tf_version = tf.__version__
print("TensorFlow version: {}".format(tf_version))
assert "1.4" <= tf_version, "TensorFlow r1.4 or later is needed"

# Windows users: You only need to change PATH, rest is platform independent
PATH = "/tmp/tf_dataset_and_estimator_apis"

# Fetch and store Training and Test dataset files
PATH_DATASET = PATH + os.sep + "dataset"
FILE_TRAIN = PATH_DATASET + os.sep + "iris_training.csv"
FILE_TEST = PATH_DATASET + os.sep + "iris_test.csv"
URL_TRAIN = "http://download.tensorflow.org/data/iris_training.csv"
URL_TEST = "http://download.tensorflow.org/data/iris_test.csv"

def download_dataset(url, file):
    if not os.path.exists(PATH_DATASET):
        os.makedirs(PATH_DATASET)
    if not os.path.exists(file):
        data = request.urlopen(url).read()
        with open(file, "wb") as f:
            f.write(data)

download_dataset(URL_TRAIN, FILE_TRAIN)
download_dataset(URL_TEST, FILE_TEST)

tf.logging.set_verbosity(tf.logging.INFO)

# The CSV features in our training & test data
feature_names = [
    'SepalLength',
    'SepalWidth',
    'PetalLength',
    'PetalWidth']

# Create an input function reading a file using the Dataset API
# Then provide the results to the Estimator API
def my_input_fn(file_path, perform_shuffle=False, repeat_count=1):
    def decode_csv(line):
        parsed_line = tf.decode_csv(line, [[0.], [0.], [0.], [0.], [0]])
        label = parsed_line[-1]  # Last element is the label
        del parsed_line[-1]      # Delete last element
        features = parsed_line   # Everything but the last element are the features
        d = dict(zip(feature_names, features)), label
        return d

    dataset = (tf.data.TextLineDataset(file_path)  # Read text file
               .skip(1)                            # Skip header row
               .map(decode_csv))                   # Transform each elem by applying decode_csv fn
    if perform_shuffle:
        # Randomizes input using a window of 256 elements (read into memory)
        dataset = dataset.shuffle(buffer_size=256)
    dataset = dataset.repeat(repeat_count)  # Repeats dataset this # times
    dataset = dataset.batch(32)             # Batch size to use
    iterator = dataset.make_one_shot_iterator()
    batch_features, batch_labels = iterator.get_next()
    return batch_features, batch_labels

next_batch = my_input_fn(FILE_TRAIN, True)  # Will return 32 random elements

# Now let's try it out, retrieving and printing one batch of data.
# Although this code looks strange, you don't need to understand
# the details.
with tf.Session() as sess:
    first_batch = sess.run(next_batch)
    print(first_batch)
************* Output *************
({'SepalLength': array([5.1, 7.7, 4.8, 5.8, 4.9, 5.1, 6.1, 5.2, 5. , 4.4, 4.8, 7.3, 7.7,
6.5, 5.1, 5.6, 5.8, 5.7, 6.8, 5.7, 5.5, 5.7, 5.4, 4.9, 6.5, 7.4,
4.9, 5.4, 6.2, 6.3, 6.4, 5.8], dtype=float32), 'PetalWidth': array([0.3, 2.3, 0.2, 1.9, 1. , 0.4, 1.2, 1.4, 1. , 0.2, 0.2, 1.8, 2.3,
2. , 0.3, 1.3, 0.2, 1.3, 2.3, 0.3, 1. , 1.3, 0.4, 0.2, 2. , 1.9,
1.7, 0.4, 1.5, 1.3, 2.3, 1.9], dtype=float32), 'PetalLength': array([1.4, 6.9, 1.6, 5.1, 3.3, 1.9, 4.7, 3.9, 3.5, 1.3, 1.6, 6.3, 6.1,
5.1, 1.5, 4.2, 1.2, 4.1, 5.9, 1.7, 3.7, 4.5, 1.5, 1.4, 5.2, 6.1,
4.5, 1.7, 4.5, 4.4, 5.3, 5.1], dtype=float32), 'SepalWidth': array([3.5, 2.6, 3.1, 2.7, 2.4, 3.8, 2.8, 2.7, 2. , 3.2, 3.4, 2.9, 3. ,
3.2, 3.8, 2.7, 4. , 2.8, 3.2, 3.8, 2.4, 2.8, 3.4, 3. , 3. , 2.8,
2.5, 3.9, 2.2, 2.3, 3.2, 2.7], dtype=float32)}, array([0, 2, 0, 2, 1, 0, 1, 1, 1, 0, 0, 2, 2, 2, 0, 1, 0, 1, 2, 0, 1, 1,
0, 0, 2, 2, 2, 0, 1, 1, 2, 2], dtype=int32))
Now each execution of next_batch returns 32 samples as a tuple: the first element is a dict of features, the second is the vector of labels, and each component covers all 32 samples, as shown above. The full code is at https://github.com/tensorflow/models/blob/master/samples/outreach/blogs/blog_estimators_dataset.py
References
[1] https://developers.googleblog.com/2017/09/introducing-tensorflow-datasets.html
[2] https://www.leiphone.com/news/201711/zV7yM5W1dFrzs8W5.html
[3] https://github.com/tensorflow/models/blob/master/samples/outreach/blogs/blog_estimators_dataset.py