一个建立强化学习模型的教程
DeepMind在2013年出版的一份名为《深度强化学习》(Playing Atari)的出版物介绍了一种新的深度学习模式,用于强化学习,并证明了它能够掌握Atari 2600电脑游戏的复杂控制策略,只使用原始像素作为输入。在本教程中,我将使用Keras实现本文。我们将从强化学习的基础开始,然后深入到代码中,以获得实际的理解。
步骤:
在浏览器(JavaScript)和模型(Python)之间构建双向接口
捕获和预处理图像
训练模型
评估
开始
在您设置了环境之后,克隆GitHub库,并在jupyter notebook上工作。
git clone https://github.com/Paperspace/DinoRunTutorial.git
Reinforcement Learning Dino Run.ipynb
确保运行init_cache()第一次初始化文件系统结构。
强化学习
对于许多人来说,这可能是个新词,但我们每个人都学会了使用强化学习的概念,而这就是我们的大脑如何运作的。奖励系统是任何RL算法的基础。如果我们再回到孩子走路的比喻上,一个积极的奖励就是父母的掌声,或者是有能力得到糖果,而消极的奖励就不是糖果了。孩子在开始走路前先学会站起来。在人工智能方面,在我们的例子中,一个代理的主要目标是通过在环境中执行特定的操作序列来最大化一个特定的数字奖励。RL最大的挑战是缺乏监督(标记数据)来指导代理。它必须自己探索和学习。代理从随机执行动作开始,观察每个动作所带来的奖励,并学会在面对类似的环境状态时预测最佳的动作。
Q-learning
我们使用Q-learning,一种RL技术,在这里我们尝试近似一个特殊的函数,它驱动任何环境状态序列的动作选择策略。Q- Learning是一种无模型的强化学习的实现,在这种情况下,对每个状态、所采取的操作和得到的奖励都保持Q值表。一个示例q表应该告诉我们数据是如何构造的。在我们的例子中,状态是游戏屏幕截图和动作,什么都不做,然后跳转[0,1]
我们利用深度神经网络通过回归来解决这个问题,并选择具有最高预测Q值的动作。
Setup
让我们设置我们的环境来开始训练过程。
1.选择虚拟机:我们需要一个完整的桌面环境,在这里我们可以捕获和利用屏幕截图进行培训。我选择了一个Paperspace ML-in-a-box(MLIAB)Ubuntu镜像。MLIAB的优势在于它预装了Anaconda和许多其他ML库。
2. 配置和安装Keras以使用GPU:我们需要安装keras和tensorflow的GPU版本Paperspace的VM具有预先安装的这些VM,但是如果没有,请安装下述进行安装
pip install keras
pip install tensorflow
另外,确保GPU可以被设置识别。执行下面的python代码,你应该看到可用的GPU设备
from keras import backend as K
K.tensorflow_backend._get_available_gpus()
3. 安装依赖关系
游戏框架
您可以通过将浏览器指向chrome:// dino或仅通过拔下网络插头来启动游戏。如果我们打算修改游戏代码,则从chromium的开源存储库中提取游戏。
我们的模型是用python编写的,游戏是用JavaScript编写的,我们需要一些接口工具让他们相互沟通。
Selenium是一种流行的浏览器自动化工具,用于向浏览器发送操作,并获取当前分数等不同的游戏参数。
现在我们有一个接口来发送动作到游戏中,我们需要一种机制来捕获游戏屏幕
Selenium和OpenCV分别为屏幕捕获和图像预处理提供了最佳性能,实现了6-7 fps的下降帧率。
我们每个时间帧只需要4帧,足以将速度作为一项功能来学习
游戏模块
我们使用这个模块实现了Python和JavaScript之间的接口。下面的代码片段会给你一个关于模块中发生的事情的要点。
class Game:
def __init__(self):
self._driver = webdriver.Chrome(executable_path = chrome_driver_path)
self._driver.set_window_position(x=-10,y=0)
self._driver.get(game_url)
def restart(self):
self._driver.execute_script("Runner.instance_.restart()")
def press_up(self):
self._driver.find_element_by_tag_name("body").send_keys(Keys.ARROW_UP)
def get_score(self):
score_array = self._driver.execute_script("return Runner.instance_.distanceMeter.digits")
score = ''.join(score_array).
return int(score)
代理模块
我们使用代理模块来封装所有的接口。我们使用此模块控制Dino,并获取环境中的代理状态。
class DinoAgent:
def __init__(self,game): #takes game as input for taking actions
self._game = game;
self.jump(); #to start the game, we need to jump once
def is_crashed(self):
return self._game.get_crashed()
def jump(self):
self._game.press_up()
游戏状态模块
为了将动作发送到模块并获得由于该动作而导致环境过渡的结果状态,我们使用游戏状态模块。它通过接收和执行操作来简化流程,决定奖励并返回经验元组。
class Game_sate:
def __init__(self,agent,game):
self._agent = agent
self._game = game
def get_state(self,actions):
score = self._game.get_score()
reward = 0.1 #survival reward
is_over = False #game over
if actions[1] == 1: #else do nothing
self._agent.jump()
image = grab_screen(self._game._driver)
if self._agent.is_crashed():
self._game.restart()
reward = -1
is_over = True
return image, reward, is_over #return the Experience tuple
图像管道
图像捕获
我们可以通过多种方式捕获游戏屏幕,例如使用PIL和MSS python库截取整个屏幕和裁剪区域。然而,最大的缺点是对屏幕分辨率和窗口位置的敏感度。幸运的是,游戏使用了HTML Canvas。我们可以使用JavaScript轻松获得base64格式的图像。我们使用selenium来运行这个脚本。
#javascript code to get the image data from canvas
var canvas = document.getElementsByClassName('runner-canvas')[0];
var img_data = canvas.toDataURL()
return img_data
def grab_screen(_driver = None):
image_b64 = _driver.execute_script(getbase64Script)
screen = np.array(Image.open(BytesIO(base64.b64decode(image_b64))))
image = process_img(screen)#processing image as required
return image
图像处理
采集的原始图像的分辨率约为600x150,具有3个(RGB)通道。我们打算使用4个连续的屏幕截图作为模型的单个输入。这使得我们的尺寸为600x150x3x4的单一输入。这在计算上是昂贵的,并不是所有的功能都可用于玩游戏。所以我们使用OpenCV库来调整,裁剪和处理图像。最终的处理输入仅为80x80像素和单通道(灰度)。
def process_img(image):
image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
image = image[:300, :500]
return image
模型架构
所以我们得到了输入和一种方法来利用模型的输出来玩游戏,让我们看看模型架构。
在将它们flattening 成dense layers和输出层之前,我们使用一系列三个卷积层。仅限CPU的模型不包含池化层,因为我已经删除了许多功能,并且添加池化层会导致已稀疏功能的显着损失。但借助GPU的强大功能,我们可以容纳更多功能,而不会降低帧频。
最大池图层显着改善了密集特征集的处理。
我们的输出图层由两个神经元组成,每个神经元代表每个动作的最大预测回报。然后我们选择最大回报(Q值)的行动:
def buildmodel():
print("Now we build the model")
model = Sequential()
model.add(Conv2D(32, (8, 8), padding='same',strides=(4, 4),input_shape=(img_cols,img_rows,img_channels))) #80*80*4
model.add(MaxPooling2D(pool_size=(2,2)))
model.add(Activation('relu'))
model.add(Conv2D(64, (4, 4),strides=(2, 2), padding='same'))
model.add(MaxPooling2D(pool_size=(2,2)))
model.add(Activation('relu'))
model.add(Conv2D(64, (3, 3),strides=(1, 1), padding='same'))
model.add(MaxPooling2D(pool_size=(2,2)))
model.add(Activation('relu'))
model.add(Flatten())
model.add(Dense(512))
model.add(Activation('relu'))
model.add(Dense(ACTIONS))
adam = Adam(lr=LEARNING_RATE)
model.compile(loss='mse',optimizer=adam)
print("We finish building the model")
return model
训练
这些是训练阶段发生的事情
以无动作开始并获得初始状态(s_t)
观察游戏中观察的步骤数量
预测并执行操作
在Replay Memory中存储体验
从Replay Memory中随机选择一个批次并在其上训练模型
如果游戏结束重新开始
这个Python代码很长,但很容易理解:
def trainNetwork(model,game_state):
# store the previous observations in replay memory
D = deque() #experience replay memory
# get the first state by doing nothing
do_nothing = np.zeros(ACTIONS)
do_nothing[0] =1 #0 => do nothing,
#1=> jump
x_t, r_0, terminal = game_state.get_state(do_nothing) # get next step after performing the action
s_t = np.stack((x_t, x_t, x_t, x_t), axis=2).reshape(1,20,40,4) # stack 4 images to create placeholder input reshaped 1*20*40*4
OBSERVE = OBSERVATION
epsilon = INITIAL_EPSILON
t = 0
while (True): #endless running
loss = 0
Q_sa = 0
action_index = 0
r_t = 0 #reward at t
a_t = np.zeros([ACTIONS]) # action at t
q = model.predict(s_t) #input a stack of 4 images, get the prediction
max_Q = np.argmax(q) # chosing index with maximum q value
action_index = max_Q
a_t[action_index] = 1 # o=> do nothing, 1=> jump
#run the selected action and observed next state and reward
x_t1, r_t, terminal = game_state.get_state(a_t)
x_t1 = x_t1.reshape(1, x_t1.shape[0], x_t1.shape[1], 1) #1x20x40x1
s_t1 = np.append(x_t1, s_t[:, :, :, :3], axis=3) # append the new image to input stack and remove the first one
D.append((s_t, action_index, r_t, s_t1, terminal))# store the transition
#only train if done observing; sample a minibatch to train on
trainBatch(random.sample(D, BATCH)) if t > OBSERVE else 0
s_t = s_t1
t += 1
请注意,我们正在从重放记忆中抽取32次随机体验回放,并使用批量的训练方法。其原因是游戏结构中的动作分配不平衡,以及避免过度拟合。
def trainBatch(minibatch):
for i in range(0, len(minibatch)):
loss = 0
inputs = np.zeros((BATCH, s_t.shape[1], s_t.shape[2], s_t.shape[3])) #32, 20, 40, 4
targets = np.zeros((inputs.shape[0], ACTIONS)) #32, 2
state_t = minibatch[i][0] # 4D stack of images
action_t = minibatch[i][1] #This is action index
reward_t = minibatch[i][2] #reward at state_t due to action_t
state_t1 = minibatch[i][3] #next state
terminal = minibatch[i][4] #wheather the agent died or survided due the action
inputs[i:i + 1] = state_t
targets[i] = model.predict(state_t) # predicted q values
Q_sa = model.predict(state_t1) #predict q values for next step
if terminal:
targets[i, action_t] = reward_t # if terminated, only equals reward
else:
targets[i, action_t] = reward_t + GAMMA * np.max(Q_sa)
loss += model.train_on_batch(inputs, targets)
结果
我们应该能够通过使用这种架构获得良好的结果。GPU显着改善了结果,可以通过平均分数的提高进行验证。下图显示了训练开始时的平均分数。训练结束时,每10场比赛的平均得分远远高于1000。
最高记录是4000+,远远超出了之前250的模型(而且超出了大多数人的能力!)该图表显示了训练期间游戏的最高分数的进步(scale = 10)。
恐龙的速度与分数成正比,使它更难被发现并以更高的速度决定一个动作。整个游戏都是在恒定速度下进行的。