基于encoder-decoder和DCGAN的轨迹压缩研究
目录
基于encoder-decoder和DCGAN的轨迹压缩研究
内容介绍
首先,本文的实验结果为:失败。
本文仅实现了程序编码和结果可视化,但是没实现的本文的目标。
理论可行的一个课题研究,但是经过初步尝试,似乎不可行,我也不知道哪些地方错了,或者需要加入那些数据处理步骤,或者程序本身就那里逻辑错了。
本文给出了完整的程序和实验思路,用来交流。
由于我对经典的算法比如小波分析,稀疏编码等知识不太了解(本文并不是这个领域的学者,只是一个菜鸟学生)。所以有可能下面的讨论想法可能都是错的,所以希望大家轻喷。
更多的目的只是希望大家交流,本来想写一篇小论文的,但是经过一天尝试,似乎不太行(感觉存在很多不知道怎么处理的问题)。索性写出来交流(虽然不知道有没有人看)。
问题介绍
AIS数据(可以理解为船舶传感器数据)存在很多的冗余情况,AIS信息由于具有重要的研究价值,不能轻易的删除。但是存储一直累计的海量的AIS数据带来了一些存储上的困难。但是目前没有一种可以一种有效的压缩方法(可以实现压缩和还原)。
因此提出课题:能不能向图片压缩一样,向小波分析一样,找到一个完备的字典,能够将AIS数据实现压缩和还原。
在目前的研究中,较为热门的就是用自编码神经网络,设置合适地约束条件,在不停的迭代学习中,来完成字典的寻找。
本文思路
网络模型
构建一个编码器encoder
,一个解码器decoder
,和一个判别器discriminator
。
其中encoder
用来实现压缩编码,decoder
实现解码,discriminator
用来判断解码出来的轨迹是否和原轨迹一样。
代码如下:
def make_encoder(): model = tf.keras.Sequential() model.add(tf.keras.layers.Flatten()) model.add(tf.keras.layers.Dense(200,activation='relu')) model.add(tf.keras.layers.Dense(200,activation='tanh')) model.add(tf.keras.layers.Dense(50,activation='relu')) model.add(tf.keras.layers.Dense(25,activation='relu')) model.add(tf.keras.layers.Dense(20,activation='tanh')) model.add(tf.keras.layers.Reshape((100,2))) return model def make_decoder(): model = tf.keras.Sequential() model.add(tf.keras.layers.Flatten()) model.add(tf.keras.layers.Dense(20, activation='relu')) model.add(tf.keras.layers.Dense(25, activation='relu')) model.add(tf.keras.layers.Dense(50, activation='relu')) model.add(tf.keras.layers.Dense(200, activation='relu')) model.add(tf.keras.layers.Dense(200, activation=None)) model.add(tf.keras.layers.Reshape((100,2))) return model def make_discriminator_model(): model= tf.keras.Sequential() model.add(tf.keras.layers.Flatten()) model.add(tf.keras.layers.Dense(200,activation='relu')) model.add(tf.keras.layers.Dense(100,activation='relu')) model.add(tf.keras.layers.Dense(50,activation='relu')) model.add(tf.keras.layers.Dense(25,activation='tanh')) return model
代价函数
较为重要的是构建代价函数,基于GAN的思想,encoder
和decoder
可以看成一个整体的生成器,于是两个网络的loss
都设置为了判别器把生成的轨迹判别为真的的交叉熵。
而discriminator
的loss
设置为把真实轨迹识别为真的交叉熵和把生成轨迹识别为假的交叉熵的和。基本思路仍然是两个网络在对抗训练中互相收敛。
代码如下
def discriminator_loss(real_output,fake_output): real_loss = cross_entropy(tf.ones_like(real_output),real_output) fake_loss = cross_entropy(tf.zeros_like(fake_output),fake_output) total_loss = real_loss + fake_loss return total_loss def encode_loss(fake_output): return cross_entropy(tf.ones_like(fake_output),fake_output) def decode_loss(fake_output): return cross_entropy(tf.ones_like(fake_output),fake_output)
梯度下降
encoder_optimizer=tf.keras.optimizers.Adam(1e-4) decoder_optimizer=tf.keras.optimizers.Adam(1e-4) discriminator_optimizer=tf.keras.optimizers.Adam(1e-4) ## 该注解将会使下面的函数被“编译” @ tf.function def train_step(track): with tf.GradientTape() as encoder_tape, tf.GradientTape() as decoder_tape, tf.GradientTape() as disc_tape: en_track = encoder(track,training = True) de_track = decoder(en_track,training = True) real_output = discriminator(track,training = True) fake_output = discriminator(de_track,training =True) en_loss = encode_loss(fake_output) de_loss = decode_loss(fake_output) disc_loss = discriminator_loss(real_output,fake_output) gradients_of_encoder = encoder_tape.gradient(en_loss,encoder.trainable_variables) gradients_of_decoder = decoder_tape.gradient(de_loss,decoder.trainable_variables) gradients_of_disc = disc_tape.gradient(disc_loss,discriminator.trainable_variables) encoder_optimizer.apply_gradients(zip(gradients_of_encoder,encoder.trainable_variables)) decoder_optimizer.apply_gradients(zip(gradients_of_decoder,decoder.trainable_variables)) discriminator_optimizer.apply_gradients(zip(gradients_of_disc,discriminator.trainable_variables)) return en_loss, de_loss , disc_loss
可视化
def plot_curve(var): var = tf.squeeze(var) x = tf.squeeze(var[:,0]) y=tf.squeeze(var[:,1]) plt.cla() plt.scatter(x,y) plt.xlabel('longitude') plt.ylabel('latitude') plt.show() plt.pause(0.1)
完整代码如下:
tensorflow:2.0
import time import pickle import numpy as np import matplotlib.pyplot as plt import tensorflow as tf import os # from IPython import display print(tf.__version__) data_path = '/home/jason/PycharmProjects/AIS_ZIP/data/data_02.pickle' f= open('/home/jason/PycharmProjects/AIS_ZIP/data/data_02.pickle','rb') try: data=pickle.load(f) except: pass data = np.array(data) def make_encoder(): model = tf.keras.Sequential() model.add(tf.keras.layers.Flatten()) model.add(tf.keras.layers.Dense(200,activation='relu')) model.add(tf.keras.layers.Dense(200,activation='tanh')) model.add(tf.keras.layers.Dense(50,activation='relu')) model.add(tf.keras.layers.Dense(25,activation='relu')) model.add(tf.keras.layers.Dense(20,activation='tanh')) model.add(tf.keras.layers.Reshape((100,2))) return model def make_decoder(): model = tf.keras.Sequential() model.add(tf.keras.layers.Flatten()) model.add(tf.keras.layers.Dense(20, activation='relu')) model.add(tf.keras.layers.Dense(25, activation='relu')) model.add(tf.keras.layers.Dense(50, activation='relu')) model.add(tf.keras.layers.Dense(200, activation='relu')) model.add(tf.keras.layers.Dense(200, activation=None)) model.add(tf.keras.layers.Reshape((100,2))) return model def make_discriminator_model(): model= tf.keras.Sequential() model.add(tf.keras.layers.Flatten()) model.add(tf.keras.layers.Dense(200,activation='relu')) model.add(tf.keras.layers.Dense(100,activation='relu')) model.add(tf.keras.layers.Dense(50,activation='relu')) model.add(tf.keras.layers.Dense(25,activation='tanh')) model.add(tf.keras.layers.Dense(1)) return model def plot_curve(var): var = tf.squeeze(var) x = tf.squeeze(var[:,0]) y=tf.squeeze(var[:,1]) plt.cla() plt.scatter(x,y) plt.show() plt.pause(0.1) encoder=make_encoder() decoder = make_decoder() discriminator =make_discriminator_model() data1 = tf.convert_to_tensor(np.reshape(data[5,:,:],(-1,100,2)),dtype=tf.float32) # enl = encoder(data1) # decl = decoder(enl) print(data1) plot_curve(data1) # decision = discriminator(decl) # # plot_curve(decl) cross_entropy =tf.keras.losses.BinaryCrossentropy(from_logits=True) def discriminator_loss(real_output,fake_output): real_loss = cross_entropy(tf.ones_like(real_output),real_output) fake_loss = cross_entropy(tf.zeros_like(fake_output),fake_output) total_loss = real_loss + fake_loss return total_loss def encode_loss(fake_output): return cross_entropy(tf.ones_like(fake_output),fake_output) def decode_loss(fake_output): return cross_entropy(tf.ones_like(fake_output),fake_output) encoder_optimizer=tf.keras.optimizers.Adam(1e-4) decoder_optimizer=tf.keras.optimizers.Adam(1e-4) discriminator_optimizer=tf.keras.optimizers.Adam(1e-4) checkpointer_dir = './training_checkpoints' checkpointer_prefix = os.path.join(checkpointer_dir,"ckpt") checkpoint = tf.train.Checkpoint(encoder_optimizer=encoder_optimizer, decoder_optimizer=decoder_optimizer, discriminator_optimizer=discriminator_optimizer, encoder=encoder, decoder=decoder, discriminator=discriminator ) EPOCHS = 500 ## 该注解将会使下面的函数被“编译” @ tf.function def train_step(track): with tf.GradientTape() as encoder_tape, tf.GradientTape() as decoder_tape, tf.GradientTape() as disc_tape: en_track = encoder(track,training = True) de_track = decoder(en_track,training = True) real_output = discriminator(track,training = True) fake_output = discriminator(de_track,training =True) en_loss = encode_loss(fake_output) de_loss = decode_loss(fake_output) disc_loss = discriminator_loss(real_output,fake_output) gradients_of_encoder = encoder_tape.gradient(en_loss,encoder.trainable_variables) gradients_of_decoder = decoder_tape.gradient(de_loss,decoder.trainable_variables) gradients_of_disc = disc_tape.gradient(disc_loss,discriminator.trainable_variables) encoder_optimizer.apply_gradients(zip(gradients_of_encoder,encoder.trainable_variables)) decoder_optimizer.apply_gradients(zip(gradients_of_decoder,decoder.trainable_variables)) discriminator_optimizer.apply_gradients(zip(gradients_of_disc,discriminator.trainable_variables)) return en_loss, de_loss , disc_loss def train(dataset,epochs): for epoch in range(epochs): start = time.time() for batch in dataset: en_loss, de_loss, disc_loss=train_step(batch) print(en_loss, de_loss , disc_loss) generate_and_save_images(encoder,decoder,epoch + 1,data1) if (epochs+1)%15==0: checkpoint.save(file_prefix = checkpointer_prefix) print('Time for epochs {} is {} sec'.format(epoch+1,time.time()-start)) generate_and_save_images(encoder,decoder, epoch + 1, data1) def generate_and_save_images(model1, model2, epoch, test_input): # 注意 training` 设定为 False # 因此,所有层都在推理模式下运行(batchnorm)。 entrack = model1(test_input, training=False) detrack = model2(entrack, training=False) for i in range(detrack.shape[0]): plot_curve(detrack[i, :, :]) plt.savefig('/pics/image_at_epoch_{:04d}.png'.format(epoch)) plt.ion() BUFFER_SIZE =93 BATCH_SIZE = 31 train_dataset = tf.data.Dataset.from_tensor_slices(data).shuffle(BUFFER_SIZE).batch(BATCH_SIZE) train(train_dataset,EPOCHS)