将数据导入TensorFlow Estimator模型

将数据导入TensorFlow Estimator模型

机器学习就是数据的数量和质量。所述数据通常以各种来源提供:

  • 文本文件(CSV,TSV,Excel)
  • 数据库
  • 流媒体来源

TensorFlow estimators 使用输入函数。输入函数返回一组特征和标签。特征是特征名称和数值数组的字典。标签是一系列值。需要进行一些管理,例如对数据进行分组并分批返回。

让我们从简单的选项开始吧。如果您将数据放在一个文件中,您可以将其完全读入内存,并且文件采用文本分隔格式(CSV,TSV等),则所需的工作量最小。您可以使用numpy或pandas读取文件,通常就是这种情况。

当您使用tf.estimatorAPI时,您需要在训练期间传入输入函数。

train(

input_fn,

hooks=None,

steps=None,

max_steps=None,

saving_listeners=None

)

我们的重点是input_fn!我们将使用Boston Housing数据(https://www.kaggle.com/c/boston-housing)。

如果您的数据采用numpy格式,则可以使用它tf.estimator.inputs.numpy_input_function来获取数据。首先,您需要为您的特征定义字典,Python代码如下:

# extract numpy data from a DataFrame

crim = train_df['crim'].values

zn = train_df['zn'].values

indus = train_df['indus'].values

chas = train_df['chas'].values

nox = train_df['nox'].values

rm = train_df['rm'].values

age = train_df['age'].values

dis = train_df['dis'].values

rad = train_df['rad'].values

tax = train_df['tax'].values

ptratio = train_df['ptratio'].values

black = train_df['black'].values

lstat = train_df['lstat'].values

medv = train_df['medv'].values

# create a dictionary

x_dict = {

'crim': crim,

'zn': zn,

'indus': indus,

'chas': chas,

'nox': nox,

'rm': rm,

'age': age,

'dis': dis,

'rad': rad,

'tax': tax,

'ptratio': ptratio,

'black': black,

'lstat': lstat

}

随着我们的字典到位,我们可以继续定义我们的输入函数,Python代码如下:

def np_training_input_fn(x, y):

return tf.estimator.inputs.numpy_input_fn(

x= x,

y= y,

batch_size= 32,

num_epochs= 5, # this way you can leave out steps from training

shuffle= True,

queue_capacity= 5000

)

在函数中,我们传入x,这是字典,y是标签。我们还可以传入我们的批大小、epoch的数量以及是否shuffle 数据。批次大小是您应当根据经验确定的一个超参数。epochs的数量是你想查看数据的次数。为了训练,设置任何数字。对于测试,将这个设置为1。

创建estimator之前,需要设置特征列。Python示例如下:

feature_cols = [tf.feature_column.numeric_column(k) for k in x_dict.keys()]

lin_model = tf.estimator.LinearRegressor(feature_columns=feature_cols)

lin_model.train(np_training_input_fn(x_dict, medv), steps=10)

对于DataFrame,您将继续定义输入函数,Python代码:

def pd_input_fn(df, y_label):

return tf.estimator.inputs.pandas_input_fn(

x=df,

y=df[y_label],

batch_size = 32,

num_epochs = 5,

shuffle = True,

queue_capacity = 1000,

num_threads = 1

)

注意,在上面的方法中,我们继续传递DataFrame,并在其中完成标签。如果标签不在传递给x的内容中,就会出现错误。把一个series 传递给y,其他的参数和你处理numpy的时候是一样的。

模型在未来的发展中会得到同样的处理。您创建模型并指定特征列。然后继续训练模式。

lin_model = tf.estimator.LinearRegressor(feature_columns=feature_cols)

lin_model.train(pd_input_fn(train_df, 'medv'), steps=10)

当你能把数据读入内存时,一切都很好。但是,当你做不到的时候会发生什么呢?当您的训练数据集是100GB时会发生什么?

好消息是这样的数据集通常是由分布式系统生成的,所以您的文件将被分片。这意味着数据将存储在不同的文件中,这些文件的名称类似于data-0001- 1000。

如果你从未接触过大数据,你的第一个想法可能是使用glob。不要这样做,否则你会耗尽你的内存,训练也会停止。

这些类型的文件通常没有标题,这是一件好事。您将从定义列名称列表开始,列名称列表的顺序应该与您的列在文件中的顺序一致。其次,定义标签列。最后,定义一个默认值列表,以便在读取时遇到缺失值时处理它们

CSV_COLUMNS = ['medv', 'crim', 'zn', 'lstat', 'tax', 'rad', 'chas', 'nox', 'indus', 'ptratio', 'age', 'black', 'rm', 'dis']

LABEL_COLUMN = 'medv'

DEFAULTS = [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]]

