将数据导入TensorFlow Estimator模型
机器学习就是数据的数量和质量。所述数据通常以各种来源提供:
- 文本文件(CSV,TSV,Excel)
- 数据库
- 流媒体来源
TensorFlow estimators 使用输入函数。输入函数返回一组特征和标签。特征是特征名称和数值数组的字典。标签是一系列值。需要进行一些管理,例如对数据进行分组并分批返回。
让我们从简单的选项开始吧。如果您将数据放在一个文件中,您可以将其完全读入内存,并且文件采用文本分隔格式(CSV,TSV等),则所需的工作量最小。您可以使用numpy或pandas读取文件,通常就是这种情况。
当您使用tf.estimatorAPI时,您需要在训练期间传入输入函数。
train(
input_fn,
hooks=None,
steps=None,
max_steps=None,
saving_listeners=None
)
我们的重点是input_fn!我们将使用Boston Housing数据(https://www.kaggle.com/c/boston-housing)。
如果您的数据采用numpy格式,则可以使用它tf.estimator.inputs.numpy_input_function来获取数据。首先,您需要为您的特征定义字典,Python代码如下:
# extract numpy data from a DataFrame
crim = train_df['crim'].values
zn = train_df['zn'].values
indus = train_df['indus'].values
chas = train_df['chas'].values
nox = train_df['nox'].values
rm = train_df['rm'].values
age = train_df['age'].values
dis = train_df['dis'].values
rad = train_df['rad'].values
tax = train_df['tax'].values
ptratio = train_df['ptratio'].values
black = train_df['black'].values
lstat = train_df['lstat'].values
medv = train_df['medv'].values
# create a dictionary
x_dict = {
'crim': crim,
'zn': zn,
'indus': indus,
'chas': chas,
'nox': nox,
'rm': rm,
'age': age,
'dis': dis,
'rad': rad,
'tax': tax,
'ptratio': ptratio,
'black': black,
'lstat': lstat
}
随着我们的字典到位,我们可以继续定义我们的输入函数,Python代码如下:
def np_training_input_fn(x, y):
return tf.estimator.inputs.numpy_input_fn(
x= x,
y= y,
batch_size= 32,
num_epochs= 5, # this way you can leave out steps from training
shuffle= True,
queue_capacity= 5000
)
在函数中,我们传入x,这是字典,y是标签。我们还可以传入我们的批大小、epoch的数量以及是否shuffle 数据。批次大小是您应当根据经验确定的一个超参数。epochs的数量是你想查看数据的次数。为了训练,设置任何数字。对于测试,将这个设置为1。
创建estimator之前,需要设置特征列。Python示例如下:
feature_cols = [tf.feature_column.numeric_column(k) for k in x_dict.keys()]
lin_model = tf.estimator.LinearRegressor(feature_columns=feature_cols)
lin_model.train(np_training_input_fn(x_dict, medv), steps=10)
对于DataFrame,您将继续定义输入函数,Python代码:
def pd_input_fn(df, y_label):
return tf.estimator.inputs.pandas_input_fn(
x=df,
y=df[y_label],
batch_size = 32,
num_epochs = 5,
shuffle = True,
queue_capacity = 1000,
num_threads = 1
)
注意,在上面的方法中,我们继续传递DataFrame,并在其中完成标签。如果标签不在传递给x的内容中,就会出现错误。把一个series 传递给y,其他的参数和你处理numpy的时候是一样的。
模型在未来的发展中会得到同样的处理。您创建模型并指定特征列。然后继续训练模式。
lin_model = tf.estimator.LinearRegressor(feature_columns=feature_cols)
lin_model.train(pd_input_fn(train_df, 'medv'), steps=10)
当你能把数据读入内存时,一切都很好。但是,当你做不到的时候会发生什么呢?当您的训练数据集是100GB时会发生什么?
好消息是这样的数据集通常是由分布式系统生成的,所以您的文件将被分片。这意味着数据将存储在不同的文件中,这些文件的名称类似于data-0001- 1000。
如果你从未接触过大数据,你的第一个想法可能是使用glob。不要这样做,否则你会耗尽你的内存,训练也会停止。
这些类型的文件通常没有标题,这是一件好事。您将从定义列名称列表开始,列名称列表的顺序应该与您的列在文件中的顺序一致。其次,定义标签列。最后,定义一个默认值列表,以便在读取时遇到缺失值时处理它们
CSV_COLUMNS = ['medv', 'crim', 'zn', 'lstat', 'tax', 'rad', 'chas', 'nox', 'indus', 'ptratio', 'age', 'black', 'rm', 'dis']
LABEL_COLUMN = 'medv'
DEFAULTS = [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]]
接下来,我们定义一个函数来读取文本数据并返回我们的格式,就像我们之前的函数处理它们一样。创建函数的方法的一个优点是它可以处理通配符,例如data-* 。Python代码如下:
def read_dataset(filename, mode, batch_size = 512):
def _input_fn():
def decode_csv(value_column):
columns = tf.decode_csv(value_column, record_defaults = DEFAULTS)
features = dict(zip(CSV_COLUMNS, columns))
label = features.pop(LABEL_COLUMN)
return features, label
# Create list of files that match pattern
file_list = tf.gfile.Glob(filename)
# Create dataset from file list
dataset = tf.data.TextLineDataset(file_list).map(decode_csv)
if mode == tf.estimator.ModeKeys.TRAIN:
num_epochs = None # indefinitely
dataset = dataset.shuffle(buffer_size = 10 * batch_size)
else:
num_epochs = 1 # end-of-input after this
dataset = dataset.repeat(num_epochs).batch(batch_size)
return dataset.make_one_shot_iterator().get_next()
return _input_fn
该函数接受三个参数:一个模式(以便我们能够匹配多个文件)、mode(训练或评估)和一个batch size。注意read_dataset返回一个函数。我们已经调用了函数_input_fn。在这个函数中,我们有一个名为decode_csv的函数,它将创建一个字典、提取一个series,并以我们在本文开头提到的tuple 格式返回它们。
其次,我们的函数使用glob创建了一个文件名列表。是的,仍然使用glob,但是我们没有将结果传递给pandas.read_csv()。相反,可以使用tf.data.TextLineDataset()。它需要三个参数:文件名列表、压缩格式(none、ZLIB或GZIP)和缓冲区大小。read_csv和TextLineDataset的主要区别在于前者将内容读入内存(我们可以分批读入),而后者返回一个迭代器。
因此,我们的函数TextLineDataset通过调用函数创建一个数据集map,传入decode_csv。它接下来要做的是检查我们是否处于训练模式。如果我们不是,那么我们的epochs被设置为1.如果我们是,那么它将设置为我们想要的许多epochs。我们的训练数据集也被shuffled。然后将我们的数据集设置为重复我们想要的epochs,并为我们的批量大小进行配置。
最后,我们返回一次性迭代器,然后调用get_next()。所有这些工作都是通过我们之前看到的函数在幕后处理的。我们可以使用以下方法创建我们的训练,评估和测试输入函数:
def get_train():
return read_dataset('./train-.*', mode = tf.estimator.ModeKeys.TRAIN)
def get_valid():
return read_dataset('./valid.csv', mode = tf.estimator.ModeKeys.EVAL)
def get_test():
return read_dataset('./test.csv', mode = tf.estimator.ModeKeys.EVAL)
其余的过程与我们看到的完全相同。我们可以创建我们的estimator 并像往常一样训练它。
对于实际项目,您将首先使用pandas和tf.estimator.inputs。读取您的一个训练文件。但是,要在训练中使用所有文件,您将需要使用tf.data.TextLineDataset。