Revisiting the Basics: Policy Gradient


The jupyter-notebook for this post, together with the code, is available in Ceruleanacg/Learning-Notes on github.com.

Problem Setup

In the CartPole game, we want to use reinforcement learning to train an agent that keeps moving the cart left and right so that the pole on top of it does not fall. We first define the CartPole game:

The CartPole game is the environment of the reinforcement learning setup: it interacts with the agent, updates the state in real time, and internally defines the reward function. The state is defined as:

state \in \mathbb{R}^4

Each dimension of the state represents, respectively:

  • Cart position, ranging from -2.4 to 2.4
  • Cart velocity, ranging from negative infinity to positive infinity
  • Pole angle, ranging from roughly -0.418 to 0.418 rad (about ±24°)
  • Pole angular velocity, ranging from negative infinity to positive infinity

The action is 2-dimensional; its two dimensions correspond to pushing the cart to the left and to the right, respectively.

action \in \mathbb{R}^2

Every step in which the cart moves left or right earns 1 point, which defines the reward function. However, the episode ends once the pole angle exceeds ±12°, the cart position exceeds ±2.4, or more than 200 steps have been taken. We want the score at the end of the episode to be as large as possible.
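For readers who want to check these spaces themselves, here is a minimal sketch, assuming the classic gym 0.x API that the rest of this post also uses:

import gym

# Minimal sanity check of the CartPole spaces described above
# (assumes the classic gym 0.x API, which the code later in this post also uses).
env = gym.make('CartPole-v0')

print(env.observation_space.shape)  # (4,): cart position, cart velocity, pole angle, pole angular velocity
print(env.action_space.n)           # 2: push the cart to the left or to the right

state = env.reset()                 # initial 4-dimensional state
state, reward, done, info = env.step(env.action_space.sample())
print(reward, done)                 # reward is +1 per step; done becomes True once a termination condition is hit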

Policy Gradient

We design a network whose input is the state and whose output is the probability of each action, and train it iteratively with the policy gradient (PolicyGradient) method.

We first define \tau as the trajectory of one episode:

\tau = \{s_1, a_1, r_1, \cdots, s_T, a_T, r_T\}

R(\tau) is the sum of the rewards along this trajectory:

R(\tau) = \sum^{T}_{t=1} r_t

Intuitively, we want to maximize:

\bar{R}_{\theta} = \sum_{\tau} R(\tau) P(\tau \lvert \theta) \approx \frac{1}{N} \sum^{N}_{n=1} R(\tau^{n})
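As a quick illustration of the Monte Carlo approximation on the right-hand side, the following sketch estimates the expected return by averaging R(\tau^n) over N sampled episodes; a random policy stands in for the parameterized policy here, and the variable names are illustrative only:

import gym
import numpy as np

# Estimate bar{R}_theta by averaging the episode returns R(tau^n) over N rollouts.
# A random policy stands in for pi(a | s, theta); classic gym 0.x API assumed.
env = gym.make('CartPole-v0')
N, returns = 20, []
for _ in range(N):
    env.reset()
    done, r_tau = False, 0.0
    while not done:
        _, r, done, _ = env.step(env.action_space.sample())
        r_tau += r                      # R(tau) = sum_t r_t
    returns.append(r_tau)
print('Estimated expected return:', np.mean(returns))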

We first take the gradient of \bar{R}_{\theta}:

\begin{align}
\nabla \bar{R}_{\theta} &= \sum_{\tau} R(\tau) \nabla P(\tau \lvert \theta) \\
&= \sum_{\tau} R(\tau) P(\tau \lvert \theta) \cdot \frac{\nabla P(\tau \lvert \theta)}{P(\tau \lvert \theta)} \\
&= \sum_{\tau} R(\tau) P(\tau \lvert \theta) \cdot \nabla \log P(\tau \lvert \theta) \\
&\approx \frac{1}{N} \sum^{N}_{n=1} R(\tau^n) \cdot \nabla \log P(\tau^n \lvert \theta)
\end{align}

As for P(\tau^n \lvert \theta), it can be expanded as follows:

\begin{align}
p(\tau^n \lvert \theta) &= p(s_1) p(a_1 \lvert s_1, \theta) p(r_1, s_2 \lvert s_1, a_1) p(a_2 \lvert s_2, \theta) \cdots p(a_t \lvert s_t, \theta) p(r_t, s_{t+1} \lvert s_t, a_t) \\
&= p(s_1) \prod_{t} p(a_t \lvert s_t, \theta) p(r_t, s_{t+1} \lvert s_t, a_t)
\end{align}

Substituting this expansion into \nabla \log P(\tau^n \lvert \theta):

\begin{align}
\nabla \log P(\tau^n \lvert \theta) &= \nabla \log \left( p(s_1) \prod_{t} p(a_t \lvert s_t, \theta) p(r_t, s_{t+1} \lvert s_t, a_t) \right) \\
&= \nabla \log p(s_1) + \sum^{T}_{t=1} \nabla \log p(a_t \lvert s_t, \theta) + \sum^{T}_{t=1} \nabla \log p(r_t, s_{t+1} \lvert s_t, a_t) \\
&= \sum^{T}_{t=1} \nabla \log p(a_t \lvert s_t, \theta)
\end{align}

The \nabla \log p(s_1) and \nabla \log p(r_t, s_{t+1} \lvert s_t, a_t) terms vanish because neither the initial-state distribution nor the transition dynamics depend on \theta. Finally, \nabla \bar{R}_{\theta} can be rewritten as:

\begin{align}
\nabla \bar{R}_{\theta} &\approx \frac{1}{N} \sum^{N}_{n=1} R(\tau^n) \cdot \nabla \log P(\tau^n \lvert \theta) \\
&= \frac{1}{N} \sum^{N}_{n=1} R(\tau^n) \sum^{T_n}_{t=1} \nabla \log p(a_t \lvert s_t, \theta) \\
&= \frac{1}{N} \sum^{N}_{n=1} \sum^{T_n}_{t=1} R(\tau^n) \nabla \log p(a_t \lvert s_t, \theta)
\end{align}

In essence, this amounts to minimizing the cross entropy between the actions sampled over N episodes and the actions output by the network, weighted by R(\tau^n):

- \sum^{N}_{n=1} R(\tau^n) \cdot a_i \log p_i
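The TensorFlow code below builds exactly this return-weighted cross entropy; as a toy illustration of the quantity being minimized, here is a small numpy sketch (all names are hypothetical and not part of the implementation below):

import numpy as np

# Toy sketch of the surrogate loss: cross entropy between the sampled actions and
# the policy's output probabilities, weighted per step by R(tau^n).
def surrogate_loss(action_probs, actions, returns):
    # action_probs: (T, A) softmax outputs, actions: (T,) sampled action indices,
    # returns: (T,) the weight R(tau^n) assigned to each step.
    log_p = np.log(action_probs[np.arange(len(actions)), actions])
    return -np.mean(returns * log_p)

probs   = np.array([[0.7, 0.3], [0.4, 0.6]])
actions = np.array([0, 1])
weights = np.array([5.0, 2.0])
print(surrogate_loss(probs, actions, weights))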

Note that R(\tau^n) is computed differently for different problems. In CartPole we care more about the rewards at the beginning of an episode, because they directly determine whether we get the chance to take more actions. So in this problem, R(\tau^n) is computed as follows:

# Copy r_buffer
r_buffer = self.r_buffer
# Init r_tau
r_tau = 0
# Calculate r_tau
for index in reversed(range(0, len(r_buffer))):
    r_tau = r_tau * self.gamma + r_buffer[index]
    self.r_buffer[index] = r_tau

The sign of R(\tau^n) directly determines the direction of the gradient update and is crucial for the convergence of training.
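For intuition, here is a standalone version of the backward pass above (the function name is hypothetical); with a constant reward of +1, earlier steps receive larger discounted returns, matching the remark about the start of the episode mattering most:

def discounted_returns(rewards, gamma=0.95):
    # Same backward pass as in the snippet above:
    # step t receives r_t + gamma * r_{t+1} + gamma^2 * r_{t+2} + ...
    returns, r_tau = [0.0] * len(rewards), 0.0
    for index in reversed(range(len(rewards))):
        r_tau = r_tau * gamma + rewards[index]
        returns[index] = r_tau
    return returns

print(discounted_returns([1.0, 1.0, 1.0, 1.0, 1.0]))
# [4.524..., 3.709..., 2.852..., 1.95, 1.0]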

Implementation

First, import the necessary packages:

import tensorflow as tf
import numpy as np
import gym
import sys

sys.path.append('.')

Implement the Agent class:

class Agent(object):

    def __init__(self, a_space, s_space, **options):

        self.session = tf.Session()

        self.a_space, self.s_space = a_space, s_space

        self.s_buffer, self.a_buffer, self.r_buffer = [], [], []

        self._init_options(options)
        self._init_input()
        self._init_nn()
        self._init_op()

    def _init_input(self):
        self.s = tf.placeholder(tf.float32, [None, self.s_space])
        self.r = tf.placeholder(tf.float32, [None, ])
        self.a = tf.placeholder(tf.int32, [None, ])

    def _init_nn(self):
        # Kernel init.
        w_init = tf.random_normal_initializer(.0, .3)
        # Dense 1.
        dense_1 = tf.layers.dense(self.s, 32, tf.nn.relu, kernel_initializer=w_init)
        # Dense 2.
        dense_2 = tf.layers.dense(dense_1, 32, tf.nn.relu, kernel_initializer=w_init)
        # Action logits.
        self.a_logits = tf.layers.dense(dense_2, self.a_space, kernel_initializer=w_init)
        # Action prob.
        self.a_prob = tf.nn.softmax(self.a_logits)

    def _init_op(self):
        # One hot action.
        action_one_hot = tf.one_hot(self.a, self.a_space)
        # Calculate cross entropy.
        cross_entropy = tf.nn.softmax_cross_entropy_with_logits(labels=action_one_hot, logits=self.a_logits)
        # Weight the cross entropy by the discounted episode return.
        self.loss_func = tf.reduce_mean(cross_entropy * self.r)
        self.train_op = tf.train.AdamOptimizer(self.learning_rate).minimize(self.loss_func)
        self.session.run(tf.global_variables_initializer())

    def _init_options(self, options):
        try:
            self.learning_rate = options['learning_rate']
        except KeyError:
            self.learning_rate = 0.001
        try:
            self.gamma = options['gamma']
        except KeyError:
            self.gamma = 0.95

    def predict(self, state):
        action_prob = self.session.run(self.a_prob, feed_dict={self.s: state[np.newaxis, :]})
        # Sample an action according to the predicted probabilities.
        return np.random.choice(range(action_prob.shape[1]), p=action_prob.ravel())

    def save_transition(self, state, action, reward):
        self.s_buffer.append(state)
        self.a_buffer.append(action)
        self.r_buffer.append(reward)

    def train(self):
        # Copy r_buffer
        r_buffer = self.r_buffer
        # Init r_tau
        r_tau = 0
        # Calculate r_tau
        for index in reversed(range(0, len(r_buffer))):
            r_tau = r_tau * self.gamma + r_buffer[index]
            self.r_buffer[index] = r_tau
        # Minimize loss.
        _, loss = self.session.run([self.train_op, self.loss_func], feed_dict={
            self.s: self.s_buffer,
            self.a: self.a_buffer,
            self.r: self.r_buffer,
        })
        self.s_buffer, self.a_buffer, self.r_buffer = [], [], []

Experimental Results

Initialize the CartPole environment with gym and run training:

import matplotlib.pyplot as plt
%matplotlib inline

env = gym.make('CartPole-v0')
env.seed(1)
env = env.unwrapped

model = Agent(env.action_space.n, env.observation_space.shape[0])

r_sum_list, r_episode_sum = [], None

for episode in range(500):
    # Reset env.
    s, r_episode = env.reset(), 0
    # Start episode.
    while True:
        # if episode > 80:
        #     env.render()
        # Predict action.
        a = model.predict(s)
        # Iteration.
        s_n, r, done, _ = env.step(a)
        if done:
            r = -5
        r_episode += r
        # Save transition.
        model.save_transition(s, a, r)
        s = s_n
        if done:
            if r_episode_sum is None:
                r_episode_sum = sum(model.r_buffer)
            else:
                r_episode_sum = r_episode_sum * 0.99 + sum(model.r_buffer) * 0.01
            r_sum_list.append(r_episode_sum)
            break
    # Start train.
    model.train()
    if episode % 50 == 0:
        print("Episode: {} | Reward is: {}".format(episode, r_episode))

Episode: 0 | Reward is: 17.0
Episode: 50 | Reward is: 71.0
Episode: 100 | Reward is: 26.0
Episode: 150 | Reward is: 50.0
Episode: 200 | Reward is: 102.0
Episode: 250 | Reward is: 194.0
Episode: 300 | Reward is: 197.0
Episode: 350 | Reward is: 71.0
Episode: 400 | Reward is: 147.0
Episode: 450 | Reward is: 182.0

Finally, plot the curve of total reward against episodes:

plt.plot(np.arange(len(r_sum_list)), r_sum_list)
plt.title('Actor Only on CartPole')
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.show()

Follow-up

  • Plenty of gaps remain to be filled in.