接下来,我们定义一个函数来读取文本数据并返回我们的格式,就像我们之前的函数处理它们一样。创建函数的方法的一个优点是它可以处理通配符,例如data-* 。Python代码如下:

def read_dataset(filename, mode, batch_size = 512):

def _input_fn():

def decode_csv(value_column):

columns = tf.decode_csv(value_column, record_defaults = DEFAULTS)

features = dict(zip(CSV_COLUMNS, columns))

label = features.pop(LABEL_COLUMN)

return features, label

# Create list of files that match pattern

file_list = tf.gfile.Glob(filename)

# Create dataset from file list

dataset = tf.data.TextLineDataset(file_list).map(decode_csv)

if mode == tf.estimator.ModeKeys.TRAIN:

num_epochs = None # indefinitely

dataset = dataset.shuffle(buffer_size = 10 * batch_size)

else:

num_epochs = 1 # end-of-input after this

dataset = dataset.repeat(num_epochs).batch(batch_size)

return dataset.make_one_shot_iterator().get_next()

return _input_fn

该函数接受三个参数:一个模式(以便我们能够匹配多个文件)、mode(训练或评估)和一个batch size。注意read_dataset返回一个函数。我们已经调用了函数_input_fn。在这个函数中,我们有一个名为decode_csv的函数,它将创建一个字典、提取一个series,并以我们在本文开头提到的tuple 格式返回它们。

其次,我们的函数使用glob创建了一个文件名列表。是的,仍然使用glob,但是我们没有将结果传递给pandas.read_csv()。相反,可以使用tf.data.TextLineDataset()。它需要三个参数:文件名列表、压缩格式(none、ZLIB或GZIP)和缓冲区大小。read_csv和TextLineDataset的主要区别在于前者将内容读入内存(我们可以分批读入),而后者返回一个迭代器。

因此,我们的函数TextLineDataset通过调用函数创建一个数据集map,传入decode_csv。它接下来要做的是检查我们是否处于训练模式。如果我们不是,那么我们的epochs被设置为1.如果我们是,那么它将设置为我们想要的许多epochs。我们的训练数据集也被shuffled。然后将我们的数据集设置为重复我们想要的epochs,并为我们的批量大小进行配置。

最后,我们返回一次性迭代器,然后调用get_next()。所有这些工作都是通过我们之前看到的函数在幕后处理的。我们可以使用以下方法创建我们的训练,评估和测试输入函数:

def get_train():

return read_dataset('./train-.*', mode = tf.estimator.ModeKeys.TRAIN)

def get_valid():

return read_dataset('./valid.csv', mode = tf.estimator.ModeKeys.EVAL)

def get_test():

return read_dataset('./test.csv', mode = tf.estimator.ModeKeys.EVAL)

其余的过程与我们看到的完全相同。我们可以创建我们的estimator 并像往常一样训练它。

对于实际项目,您将首先使用pandas和tf.estimator.inputs。读取您的一个训练文件。但是,要在训练中使用所有文件,您将需要使用tf.data.TextLineDataset。

相关推荐