pytorch例子-強化學習(DQN)

一、前言

本案例通過採用DQN模型來訓練一個AI玩CartPole-v0的遊戲。

強化學習演算法

強化學習強調如何基於環境而行動,以取得最大化的預期利益,即有機體如何在環境給予的獎勵或懲罰的刺激下,逐步形成對刺激的預期,產生能獲得最大利益的習慣性行為。

在強化學習的世界裡,演算法稱之為Agent,與環境發生交互,Agent從環境中獲取狀態state,並決定自己要做出的動作action,環境會根據自身的邏輯給Agent予以獎勵(reward)。有正向和反向的激勵。

馬爾科夫決策過程

環境是隨機的,AI可以在環境中做出某些特定的動作,行為可以改變環境,並帶來新的狀態,代理可以再執行下一個動作。我們在選擇這些動作的規則就叫做策略。

狀態與動作的集合加上改變狀態的規則,就組成了一個馬爾科夫決策過程。這個過程的一個情節(episode)形成了狀態、動作與獎勵的有限序列。

s_0, a_0, r_1, s_1, a_1, r_2, s_2, ... ,s_{n-1}, a_{n-1}, r_n, s_n

策略policy

狀態State和動作Action存在映射關係,也就是一個state對應一個action,或者對應不同的概率(概率最高的就是最值得執行的動作)。狀態與動作的關係其實就是輸入與輸出關係,而狀態State到動作Action的過程就稱之為一個策略Policy,一般用ππ來表示,需要找到以下關係:

a=pi(s)或pi(a|s)

增強學習的任務就是找到一個最優的策略policy從而使reward最多。

折扣未來的獎勵

我們在決定動作的時候,不僅要考慮即時的獎勵,也需要考慮得到的未來的獎勵。對於給定的馬爾科夫決策過程的一次運行,我們可以很容易的計算一個情節的總獎勵:

R = r_1 + r_2 + r_3 + ... + r_n

因此,時間t的未來總回報為:

R_t = r_t + r_{t+1} + r_{t+2} + r_{t+3}+...+r_{t+n}

由於環境是隨機的,無法確定相同的動作得到一樣的獎勵,因此需要利用折扣未來獎勵來代替:

R_t = r_t + gamma r_{t+1} + gamma^2 r_{t+2} + ... + gamma^{n-t}r_n

在這裡γ為貼現因子,數值在0,1之間。用遞歸的方式表示:

