策略梯度
正如前面所讨论的,PG 算法通过遵循更高回报的梯度来优化策略参数。一种流行的 PG 算法,称为增强算法,在 1929 由 Ronald Williams 提出。这是一个常见的变体:
首先,让神经网络策略玩几次游戏,并在每一步计算梯度,这使得智能体更可能选择行为,但不应用这些梯度。
运行几次后,计算每个动作的得分(使用前面段落中描述的方法)。
如果一个动作的分数是正的,这意味着动作是好的,可应用较早计算的梯度,以便将来有更大的的概率选择这个动作。但是,如果分数是负的,这意味着动作是坏的,要应用负梯度来使得这个动作在将来采取的可能性更低。我们的方法就是简单地将每个梯度向量乘以相应的动作得分。
最后,计算所有得到的梯度向量的平均值,并使用它来执行梯度下降步骤。
让我们使用 TensorFlow 实现这个算法。我们将训练我们早先建立的神经网络策略,让它学会平衡车上的平衡杆。让我们从完成之前编码的构造阶段开始,添加目标概率、代价函数和训练操作。因为我们的意愿是选择的动作是最好的动作,如果选择的动作是动作 0(左),则目标概率必须为 1,如果选择动作 1(右)则目标概率为 0:
y = 1. - tf.to_float(action)
现在我们有一个目标概率,我们可以定义损失函数(交叉熵)并计算梯度:
learning_rate = 0.01
cross_entropy = tf.nn.sigmoid_cross_entropy_with_logits( labels=y, logits=logits)
optimizer = tf.train.AdamOptimizer(learning_rate)
grads_and_vars = optimizer.compute_gradients(cross_entropy)
注意,我们正在调用优化器的compute_gradients()
方法,而不是minimize()
方法。这是因为我们想要在使用它们之前调整梯度。compute_gradients()
方法返回梯度向量/变量对的列表(每个可训练变量一对)。让我们把所有的梯度放在一个列表中,以便方便地获得它们的值:
gradients = [grad for grad, variable in grads_and_vars]
好,现在是棘手的部分。在执行阶段,算法将运行策略,并在每个步骤中评估这些梯度张量并存储它们的值。在多次运行之后,它如先前所解释的调整这些梯度(即,通过动作分数乘以它们并使它们归一化),并计算调整后的梯度的平均值。接下来,需要将结果梯度反馈到优化器,以便它可以执行优化步骤。这意味着对于每一个梯度向量我们需要一个占位符。此外,我们必须创建操作去应用更新的梯度。为此,我们将调用优化器的apply_gradients()
函数,该函数接受梯度向量/变量对的列表。我们不给它原始的梯度向量,而是给它一个包含更新梯度的列表(即,通过占位符递送的梯度):
gradient_placeholders = []
grads_and_vars_feed = []
for grad, variable in grads_and_vars:
gradient_placeholder = tf.placeholder(tf.float32, shape=grad.get_shape())
gradient_placeholders.append(gradient_placeholder)
grads_and_vars_feed.append((gradient_placeholder, variable))
training_op = optimizer.apply_gradients(grads_and_vars_feed)
让我们后退一步,看看整个运行过程:
n_inputs = 4
n_hidden = 4
n_outputs = 1
initializer = tf.contrib.layers.variance_scaling_initializer()
learning_rate = 0.01
X = tf.placeholder(tf.float32, shape=[None, n_inputs])
hidden = fully_connected(X, n_hidden, activation_fn=tf.nn.elu,weights_initializer=initializer)
logits = fully_connected(hidden, n_outputs, activation_fn=None, weights_initializer=initializer)
outputs = tf.nn.sigmoid(logits)
p_left_and_right = tf.concat(axis=1, values=[outputs, 1 - outputs])
action = tf.multinomial(tf.log(p_left_and_right), num_samples=1)
y = 1. - tf.to_float(action)
cross_entropy = tf.nn.sigmoid_cross_entropy_with_logits(labels=y, logits=logits)
optimizer = tf.train.AdamOptimizer(learning_rate)
grads_and_vars = optimizer.compute_gradients(cross_entropy)
gradients = [grad for grad, variable in grads_and_vars]
gradient_placeholders = []
grads_and_vars_feed = []
for grad, variable in grads_and_vars:
gradient_placeholder = tf.placeholder(tf.float32, shape=grad.get_shape()) gradient_placeholders.append(gradient_placeholder)
grads_and_vars_feed.append((gradient_placeholder, variable))
training_op = optimizer.apply_gradients(grads_and_vars_feed)
init = tf.global_variables_initializer()
saver = tf.train.Saver()
到执行阶段了!我们将需要两个函数来计算总折扣奖励,给予原始奖励,以及归一化多次循环的结果:
def discount_rewards(rewards, discount_rate):
discounted_rewards = np.empty(len(rewards))
cumulative_rewards = 0
for step in reversed(range(len(rewards))):
cumulative_rewards = rewards[step] + cumulative_rewards * discount_rate discounted_rewards[step] = cumulative_rewards
return discounted_rewards
def discount_and_normalize_rewards(all_rewards, discount_rate):
all_discounted_rewards = [discount_rewards(rewards) for rewards in all_rewards]
flat_rewards = np.concatenate(all_discounted_rewards)
reward_mean = flat_rewards.mean()
reward_std = flat_rewards.std()
return [(discounted_rewards - reward_mean)/reward_std for discounted_rewards in all_discounted_rewards]
让我们检查一下运行的如何:
>>> discount_rewards([10, 0, -50], discount_rate=0.8)
array([-22., -40., -50.])
>>> discount_and_normalize_rewards([[10, 0, -50], [10, 20]], discount_rate=0.8)
[array([-0.28435071, -0.86597718, -1.18910299]), array([ 1.26665318, 1.0727777 ])]
对discount_rewards()
的调用正好返回我们所期望的(见图 16-6)。你也可以验证函数iscount_and_normalize_rewards()
确实返回了两个步骤中每个动作的标准化分数。注意第一步比第二步差很多,所以它的归一化分数都是负的;从第一步开始的所有动作都会被认为是坏的,反之,第二步的所有动作都会被认为是好的。
我们现在有了训练策略所需的一切:
n_iterations = 250 # 训练迭代次数
n_max_steps = 1000 # 每一次的最大步长
n_games_per_update = 10 # 每迭代十次训练一次策略网络
save_iterations = 10 # 每十次迭代保存模型
discount_rate = 0.95
with tf.Session() as sess:
init.run()
for iteration in range(n_iterations):
all_rewards = [] # 每一次的所有奖励
all_gradients = [] # 每一次的所有梯度
for game in range(n_games_per_update):
current_rewards = [] # 当前步的所有奖励
current_gradients = [] # 当前步的所有梯度
obs = env.reset()
for step in range(n_max_steps):
action_val, gradients_val = sess.run([action, gradients],
feed_dict={X: obs.reshape(1, n_inputs)}) # 一个obs
obs, reward, done, info = env.step(action_val[0][0]) current_rewards.append(reward)
current_gradients.append(gradients_val)
if done:
break
all_rewards.append(current_rewards)
all_gradients.append(current_gradients)
# 此时我们每10次运行一次策略,我们已经准备好使用之前描述的算法去更新策略,注:即使用迭代10次的结果来优化当前的策略。
all_rewards = discount_and_normalize_rewards(all_rewards)
feed_dict = {}
for var_index, grad_placeholder in enumerate(gradient_placeholders):
# 将梯度与行为分数相乘,并计算平均值
mean_gradients = np.mean([reward * all_gradients[game_index][step][var_index] for game_index, rewards in enumerate(all_rewards) for step, reward in enumerate(rewards)],axis=0)
feed_dict[grad_placeholder] = mean_gradients
sess.run(training_op, feed_dict=feed_dict)
if iteration % save_iterations == 0:
saver.save(sess, "./my_policy_net_pg.ckpt")
每一次训练迭代都是通过运行10次的策略开始的(每次最多 1000 步,以避免永远运行)。在每一步,我们也计算梯度,假设选择的行动是最好的。在运行了这 10 次之后,我们使用discount_and_normalize_rewards()
函数计算动作得分;我们遍历每个可训练变量,在所有次数和所有步骤中,通过其相应的动作分数来乘以每个梯度向量;并且我们计算结果的平均值。最后,我们运行训练操作,给它提供平均梯度(对每个可训练变量提供一个)。我们继续每 10 个训练次数保存一次模型。
我们做完了!这段代码将训练神经网络策略,它将成功地学会平衡车上的平衡杆(你可以在 Juyter notebook 上试用)。注意,实际上有两种方法可以让玩家游戏结束:要么平衡可以倾斜太大,要么车完全脱离屏幕。在 250 次训练迭代中,策略学会平衡极点,但在避免脱离屏幕方面还不够好。额外数百次的训练迭代可以解决这一问题。
研究人员试图找到一种即使当智能体最初对环境一无所知时也能很好地工作的算法。然而,除非你正在写论文,否则你应该尽可能多地将先前的知识注入到智能体中,因为它会极大地加速训练。例如,你可以添加与屏幕中心距离和极点角度成正比的负奖励。此外,如果你已经有一个相当好的策略,你可以训练神经网络模仿它,然后使用策略梯度来改进它。
尽管它相对简单,但是该算法是非常强大的。你可以用它来解决更难的问题,而不仅仅是平衡一辆手推车上的平衡杆。事实上,AlgPaGo 是基于类似的 PG 算法(加上蒙特卡罗树搜索,这超出了本书的范围)。
现在我们来看看另一个流行的算法。与 PG 算法直接尝试优化策略以增加奖励相反,我们现在看的算法是间接的:智能体学习去估计每个状态的未来衰减奖励的期望总和,或者在每个状态中的每个行为未来衰减奖励的期望和。然后,使用这些知识来决定如何行动。为了理解这些算法,我们必须首先介绍马尔可夫决策过程(MDP)。