R_t = r_t + gamma(r_{t+1} + gamma(r_{t+2} +...) = r_t + gamma R_{t+1}

因此,如果我們不考慮未來的收益,那麼γ應該為0,如果認為未來的環境是確定的,相同的動作導致相同的獎勵,可以將貼現因子定義為r=1,如果希望是平衡的思路,那麼γ=0.9。

在具體的投資過程中,市場的風格是經常多變的,所有我們的折扣因子不能為1,當然也不能為0,短期內假定市場風格不會劇烈的變動,那麼設置一個偏向於短期的γ似乎是比較合理的。

Value Function價值函數

我們做決策時有目的的,那就是為了最大化未來的回報Result。根據某個策略和當前狀態,我們選擇了某一個動作,這個動作使得未來的回報最大。

if 某一個決策的價值最大:

  • 選擇這個決策

我們也可以使用策略+價值評估的方法聯合給出決策,這種演算法就是所謂的Actor-Critic演算法。

Bellman方程

一般我們基於以下幾種情況來做評估:

  • 其他人的選擇:看到其他人的投資成功率,我們會降低我們投資股票的價值。
  • 自己反覆試驗:我們通常不是只做一次選擇,而是做了很多次選擇,從而收穫了所謂的「經驗」的東西。我們根據經驗來評估選擇的價值。比如我們做了好幾次投資樓市的選擇,結果大獲成功,因此我們就會認為投資樓市是不錯的選擇。
  • 基於理性分析:我們根據我們已有的知識對當前的情況做分析,從而做出一定的判斷。
  • 基於感性的邏輯:比如選擇投資自己到人工智慧領域。

計算機如何才能評估:

  • 其他人的選擇:沒有其他人
  • 基於理性分析:沒有人類的先驗知識,無法分析,可以先用監督學習然後再用增強學習。
  • 基於感性的邏輯:不行
  • 基於反覆的試驗:可行

價值等於未來的期望回報

v(s) = E[R_{t+1} + lambda v(S_{t+1})|S_t=s]

上面的公式為Bellman方程的基本形態,從公示上看,當前狀態的價值和下一步的價值以及當前的反饋Reward有關。

動作價值函數

如果在狀態轉移之間考慮動作,那麼採取不同的動作,會產生不同的序列,結果獲得的回報也不同,這表明,動作也是存在價值的。顯然,如果知道了每個動作的價值,那麼就可以選擇價值最大的一個動作去執行。這就是Action-Value function Q^{pi}(s,a)

那麼有了上面的定義,動作價值函數就為如下表示:

Q^{pi}(s,a)=E[r_{t+1} + lambda r_{t+2} + lambda^2 r_{t+3}+...|s,a] = E_{s^{}}[r + lambda Q^{pi}(s^{}, a^{})|s, a]

Optimal value function 最優價值函數

能計算動作價值函數是不夠的,因為我們需要的是最優策略,現在求解最優策略等價於求解最優的value function,找到了最優的value function,自然而然策略也就找到了。(DQN是value-base, 後面還有Policy-based, model-based)

Q^{*}(s, a) = max_{pi}Q^{pi}(s,a)

最優的動作價值函數為所有策略下的動作價值函數最大值。通過這樣的定義就可以使最優的動作價值函數唯一。 套用上一屆得到的Value function,可以得到

Q^{*}(s, a) = E_{s^{}}[r + lambda max_{a^{}}Q^{*}(s^{}, a^{})|s, a]

因為最優的Q值必然為最大值,等式右側的Q值必然為使a′取最大的Q值。

策略迭代 Policy Iteration

目的是通過迭代計算value function價值函數的方式來使得policy收斂到最優。

  • policy evaluation策略評估。目的是更新value function,或者說更好的估計基於當前策略的價值。
  • policy improvement策略改進。使用greedy policy產生新的樣本用於第一步的策略評估。

本質上就是使用當前策略產生新的樣本,然後使用新的樣本更好的估計策略的價值,然後利用策略的價值更新策略,然後不斷的反覆。理論上可以證明最終的策略將收斂到最優。

Q-learning演算法描述

  • 初始化Q(s, a),設置Q(terminal-state,.) = 0
  • 重複(對每一節episode):
    • 初始化狀態S
    • 重複 (對episode中的每一步):
      • 使用某一個policy比如(e - greedy)根據狀態S選取一個動作執行。
      • 執行結束後觀察reward和新的狀態S′
      • Q(S_t, A_t) <- Q(S_t, A_t) + alpha(R_{t+1} + lambda max_{a}Q(S_{t+1}, a)-Q(S_t, A_t))
      • S<- S^{}
    • 循環直到S終止

演算法中α是學習率,當α為1時,結果為貝爾曼方程。更新次數越多Q函數越接近真實的Q值。

探索與利用

Q-learning不需要優化policy,因此是off-policy的演算法,另一方面,因為Q-Learning完全不考慮Model模型也就是環境的具體情況,只考慮看到的環境及reward,因此是model-free的方法。(PS:比較適合金融市場)

那麼如何生成策略:

  • 隨機生成一個動作
  • 根據當前Q值計算出一個最優的動作,這個policy π 稱之為greedy policy貪婪策略,也就是 pi(S_{t+1}) = argmax_a Q(S_{t+1}, a)

使用隨機的動作就是exploration,探索未知的動作會產生的效果,有利於更新Q值,而使用greedy policy也就是target policy則是exploitation,利用policy,相對來說不好更新出更好的Q值,單可以得到更好的測試效果用於判斷演算法是否有效。將兩者結合起來就是所謂的ξ?greedy策略,ξ一般是一個很小的值,作為選取隨機動作的概率值。

Q-table更新案例

# 初始化Q-tableq_table = pd.DataFrame(0, columns=[a1, a2, a3, a4], index=[s1, s2, s3, s4])alpha = 1lambdas = 1# 第一步在s1隨機選擇一個動作,得到的reward為1,並進入s3random_action = np.random.choice([a1, a2, a3, a4])reward = 1# 更新Q值q_table.loc[s1, random_action] = q_table.loc[s1, random_action] + alpha * (reward + lambdas * q_table.loc[s3].max() - q_table.loc[s1, random_action])print(q_table)""" a1 a2 a3 a4s1 0 1 0 0s2 0 0 0 0s3 0 0 0 0s4 0 0 0 0"""

Q-network

我們只需要對高緯狀態進行降維,而不需要對動作也行降維處理。 Q(s)≈f(s,w)神經網路直接輸出各個動作的Q值。

二、本案例描述

任務

AI需要決定向左移動還是向右移動,來使得木棍保持穩定。

依賴包

  • 神經網路(torch.nn)
  • 優化器(torch.optim)
  • 自動求導(torch.autograd)
  • 遊戲包(gym)
  • 可視化(torchvision)

%pylab inlineimport gymimport mathimport randomimport numpy as npimport pandas as pdfrom collections import namedtuplefrom itertools import countfrom copy import deepcopyfrom PIL import Imageimport seaborn as snsis_ipython = inline in matplotlib.get_backend()if is_ipython: from IPython import display# 載入神經網路相關包import torchimport torch.nn as nnimport torch.optim as optimimport torch.nn.functional as Ffrom torch.autograd import Variableimport torchvision.transforms as Tfrom torch import FloatTensor, LongTensor, ByteTensorTensor = FloatTensor# 載入gym環境env = gym.make(CartPole-v0).unwrapped

保存回放

用於存儲遊戲的中間樣本。

Transition = namedtuple(Transition, (state, action, next_state, reward))class ReplayMemory(object): def __init__(self, capacity): self.capacity = capacity self.memory = [] self.position = 0 def push(self, *args): """保存一次交互 """ if len(self.memory) < self.capacity: self.memory.append(None) self.memory[self.position] = Transition(*args) self.position = (self.position + 1) % self.capacity def sample(self, batch_size): return random.sample(self.memory, batch_size) def __len__(self): return len(self.memory)

定義DQN網路

Batch Normalization層

一般,如果模型的輸入特徵不相關且滿足標準正態分布N(0,1)時,模型的表現一般較好。在訓練神經網路模型的時候,我們可以事先將特徵去相關,並使得他們滿足一個比較好的分布,這樣,模型的第一層網路一般都會有一個比較好的輸入特徵,但是隨著模型的層數加深,網路的非線性變換使得每一層結果變得相關了,且不再滿足N(0,1)分布,更糟糕的是,可能隱藏層的特徵分布已經發生了偏離。為了解決這個問題,提出在層與層之間加入batch_normalization層,維護所有mini-batch數據的均值方差,最後利用樣本的均值方差的無偏估計量用於測試時使用。

加入BN後,使得模型的訓練收斂速度更快,模型隱藏輸出特徵的分布更穩定,更利於模型的學習。

Variable

每個Variable有兩個變數:requires_grad和volatile.

  • requires_grad: 主要用於限制變數是否需要進行梯度下降。
  • volatile: 純輸入介面,不能進行梯度下降。

torch.cat

拼接數據

  • torch.cat((x, x, x), 0):縱向拼接
  • torch.cat((x, x, x), 1):橫向拼接

optim.step

  • 在確定好梯度之後,更新神經網路參數

(1)Q-network

class DQN(nn.Module): def __init__(self): super(DQN, self).__init__() self.conv1 = nn.Conv2d(3, 16, kernel_size=5, stride=2) self.bn1 = nn.BatchNorm2d(16) self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=2) self.bn2 = nn.BatchNorm2d(32) self.conv3 = nn.Conv2d(32, 32, kernel_size=5, stride=2) self.bn3 = nn.BatchNorm2d(32) self.head = nn.Linear(448, 2) def forward(self, x): x = F.relu(self.bn1(self.conv1(x))) # 一層卷積 x = F.relu(self.bn2(self.conv2(x))) # 兩層卷積 x = F.relu(self.bn3(self.conv3(x))) # 三層卷積 return self.head(x.view(x.size(0), -1)) # 全連接層

(2)從環境中提取輸入數據

  • torchvision.transforms: 主要用於進行圖像轉換。

resize = T.Compose([ T.ToPILImage(), # 轉換成為PILImage T.Scale(40, interpolation=Image.CUBIC), # 縮小或放大 T.ToTensor() # 轉換成為tensor, (H X W X C) in range(255)=> (C X H X W) in range(1.0)])# 定義環境screen_width = 600def get_cart_location(): world_width = env.x_threshold * 2 scale = screen_width / world_width return int(env.state[0] * scale + screen_width / 2.0)def get_screen(): screen = env.render(mode=rgb_array).transpose((2, 0, 1)) # 轉化成torch序列(CHW) screen = screen[:, 169:320] view_width = 320 cart_location = get_cart_location() if cart_location < view_width // 2: slice_range = slice(view_width) elif cart_location > (screen_width - view_width // 2): slice_range = slice(-view_width, None) else: slice_range = slice(cart_location - view_width // 2, cart_location + view_width // 2) screen = screen[:, :, slice_range] # 將圖像轉化成為tensor screen = np.ascontiguousarray(screen, dtype=np.float32) / 255 screen = torch.from_numpy(screen) # 調整尺寸 return resize(screen).unsqueeze(0).type(Tensor)# 顯示其中一次案例env.reset()plt.imshow(get_screen().cpu().squeeze(0).permute(1, 2, 0).numpy(), interpolation=none)plt.title(一次遊戲截取案例);

訓練網路

(1)初始化參數

BATCH_SIZE = 128GAMMA = 0.999EPS_START = 0.9EPS_END = 0.05EPS_DECAY = 200

(2)建立模型

model = DQN() # 初始化對象optimizer = optim.RMSprop(model.parameters()) # 設置優化器memory = ReplayMemory(10000)

(3)設計中間函數

steps_done = 0def select_action(state): """選擇動作 """ global steps_done sample = random.random() eps_threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-1. * steps_done / EPS_DECAY) steps_done += 1 if sample > eps_threshold: # 剛開始不採用DQN進行更新,採用隨機探索 return model(Variable(state, volatile=True).type(FloatTensor)).data .max(1)[1].view(1, 1) # 返回最優的動作 else: return LongTensor([[random.randrange(2)]])episode_durations = [] # 維持時間長度def plot_durations(): plt.figure(2) plt.clf() durations_t = torch.FloatTensor(episode_durations) plt.title(訓練中。。。) plt.xlabel(Episode) plt.ylabel(Duration) plt.plot(durations_t.numpy()) # 平均沒100次迭代畫出一幅圖 if len(durations_t) >= 100: means = durations_t.unfold(0, 100, 1).mean(1).view(-1) means = torch.cat((torch.zeros(99), means)) # 拼接數據 plt.plot(means.numpy()) plt.pause(0.001) if is_ipython: display.clear_output(wait=True) display.display(plt.gcf())

(4)設計訓練函數

last_sync = 0def optimize_model(): """訓練函數 """ global last_sync if len(memory) < BATCH_SIZE: # 如果樣本數小於最低批次大小返回 return # 轉化batch transitions = memory.sample(BATCH_SIZE) # 抽樣 batch = Transition(*zip(*transitions)) # 轉換成為一批次 non_final_mask = ByteTensor(tuple(map(lambda s: s is not None, batch.next_state))) non_final_next_states = Variable(torch.cat([s for s in batch.next_state if s is not None]), volatile=True) state_batch = Variable(torch.cat(batch.state)) action_batch = Variable(torch.cat(batch.action)) reward_batch = Variable(torch.cat(batch.reward)) # 計算Q(s_t, a),選擇動作 state_action_values = model(state_batch).gather(1, action_batch) # 計算下一步的所有動作的價值V(s_{t+1}) next_state_values = Variable(torch.zeros(BATCH_SIZE).type(Tensor)) next_state_values[non_final_mask] = model(non_final_next_states) .max(1)[0] next_state_values.volatile = False # 計算預期的Q值 expected_state_action_values = (next_state_values * GAMMA) + reward_batch # 計算Huber loss,損失函數採用smooth_ll_loss loss = F.smooth_l1_loss(state_action_values, expected_state_action_values) # 優化模型 optimizer.zero_grad() # 清理所有參數的梯度。 loss.backward() # 反向傳播 for param in model.parameters(): param.grad.data.clamp_(-1, 1) # 將所有的梯度限制在-1到1之間 optimizer.step() # 更新模型的參數

(5)開始訓練

num_episodes = 100 # 迭代次數for i_episode in range(num_episodes): # 初始化環境和狀態 env.reset() last_screen = get_screen() current_screen = get_screen() state = current_screen - last_screen # 定義狀態,即為當前狀態和最後的狀態差。 for t in count(): # 選擇並執行一個動作 action = select_action(state) _, reward, done, _ = env.step(action[0, 0]) # 從環境匯中獲取獎勵 reward = Tensor([reward]) # 將獎勵轉換成為tensor # 觀察新的狀態,確定下一個狀態(PS:在這一步裡面獲取了未來信息,引用在資本市場上,未來的狀態具有一定的概率分布特徵。) last_screen = current_screen current_screen = get_screen() if not done: next_state = current_screen - last_screen else: next_state = None # 將轉換保存起來 memory.push(state, action, next_state, reward) # 切換到下一狀態 state = next_state # 優化模型 optimize_model() # 一次遊戲結束,就畫圖顯示 if done: episode_durations.append(t + 1) plot_durations() breakprint(完成)env.render(close=True)env.close()

參考文章:

  • 深度強化學習 ( DQN ) 初探
  • DQN 從入門到放棄1 DQN與增強學習
  • DQN 從入門到放棄2 增強學習與MDP
  • DQN 從入門到放棄3 價值函數與Bellman方程
  • DQN 從入門到放棄4 動態規劃與Q-Learning
  • DQN從入門到放棄5 深度解讀DQN演算法
  • Batch Normalization學習筆記及其實現
  • Reinforcement Learning (DQN) tutorial

推薦閱讀:

學習強化學習需要哪些數學基礎?我學習神經網路基本就是複習了線性代數,微積分,那麼RL該學什麼呢?
想請教一下各位大神 強化學習如何在機器人導航中應用 把位置當做狀態么?
能總結下馬氏決策和強化學習之間的關係和異同點嘛?
強化學習(RL)在NLP的應用前景如何?
如何看待DeepMind發布升級版阿爾法狗:AlphaGo Zero?

TAG:PyTorch | 强化学习ReinforcementLearning |