AlphaZero實踐——中國象棋(附論文翻譯)

版權聲明:本文出自程世東的知乎,原創文章,轉載請註明出處:AlphaZero實踐——中國象棋(附論文翻譯)。

請安裝TensorFlow1.0,Python3.5,uvloop

項目地址:

chengstone/cchess-zero?

github.com圖標

關於AlphaGo和後續的版本AlphaGo Zero等新聞大家都耳熟能詳了,今天我們從論文的分析,並結合代碼來一起討論下AlphaZero在中國象棋上的實踐。

實際上在GitHub上能夠看到有很多關於AlphaGo的實踐項目,包括國際象棋圍棋五子棋黑白棋等等,我有個好友在實踐麻將。

從演算法上來說,大家都是基於AlphaGo Zero / AlphaZero的論文來實現的,差別在於不同Game的規則和使用了不同的trick。

論文分析

我們要參考的就是AlphaGo Zero的論文《Mastering the Game of Go without Human Knowledge》和AlphaZero的論文《Mastering Chess and Shogi by Self-Play with a General Reinforcement Learning Algorithm》。

小弟不才,獻醜翻譯了這兩篇論文,時間倉促,水平有限?(? ? ??),您要是看不慣英文,希望這兩篇翻譯能提供些許幫助。

《Mastering the Game of Go without Human Knowledge》

《Mastering Chess and Shogi by Self-Play with a General Reinforcement Learning Algorithm》

建議在本地用jupyter notebook打開看,我發現從GitHub上看的話,有些公式沒有顯示出來,另外圖片也沒有顯示出來。

Mastering the Game of Go without Human Knowledge

先從《Mastering the Game of Go without Human Knowledge》說起,演算法根據這篇論文來實現,AlphaZero只有幾點不同而已。

總的來說,AlphaGo Zero分為兩個部分,一部分是MCTS(蒙特卡洛樹搜索),一部分是神經網路。

我們是要拋棄人類棋譜的,學會如何下棋完全是通過自對弈來完成。

過程是這樣,首先生成棋譜,然後將棋譜作為輸入訓練神經網路,訓練好的神經網路用來預測落子和勝率。如下圖:

蒙特卡洛樹搜索演算法

MCTS就是用來自對弈生成棋譜的,結合論文中的圖示進行說明:

論文中的描述:

AlphaGo Zero中的蒙特卡洛樹搜索。

  • a.每次模擬通過選擇具有最大行動價值Q的邊加上取決於所存儲的先驗概率P和該邊的訪問計數N(每次訪問都被增加一次)的上限置信區間U來遍歷樹。
  • b.展開葉子節點,通過神經網路(P(s, ·), V (s)) = f_θ(s)來評估局面s;向量P的值存儲在葉子結點擴展的邊上。
  • c.更新行動價值Q等於在該行動下的子樹中的所有評估值V的均值。
  • d.一旦MCTS搜索完成,返回局面s下的落子概率π,與N^{1 /τ}成正比,其中N是從根狀態每次移動的訪問計數, τ是控制溫度的參數。

按照論文所述,每次MCTS使用1600次模擬。過程是這樣的,現在AI從白板一塊開始自己跟自己下棋,只知道規則,不知道套路,那隻好亂下。每下一步棋,都要通過MCTS模擬1600次上圖中的a~c,從而得出我這次要怎麼走子。

來說說a~c,MCTS本質上是我們來維護一棵樹,這棵樹的每個節點保存了每一個局面(situation)該如何走子(action)的信息。這些信息是,N(s, a)是訪問次數,W(s, a)是總行動價值,Q(s, a)是平均行動價值,P(s, a)是被選擇的概率。

a. Select

每次模擬的過程都一樣,從父節點的局面開始,選擇一個走子。比如開局的時候,所有合法的走子都是可能的選擇,那麼我該選哪個走子呢?這就是select要做的事情。MCTS選擇Q(s, a) + U(s, a)最大的那個action。Q的公式一會在Backup中描述。U的公式如下:

這個可以理解成:U(s, a) = c_puct × 概率P(s, a) × np.sqrt(父節點訪問次數N) / ( 1 + 某子節點action的訪問次數N(s, a) )

用論文中的話說,c_puct是一個決定探索水平的常數;這種搜索控制策略最初傾向於具有高先驗概率和低訪問次數的行為,但是漸近地傾向於具有高行動價值的行為。

計算過後,我就知道當前局面下,哪個action的Q+U值最大,那這個action走子之後的局面就是第二次模擬的當前局面。比如開局,Q+U最大的是當頭炮,然後我就Select當頭炮這個action,再下一次Select就從當頭炮的這個棋局選擇下一個走子。

b. Expand

現在開始第二次模擬了,假如之前的action是當頭炮,我們要接著這個局面選擇action,但是這個局面是個葉子節點。就是說當頭炮之後可以選擇哪些action不知道,這樣就需要expand了,通過expand得到一系列可能的action節點。這樣實際上就是在擴展這棵樹,從只有根節點開始,一點一點的擴展。

Expand and evaluate這個部分有個需要關注的地方。論文中說:在隊列中的局面由神經網路使用最小批量mini-batch 大小為8進行評估;搜索線程被鎖定,直到評估完成。葉子節點被展開,每個邊(s_L, a)被初始化為

然後值v被回傳(backed up)

如果我當前的局面沒有被expand過,不知道下一步該怎麼下,所以要expand,這個時候要用我們的神經網路出馬。把當前的局面作為輸入傳給神經網路,神經網路會返回給我們一個action向量p和當前勝率v。其中action向量是當前局面每個合法action的走子概率。當然,因為神經網路還沒有訓練好,輸出作為參考添加到我們的蒙特卡洛樹上。這樣在當前局面下,所有可走的action以及對應的概率p就都有了,每個新增的action節點都按照論文中說的對若干信息賦值,

。這些新增的節點作為當前局面節點的子節點。

c. Backup

接下來就是重點,evaluate和Backup一起說,先看看Backup做什麼事吧:邊的統計數據在每一步t≤L中反向更新。訪問計數遞增,

,並且動作價值更新為平均值,

。我們使用虛擬損失來確保每個線程評估不同的節點。

我們來整理一下思路,任意一個局面(就是節點),要麼被展開過(expand),要麼沒有展開過(就是葉子節點)。展開過的節點可以使用Select選擇動作進入下一個局面,下一個局面仍然是這個過程,如果展開過還是可以通過Select進入下下個局面,這個過程一直持續下去直到這盤棋分出勝平負了,或者遇到某個局面沒有被展開過為止。

如果沒有展開過,那麼執行expand操作,通過神經網路得到每個動作的概率和勝率v,把這些動作添加到樹上,最後把勝率v回傳(backed up),backed up給誰?

我們知道這其實是一路遞歸下去的過程,一直在Select,遞歸必須要有結束條件,不然就是死循環了。所以分出勝負和遇到葉子節點就是遞歸結束條件,把勝率v或者分出的勝平負value作為返回值,回傳給上一層。

這個過程就是evaluate,是為了Backup步驟做準備。因為在Backup步驟,我們要用v來更新W和Q的,但是如果只做了一次Select,棋局還沒有結束,此時的v是不明確的,必須要等到一盤棋完整的下完才能知道v到底是多少。就是說我現在下了一步棋,不管這步棋是好棋還是臭棋,只有下完整盤期分出勝負,才能給我下的這步棋評分。不管這步棋的得失,即使我這步棋丟了個車,但最後我贏了,那這個v就是積極的。同樣即使我這步棋吃了對方一個子,但最後輸棋了,也不能認為我這步棋就是好棋。

用一幅圖概括一下這個過程:

當值被回傳,就要做Backup了,這裡很關鍵。因為我們是多線程同時在做MCTS,由於Select演算法都一樣,都是選擇Q+U最大節點,所以很有可能所有的線程最終選擇的是同一個節點,這就尷尬了。我們的目的是儘可能在樹上搜索出各種不同的著法,最終選擇一步好棋,怎麼辦呢?論文中已經給出了辦法,「我們使用虛擬損失來確保每個線程評估不同的節點。」

就是說,通過Select選出某節點後,人為增大這個節點的訪問次數N,並減少節點的總行動價值W,因為平均行動價值Q = W / N,這樣分子減少,分母增加,就減少了Q值,這樣遞歸進行的時候,此節點的Q+U不是最大,避免被選中,讓其他的線程嘗試選擇別的節點進行樹搜索。這個人為增加和減少的量就是虛擬損失virtual loss。

現在MCTS的過程越來越清晰了,Select選擇節點,選擇後,對當前節點使用虛擬損失,通過遞歸繼續Select,直到分出勝負或Expand節點,得到返回值value。現在就可以使用value進行Backup了,但首先要還原W和N,之前N增加了虛擬損失,這次要減回去,之前減少了虛擬損失的W也要加回來。

然後開始做Backup,「邊的統計數據在每一步t≤L中反向更新。訪問計數遞增,

,並且動作價值更新為平均值,

」,這些不用我再解釋了吧?同時我們還要更新U,U的公式上面給出過。這個反向更新,其實就是遞歸的把值返回回去。有一點一定要注意,就是我們的返回值一定要符號反轉,怎麼理解?就是說對於當前節點是勝,那麼對於上一個節點一定是負,明白這個意思了吧?所以返回的是-value。

d. play

按照上述過程執行a~c,論文中是每步棋執行1600次模擬,那就是1600次的a~c,這個MCTS的過程就是模擬自我對弈的過程。模擬結束後,基本上能覆蓋大多數的棋局和著法,每步棋該怎麼下,下完以後勝率是多少,得到什麼樣的局面都能在樹上找到。然後從樹上選擇當前局面應該下哪一步棋,這就是步驟d.play:"在搜索結束時,AlphaGo Zero在根節點s0選擇一個走子a,與其訪問計數冪指數成正比,

,其中τ是控制探索水平的溫度參數。在隨後的時間步重新使用搜索樹:與所走子的動作對應的子節點成為新的根節點;保留這個節點下面的子樹所有的統計信息,而樹的其餘部分被丟棄。如果根節點的價值和最好的子節點價值低於閾值v_resign,則AlphaGo Zero會認輸。"

當模擬結束後,對於當前局面(就是樹的根節點)的所有子節點就是每一步對應的action節點,選擇哪一個action呢?按照論文所說是通過訪問計數N來確定的。這個好理解吧?實現上也容易,當前節點的所有節點是可以獲得的,每個子節點的信息N都可以獲得,然後從多個action中選一個,這其實是多分類問題。我們使用softmax來得到選擇某個action的概率,傳給softmax的是每個action的logits(N(s_0,a) ^(1/τ)),這其實可以改成1/τ * log(N(s_0,a))。這樣就得到了當前局面所有可選action的概率向量,最終選擇概率最大的那個action作為要下的一步棋,並且將這個選擇的節點作為樹的根節點。

按照圖1中a.Self-Play的說法就是,從局面s_t進行自我對弈的樹搜索(模擬),得到a_t~ π_t,a_t就是動作action,π_t就是所有動作的概率向量。最終在局面s_T的時候得到勝平負的結果z,就是我們上面所說的value。

至此MCTS演算法就分析完了。

神經網路

上面說過,通過MCTS算出該下哪一步棋。然後接著再經過1600次模擬算出下一步棋,如此循環直到分出勝負,這樣一整盤棋就下完了,這就是一次完整的自對弈過程,那麼MCTS就相當於人在大腦中思考。我們把每步棋的局面s_t 、算出的action概率向量 π_t 和勝率z_t (就是返回值value)保存下來,作為棋譜數據訓練神經網路。

神經網路的輸入是局面s,輸出是預測的action概率向量p和勝率v,公式:

。訓練目標是最小化預測勝率v和自我對弈的勝率z之間的誤差,並使神經網路走子概率p與搜索概率π的相似度最大化。按照論文中所說,「具體而言,參數θ通過梯度下降分別在均方誤差和交叉熵損失之和上的損失函數l進行調整,

,其中c是控制L2權重正則化水平的參數(防止過擬合)。」簡單點說就是讓神經網路的預測跟MCTS的搜索結果盡量接近。

勝率是回歸問題,優化自然用MSE損失,概率向量的優化要用softmax交叉熵損失,目標就是最小化這個聯合損失。

神經網路結構

網路結構沒什麼好說的,按照論文中的描述實現即可,下面是結構圖:

到此,這篇論文基本上介紹的差不多了,有些訓練和優化方面的細節這裡就不介紹了。過程就是神經網路先隨機初始化權重,使用MCTS下每一步棋,當樹中節點沒有被展開時通過神經網路預測出走子概率和勝率添加到樹上,然後使用自我對弈的數據訓練神經網路,在下一次自我對弈中使用新的訓練過的神經網路進行預測,MCTS和神經網路你中有我、我中有你,如此反覆迭代,網路預測的更準確,MCTS的結果更強大。實際上神經網路的預測可以理解為人的直覺。

Mastering Chess and Shogi by Self-Play with a General Reinforcement Learning Algorithm

接下來一起看看AlphaZero的論文。

演算法上沒有區別,只有幾個不同點:

  • 1、在AlphaGo Zero中,自我對弈是由以前所有迭代中最好的玩家生成的。每次訓練迭代之後,與最好玩家對弈測量新玩家的能力;如果以55%的優勢獲勝,那麼它將取代最好的玩家,而自我對弈將由這個新玩家產生。相反,AlphaZero只維護一個不斷更新的單個神經網路,而不是等待迭代完成。自我對弈是通過使用這個神經網路的最新參數生成的,省略了評估步驟和選擇最佳玩家的過程。
  • 2、比賽結果除了勝負以外,還有平局。
  • 3、圍棋是可以進行數據增強的,因為圍棋的規則是旋轉和反轉不變的。但是象棋、將棋等就不行。

好像也沒啥大變化,我們重點要考慮的是輸入特徵的表示。

輸入特徵的表示

剛剛介紹神經網路的結構時,沒有對輸入特徵進行說明,先看看論文中的圖示。

網路結構圖上能夠看出神經網路的輸入是19×19×17維度的圖像棧。包含17個二值(只有兩個值0和1)特徵平面,8個特徵平面X_t由二進位值組成,表示當前玩家存在的棋子(如果交點i在時間步t包含玩家顏色的棋子,那麼X_t^i = 1 ;如果交叉點是空的,包含對手棋子,或者t <0,X_t^i = 0)。另外8個特徵平面Y_t表示對手的棋子的相應特徵。為什麼每個玩家8個特徵平面呢?是因為這是8步歷史走子記錄,就是說最近走的8步棋作為輸入特徵。最後的特徵面C表示棋子顏色(當前的棋盤狀態),是常量,如果是黑色棋子,則為1,如果是白色棋子則為0。這些平面連接在一起,給出輸入特徵

國際象棋就不同了,加入了各種特徵平面,用來表示不同的情況,王車易位啦,多少回合沒有進展啦(沒有吃子),重複的局面啦(多次重複會被判平局)等等,這些不是我想說的,這些特徵可以根據不同的棋種自己去設計,我們重點關注的是棋子的特徵。

對於圍棋而言,每個棋子都是一樣的,都是一類。而國際象棋分為6種棋子:車、馬、象、後、王、兵,那在特徵平面上怎麼表示呢,總不能使用0~5吧。還是用0和1來表示棋盤上有子還是沒子,然後既然是6類棋子,想當然的使用one-hot編碼了,所以特徵平面分成了6個平面,每一個平面用來表示不同種類棋子在棋盤上的位置。

以上就是介紹的全部了,更多的細節,比如優化參數設為多少、學習率退火設為多少等等請閱讀論文。

中國象棋的實現

原理講了一大堆,該上代碼了,這裡根據論文中的演算法實現一個中國象棋程序。

完整代碼請參見項目地址

輸入特徵的設計

先實現神經網路的部分,那麼就要先設計輸入特徵。其實跟國際象棋差不多,棋子分為:車、馬、炮、象、士、將、兵,共7種棋子,那就是每個玩家7個特徵平面,一共14個特徵平面。至於論文中其他的特徵平面,比如顏色、回合數、重複局面、歷史走子記錄等等我沒有實現,只使用了當前棋盤上每個玩家每個棋子的位置特徵作為輸入,一共14個平面,當然論文中說的其他特徵平面您也可以實現一下試試。棋盤大小是9?10,所以輸入佔位符就是:

self.inputs_ = tf.placeholder(tf.float32, [None, 9, 10, 14], name=inputs)

接下來是定義輸入的概率向量pi(π),需要確定向量的長度,意味著需要確定所有合法走子的集合長度。函數如下:

# 創建所有合法走子UCI,size 2086def create_uci_labels(): labels_array = [] letters = [a, b, c, d, e, f, g, h, i] numbers = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] Advisor_labels = [d7e8, e8d7, e8f9, f9e8, d0e1, e1d0, e1f2, f2e1, d2e1, e1d2, e1f0, f0e1, d9e8, e8d9, e8f7, f7e8] Bishop_labels = [a2c4, c4a2, c0e2, e2c0, e2g4, g4e2, g0i2, i2g0, a7c9, c9a7, c5e7, e7c5, e7g9, g9e7, g5i7, i7g5, a2c0, c0a2, c4e2, e2c4, e2g0, g0e2, g4i2, i2g4, a7c5, c5a7, c9e7, e7c9, e7g5, g5e7, g9i7, i7g9] for l1 in range(9): for n1 in range(10): destinations = [(t, n1) for t in range(9)] + [(l1, t) for t in range(10)] + [(l1 + a, n1 + b) for (a, b) in [(-2, -1), (-1, -2), (-2, 1), (1, -2), (2, -1), (-1, 2), (2, 1), (1, 2)]] # 馬走日 for (l2, n2) in destinations: if (l1, n1) != (l2, n2) and l2 in range(9) and n2 in range(10): move = letters[l1] + numbers[n1] + letters[l2] + numbers[n2] labels_array.append(move) for p in Advisor_labels: labels_array.append(p) for p in Bishop_labels: labels_array.append(p) return labels_array

長度一共是2086。關於UCCI的資料可以參考:中國象棋通用引擎協議 版本:3.0

概率向量pi的佔位符定義:

self.pi_ = tf.placeholder(tf.float32, [None, 2086], name=pi)

勝率z的佔位符定義:

self.z_ = tf.placeholder(tf.float32, [None, 1], name=z)

學習率的定義:

self.learning_rate = tf.placeholder(tf.float32, name=learning_rate)

優化器使用Momentum:

self.momentum = 0.9optimizer = tf.train.MomentumOptimizer(learning_rate=self.learning_rate, momentum=self.momentum, use_nesterov=True)

這裡需要特殊說明一下,我實現的是多GPU訓練,關於多GPU訓練神經網路的實現可以參考TensorFlow官方的例子,和TensorFlow多GPU並行計算實例---MNIST

實現思想是把輸入數據按照使用的gpu數量均分:

inputs_batches = tf.split(self.inputs_, self.num_gpus, axis=0)pi_batches = tf.split(self.pi_, self.num_gpus, axis=0)z_batches = tf.split(self.z_, self.num_gpus, axis=0)tower_grads = [None] * self.num_gpusself.loss = 0self.accuracy = 0self.policy_head = []self.value_head = []with tf.variable_scope(tf.get_variable_scope()): """Build the core model within the graph.""" for i in range(self.num_gpus): with tf.device(/gpu:%d % i): # 不同的gpu分別使用不同的name scope with tf.name_scope(TOWER_{}.format(i)) as scope: # 將上面均分的輸入數據輸入給各自的gpu進行運算 inputs_batch, pi_batch, z_batch = inputs_batches[i], pi_batches[i], z_batches[i] # **劃重點!運算圖的構建一定要單獨寫在新的函數中,這樣運行才不會出錯,否則TensorFlow會提示不能重複使用變數。** loss = self.tower_loss(inputs_batch, pi_batch, z_batch, i) # 構建神經網路計算圖的函數,一會詳細說。 # reuse variable happens here tf.get_variable_scope().reuse_variables() grad = optimizer.compute_gradients(loss) tower_grads[i] = grad # 保存每一個gpu的梯度self.loss /= self.num_gpus # loss是多個gpu的loss總和,所以要取平均self.accuracy /= self.num_gpus # acc也是同理grads = self.average_gradients(tower_grads) # 同理,對所有梯度取平均self.train_op = optimizer.apply_gradients(grads, global_step=global_step)

實現神經網路計算圖

這裡完全是按照論文所述的神經網路結構實現的,大家可以對照上面的結構圖,是一一對應的。稍有不同的是,filters size我設為128,沒有使用256。另外殘差塊的數量我默認使用了7層,沒有使用19或者39,大家電腦給力的話可以嘗試修改一下。

def tower_loss(self, inputs_batch, pi_batch, z_batch, i): # 卷積塊 with tf.variable_scope(init): layer = tf.layers.conv2d(inputs_batch, self.filters_size, 3, padding=SAME) # filters 128(or 256) layer = tf.contrib.layers.batch_norm(layer, center=False, epsilon=1e-5, fused=True, is_training=self.training, activation_fn=tf.nn.relu) # epsilon = 0.25 # 殘差塊 with tf.variable_scope("residual_block"): for _ in range(self.res_block_nums): layer = self.residual_block(layer) # 策略頭 with tf.variable_scope("policy_head"): policy_head = tf.layers.conv2d(layer, 2, 1, padding=SAME) policy_head = tf.contrib.layers.batch_norm(policy_head, center=False, epsilon=1e-5, fused=True, is_training=self.training, activation_fn=tf.nn.relu) # print(self.policy_head.shape) # (?, 9, 10, 2) policy_head = tf.reshape(policy_head, [-1, 9 * 10 * 2]) policy_head = tf.contrib.layers.fully_connected(policy_head, self.prob_size, activation_fn=None) self.policy_head.append(policy_head) # 保存多個gpu的策略頭結果(走子概率向量) # 價值頭 with tf.variable_scope("value_head"): value_head = tf.layers.conv2d(layer, 1, 1, padding=SAME) value_head = tf.contrib.layers.batch_norm(value_head, center=False, epsilon=1e-5, fused=True, is_training=self.training, activation_fn=tf.nn.relu) # print(self.value_head.shape) # (?, 9, 10, 1) value_head = tf.reshape(value_head, [-1, 9 * 10 * 1]) value_head = tf.contrib.layers.fully_connected(value_head, 256, activation_fn=tf.nn.relu) value_head = tf.contrib.layers.fully_connected(value_head, 1, activation_fn=tf.nn.tanh) self.value_head.append(value_head) # 保存多個gpu的價值頭結果(勝率) # 損失 with tf.variable_scope("loss"): policy_loss = tf.nn.softmax_cross_entropy_with_logits(labels=pi_batch, logits=policy_head) policy_loss = tf.reduce_mean(policy_loss) # value_loss = tf.squared_difference(z_batch, value_head) value_loss = tf.losses.mean_squared_error(labels=z_batch, predictions=value_head) value_loss = tf.reduce_mean(value_loss) tf.summary.scalar(mse_tower_{}.format(i), value_loss) regularizer = tf.contrib.layers.l2_regularizer(scale=self.c_l2) regular_variables = tf.trainable_variables() l2_loss = tf.contrib.layers.apply_regularization(regularizer, regular_variables) # loss = value_loss - policy_loss + l2_loss loss = value_loss + policy_loss + l2_loss # softmax交叉熵損失 + MSE + l2損失 self.loss += loss # 多個gpu的loss總和 tf.summary.scalar(loss_tower_{}.format(i), loss) with tf.variable_scope("accuracy"): # Accuracy 這個準確率是預測概率向量和MCTS的概率向量的比較 correct_prediction = tf.equal(tf.argmax(policy_head, 1), tf.argmax(pi_batch, 1)) correct_prediction = tf.cast(correct_prediction, tf.float32) accuracy = tf.reduce_mean(correct_prediction, name=accuracy) self.accuracy += accuracy tf.summary.scalar(move_accuracy_tower_{}.format(i), accuracy) return lossdef residual_block(self, in_layer): orig = tf.identity(in_layer) layer = tf.layers.conv2d(in_layer, self.filters_size, 3, padding=SAME) # filters 128(or 256) layer = tf.contrib.layers.batch_norm(layer, center=False, epsilon=1e-5, fused=True, is_training=self.training, activation_fn=tf.nn.relu) layer = tf.layers.conv2d(layer, self.filters_size, 3, padding=SAME) # filters 128(or 256) layer = tf.contrib.layers.batch_norm(layer, center=False, epsilon=1e-5, fused=True, is_training=self.training) out = tf.nn.relu(tf.add(orig, layer)) return out

訓練網路

def train_step(self, positions, probs, winners, learning_rate): feed_dict = { self.inputs_: positions, self.training: True, self.learning_rate: learning_rate, self.pi_: probs, self.z_: winners } _, accuracy, loss, global_step, summary = self.sess.run([self.train_op, self.accuracy, self.loss, self.global_step, self.summaries_op], feed_dict=feed_dict) self.train_writer.add_summary(summary, global_step) return accuracy, loss, global_step

使用神經網路預測

預測的代碼稍微麻煩一些,因為我們自對弈訓練時是多線程在跑的,傳過來的輸入數據可能並不能被gpu數量均分,比如我有2個gpu,但是傳進來的輸入size是3,這樣的話就有一個gpu跑2個數據,一個gpu跑1個數據。可實際上這樣代碼是跑不起來的,會報錯,我google了半天也沒找到解決辦法。

我的解決方案是,先看看輸入數據的長度能否被gpu數量整除,如果能,那就一切正常,直接把輸入傳給網路就好,神經網路會將數據按照gpu數量均分。

一旦不能整除,那就把輸入數據分成兩部分,一部分是能被gpu數量整除的數據,一部分是餘下的數據。比如我有2個gpu,輸入數據的長度是5,那麼把這5份數據分成4份和1份。4份數據的處理就是正常處理,直接把數據傳給網路就好,神經網路會將數據按照gpu數量均分。

餘下的那部分數據怎麼處理呢?把餘下的數據不斷堆疊起來,直到數據能夠被gpu數量均分為止。假如剩下1份數據,那就複製1份,變成2份相同的數據,這樣正好被2個gpu數量均分。只不過這2個gpu處理後返回的數據,我們只要一個gpu的結果就行了,拋棄另外一個。

這段代碼我只在aws的2個gpu的環境下跑過,更多的gpu就沒試過了,也許有bug也不一定,您可以跑跑看:)

#@profiledef forward(self, positions): # , probs, winners # print("positions.shape : ", positions.shape) positions = np.array(positions) batch_n = positions.shape[0] // self.num_gpus alone = positions.shape[0] % self.num_gpus if alone != 0: # 判斷是否不能被gpu均分 if(positions.shape[0] != 1): # 如果不止1份數據。因為有可能輸入數據的長度是1,這樣肯定不能被多gpu均分了。 feed_dict = { self.inputs_: positions[:positions.shape[0] - alone], # 先將能均分的這部分數據傳入神經網路 self.training: False } action_probs, value = self.sess.run([self.policy_head, self.value_head], feed_dict=feed_dict) action_probs, value = np.vstack(action_probs), np.vstack(value) new_positions = positions[positions.shape[0] - alone:] # 取餘下的這部分數據 pos_lst = [] while len(pos_lst) == 0 or (np.array(pos_lst).shape[0] * np.array(pos_lst).shape[1]) % self.num_gpus != 0: pos_lst.append(new_positions) # 將餘下的這部分數據堆疊起來,直到數量的長度能被gpu均分 if(len(pos_lst) != 0): shape = np.array(pos_lst).shape pos_lst = np.array(pos_lst).reshape([shape[0] * shape[1], 9, 10, 14]) # 將數據傳入網路,得到不能被gpu均分的數據的計算結果 feed_dict = { self.inputs_: pos_lst, self.training: False } action_probs_2, value_2 = self.sess.run([self.policy_head, self.value_head], feed_dict=feed_dict) # print("action_probs_2.shape : ", np.array(action_probs_2).shape) # print("value_2.shape : ", np.array(value_2).shape) action_probs_2, value_2 = action_probs_2[0], value_2[0] # print("------------------------") # print("action_probs_2.shape : ", np.array(action_probs_2).shape) # print("value_2.shape : ", np.array(value_2).shape) if(positions.shape[0] != 1): # 多個數據的計算結果 action_probs = np.concatenate((action_probs, action_probs_2),axis=0) value = np.concatenate((value, value_2),axis=0) # print("action_probs.shape : ", np.array(action_probs).shape) # print("value.shape : ", np.array(value).shape) return action_probs, value else: # 只有1個數據的計算結果 return action_probs_2, value_2 else: # 正常情況,能被gpu均分 feed_dict = { self.inputs_: positions, self.training: False } action_probs, value = self.sess.run([self.policy_head, self.value_head], feed_dict=feed_dict) # print("np.vstack(action_probs) shape : ", np.vstack(action_probs).shape) # print("np.vstack(value) shape : ", np.vstack(value).shape) # 將多個gpu的計算結果堆疊起來返回 return np.vstack(action_probs), np.vstack(value)

自對弈訓練

自對弈訓練的思想在上面分析論文時已經說過了,程序自己跟自己下棋,將每盤棋的數據保存起來,當數據量達到我們設置的大小時就開始訓練神經網路。

def run(self): batch_iter = 0 try: while(True): batch_iter += 1 play_data, episode_len = self.selfplay() # 自我對弈,返回下棋數據 print("batch i:{}, episode_len:{}".format(batch_iter, episode_len)) extend_data = [] for state, mcts_prob, winner in play_data: states_data = self.mcts.state_to_positions(state) extend_data.append((states_data, mcts_prob, winner)) # 將棋盤特徵平面、MCTS算出的概率向量、勝率保存起來 self.data_buffer.extend(extend_data) if len(self.data_buffer) > self.batch_size: # 保存的數據達到指定數量時 self.policy_update() # 開始訓練 except KeyboardInterrupt: self.log_file.close() self.policy_value_netowrk.save(self.global_step)

訓練網路

def policy_update(self): """update the policy-value net""" # 從數據中隨機抽取一部分數據 mini_batch = random.sample(self.data_buffer, self.batch_size) #print("training data_buffer len : ", len(self.data_buffer)) state_batch = [data[0] for data in mini_batch] mcts_probs_batch = [data[1] for data in mini_batch] winner_batch = [data[2] for data in mini_batch] # print(np.array(winner_batch).shape) # print(winner_batch) winner_batch = np.expand_dims(winner_batch, 1) # print(winner_batch.shape) # print(winner_batch) start_time = time.time() old_probs, old_v = self.mcts.forward(state_batch) # 先通過正向傳播預測下網路輸出結果,用於計算訓練後的KL散度 for i in range(self.epochs): # 一共訓練5次 # 訓練網路。敲黑板!這裡的學習率需要特別注意。我在aws上用的是g2.2xlarge,24小時只能下差不多200盤棋,很慢。 # 所以學習率是在這裡是動態調整的。當然您也可以使用指數衰減學習率,在上面定義學習率的地方就需要修改成類似下面這句: # self.learning_rate = tf.maximum(tf.train.exponential_decay(0.001, self.global_step, 1e3, 0.66), 1e-5) # 然後這裡訓練網路的地方學習率就不用作為參數傳遞了,也可以在訓練網路函數裡面不使用傳遞的學習率參數。 accuracy, loss, self.global_step = self.policy_value_netowrk.train_step(state_batch, mcts_probs_batch, winner_batch, self.learning_rate * self.lr_multiplier) # new_probs, new_v = self.mcts.forward(state_batch) #使用訓練後的新網路預測結果,跟之前的結果計算KL散度 kl_tmp = old_probs * (np.log((old_probs + 1e-10) / (new_probs + 1e-10))) # print("kl_tmp.shape", kl_tmp.shape) kl_lst = [] for line in kl_tmp: # print("line.shape", line.shape) all_value = [x for x in line if str(x) != nan and str(x)!= inf] #除去inf值 kl_lst.append(np.sum(all_value)) kl = np.mean(kl_lst) # kl = scipy.stats.entropy(old_probs, new_probs) # kl = np.mean(np.sum(old_probs * (np.log(old_probs + 1e-10) - np.log(new_probs + 1e-10)), axis=1)) if kl > self.kl_targ * 4: # early stopping if D_KL diverges badly break self.policy_value_netowrk.save(self.global_step) print("train using time {} s".format(time.time() - start_time)) # 通過計算調整學習率乘子 # adaptively adjust the learning rate if kl > self.kl_targ * 2 and self.lr_multiplier > 0.1: self.lr_multiplier /= 1.5 elif kl < self.kl_targ / 2 and self.lr_multiplier < 10: self.lr_multiplier *= 1.5 explained_var_old = 1 - np.var(np.array(winner_batch) - old_v.flatten()) / np.var(np.array(winner_batch)) explained_var_new = 1 - np.var(np.array(winner_batch) - new_v.flatten()) / np.var(np.array(winner_batch)) print( "kl:{:.5f},lr_multiplier:{:.3f},loss:{},accuracy:{},explained_var_old:{:.3f},explained_var_new:{:.3f}".format( kl, self.lr_multiplier, loss, accuracy, explained_var_old, explained_var_new)) self.log_file.write("kl:{:.5f},lr_multiplier:{:.3f},loss:{},accuracy:{},explained_var_old:{:.3f},explained_var_new:{:.3f}".format( kl, self.lr_multiplier, loss, accuracy, explained_var_old, explained_var_new) +
) self.log_file.flush()

自我對弈

自我對弈就是通過MCTS下每一步棋,直到分出勝負,並返回下棋數據。

def selfplay(self): self.game_borad.reload() # 初始化棋盤 states, mcts_probs, current_players = [], [], [] z = None game_over = False winnner = "" start_time = time.time() while(not game_over): # 下棋循環,結束條件是分出勝負 action, probs, win_rate = self.get_action(self.game_borad.state, self.temperature) # 通過MCTS算出下哪一步棋 ################################################ # 這部分代碼是跟我的設計有關的。因為在輸入特徵平面中我沒有使用顏色特徵, # 所以傳給神經網路數據時,要把當前選手轉換成紅色(先手),轉換的其實是棋盤的棋子位置 # 這樣神經網路預測的始終是紅色先手方向該如何下棋 state, palyer = self.mcts.try_flip(self.game_borad.state, self.game_borad.current_player, self.mcts.is_black_turn(self.game_borad.current_player)) states.append(state) prob = np.zeros(labels_len) # 神經網路返回的概率向量也需要轉換,假如當前選手是黑色,轉換成紅色後,由於棋盤位置的變化,概率向量(走子集合)是基於紅色棋盤的 # 要把走子action轉換成黑色選手的方向才行。明白我的意思吧? if self.mcts.is_black_turn(self.game_borad.current_player): for idx in range(len(probs[0][0])): act = "".join((str(9 - int(a)) if a.isdigit() else a) for a in probs[0][0][idx]) prob[label2i[act]] = probs[0][1][idx] else: for idx in range(len(probs[0][0])): prob[label2i[probs[0][0][idx]]] = probs[0][1][idx] mcts_probs.append(prob) ################################################ current_players.append(self.game_borad.current_player) last_state = self.game_borad.state self.game_borad.state = GameBoard.sim_do_action(action, self.game_borad.state) # 在棋盤上下算出的這步棋,得到新的棋盤狀態 self.game_borad.round += 1 # 更新回合數 self.game_borad.current_player = "w" if self.game_borad.current_player == "b" else "b" # 切換當前選手 if is_kill_move(last_state, self.game_borad.state) == 0: # 剛剛下的棋是否吃子了 self.game_borad.restrict_round += 1 # 更新沒有進展回合數 else: self.game_borad.restrict_round = 0 if (self.game_borad.state.find(K) == -1 or self.game_borad.state.find(k) == -1): # 條件滿足說明將/帥被吃了,遊戲結束 z = np.zeros(len(current_players)) if (self.game_borad.state.find(K) == -1): winnner = "b" if (self.game_borad.state.find(k) == -1): winnner = "w" z[np.array(current_players) == winnner] = 1.0 z[np.array(current_players) != winnner] = -1.0 game_over = True print("Game end. Winner is player : ", winnner, " In {} steps".format(self.game_borad.round - 1)) elif self.game_borad.restrict_round >= 60: # 60回合沒有進展(吃子),平局 z = np.zeros(len(current_players)) game_over = True print("Game end. Tie in {} steps".format(self.game_borad.round - 1)) # 認輸的部分沒有實現 # elif(self.mcts.root.v < self.resign_threshold): # pass # elif(self.mcts.root.Q < self.resign_threshold): # pass if(game_over): self.mcts.reload() # 遊戲結束,重置棋盤 print("Using time {} s".format(time.time() - start_time)) return zip(states, mcts_probs, z), len(z) # 返回下棋數據

MCTS實現

關鍵的代碼來了,函數通過MCTS進行若干次模擬(論文是1600次,我用了1200次),然後根據子節點的訪問量決定要下哪步棋。

#@profiledef get_action(self, state, temperature = 1e-3): # MCTS主函數,模擬下棋 self.mcts.main(state, self.game_borad.current_player, self.game_borad.restrict_round, self.playout_counts) # 取得當前局面下所有子節點的合法走子和相應的訪問量。 # 這個所有子節點可能並不會覆蓋所有合法的走子,這個是由樹搜索的質量決定的,加大模擬次數會搜索更多不同的走法, # 就是加大思考的深度,考慮更多的局面,避免出現有些特別重要的棋步卻沒有考慮到的情況。 actions_visits = [(act, nod.N) for act, nod in self.mcts.root.child.items()] actions, visits = zip(*actions_visits) probs = softmax(1.0 / temperature * np.log(visits)) #+ 1e-10 move_probs = [] move_probs.append([actions, probs]) if(self.exploration): # 訓練時,可以通過加入雜訊來探索更多可能性的走子 act = np.random.choice(actions, p=0.75 * probs + 0.25*np.random.dirichlet(0.3*np.ones(len(probs)))) else: act = np.random.choice(actions, p=probs) # 通過節點訪問量的softmax選擇最大可能性的走子 win_rate = self.mcts.Q(act) # 將節點的Q值當做勝率 self.mcts.update_tree(act) # 更新搜索樹,將算出的這步棋的局面作為樹的根節點

來看看MCTS的類定義:

from collections import deque, defaultdict, namedtupleQueueItem = namedtuple("QueueItem", "feature future")c_PUCT = 5virtual_loss = 3cut_off_depth = 30class MCTS_tree(object): def __init__(self, in_state, in_forward, search_threads): # 參數search_threads我默認使用16個搜索線程 self.noise_eps = 0.25 self.dirichlet_alpha = 0.3 #0.03 # 根節點的先驗概率加入了雜訊 self.p_ = (1 - self.noise_eps) * 1 + self.noise_eps * np.random.dirichlet([self.dirichlet_alpha]) # 定義根節點,傳入概率和局面(棋子位置) self.root = leaf_node(None, self.p_, in_state) self.c_puct = 5 #1.5 # 保存前向傳播(預測)函數 self.forward = in_forward self.node_lock = defaultdict(Lock) # 虛擬損失 self.virtual_loss = 3 # 用來保存正在擴展的節點 self.now_expanding = set() # 保存擴展過的節點 self.expanded = set() self.cut_off_depth = 30 # self.QueueItem = namedtuple("QueueItem", "feature future") self.sem = asyncio.Semaphore(search_threads) # 保存搜索線程的隊列 self.queue = Queue(search_threads) self.loop = asyncio.get_event_loop() self.running_simulation_num = 0

葉子節點的類定義:

class leaf_node(object): # 定義節點時,傳入父節點,概率和棋盤狀態(棋子位置) def __init__(self, in_parent, in_prior_p, in_state): self.P = in_prior_p # 保存概率,其他值默認是0 self.Q = 0 self.N = 0 self.v = 0 self.U = 0 self.W = 0 self.parent = in_parent # 保存父節點 self.child = {} # 子節點默認是空 self.state = in_state # 保存棋盤狀態

MCTS主函數,模擬下棋:

def is_expanded(self, key) -> bool: """Check expanded status""" return key in self.expanded #@profiledef main(self, state, current_player, restrict_round, playouts): node = self.root # 先通過神經網路擴展根節點 if not self.is_expanded(node): # and node.is_leaf() # node.state # print(Expadning Root Node...) positions = self.generate_inputs(node.state, current_player) positions = np.expand_dims(positions, 0) action_probs, value = self.forward(positions) # 通過神經網路預測走子概率 if self.is_black_turn(current_player): # 判斷走子概率是否需要根據先手/後手進行轉換 action_probs = cchess_main.flip_policy(action_probs) # 取得當前局面所有合法的走子,有關中國象棋的演算法就不在這裡討論了,感興趣可以查看源代碼 moves = GameBoard.get_legal_moves(node.state, current_player) # print("current_player : ", current_player) # print(moves) node.expand(moves, action_probs) # 擴展節點 self.expanded.add(node) # 將當前節點加入到已擴展節點集合中 coroutine_list = [] for _ in range(playouts): # 模擬1200次,非同步的方式執行,一共使用了16個線程 coroutine_list.append(self.tree_search(node, current_player, restrict_round)) coroutine_list.append(self.prediction_worker()) self.loop.run_until_complete(asyncio.gather(*coroutine_list))async def tree_search(self, node, current_player, restrict_round) -> float: """Independent MCTS, stands for one simulation""" self.running_simulation_num += 1 # reduce parallel search number with await self.sem: # 非同步執行樹搜索,共16個線程 value = await self.start_tree_search(node, current_player, restrict_round) self.running_simulation_num -= 1 return value# ***樹搜索函數***async def start_tree_search(self, node, current_player, restrict_round)->float: """Monte Carlo Tree search Select,Expand,Evauate,Backup""" now_expanding = self.now_expanding # 如果當前節點正在被擴展,就小睡一會 while node in now_expanding: await asyncio.sleep(1e-4) if not self.is_expanded(node): # 如果節點沒有被擴展過,要擴展這個節點 """is leaf node try evaluate and expand""" # add leaf node to expanding list self.now_expanding.add(node) # 加入到正在擴展集合中 positions = self.generate_inputs(node.state, current_player) # 這裡有個trick,就是並不是一個節點一個節點的使用神經網路預測結果,這樣效率太低 # 而是放到隊列中,通過prediction_worker函數統一管理隊列,將隊列中的一組(16個)輸入傳給神經網路,得到預測結果 # 這一切都是非同步的 # push extracted dihedral features of leaf node to the evaluation queue future = await self.push_queue(positions) # type: Future await future action_probs, value = future.result() if self.is_black_turn(current_player): # 根據當前棋手的顏色決定是否對走子概率翻轉 action_probs = cchess_main.flip_policy(action_probs) moves = GameBoard.get_legal_moves(node.state, current_player) # print("current_player : ", current_player) # print(moves) node.expand(moves, action_probs) # Expand操作,使用神經網路預測的結果擴展當前節點 self.expanded.add(node) # # remove leaf node from expanding list self.now_expanding.remove(node) # must invert, because alternative layer has opposite objective return value[0] * -1 # 返回神經網路預測的勝率,一定要取負,理由在論文分析時已經說過了 else: # 如果節點被擴展過,執行Select """node has already expanded. Enter select phase.""" # select child node with maximum action scroe last_state = node.state action, node = node.select_new(c_PUCT) # Select操作,根據Q+U最大選擇節點 current_player = "w" if current_player == "b" else "b" if is_kill_move(last_state, node.state) == 0: restrict_round += 1 else: restrict_round = 0 last_state = node.state # 為選擇的節點添加虛擬損失,防止其他線程繼續探索這個節點,增加探索多樣性 # add virtual loss node.N += virtual_loss node.W += -virtual_loss # evolve game board status # 判斷這個節點狀態下,是否分出勝負 if (node.state.find(K) == -1 or node.state.find(k) == -1): # 分出勝負了,設置勝率1或者0 if (node.state.find(K) == -1): value = 1.0 if current_player == "b" else -1.0 if (node.state.find(k) == -1): value = -1.0 if current_player == "b" else 1.0 # 一定要符號取反 value = value * -1 elif restrict_round >= 60: # 60回合無進展(吃子),平局 value = 0.0 else: # 沒有分出勝負,在當前節點局面下繼續樹搜索 value = await self.start_tree_search(node, current_player, restrict_round) # next move # 當前節點搜索完畢,去掉虛擬損失,恢復節點狀態 node.N += -virtual_loss node.W += virtual_loss # on returning search path # update: N, W, Q, U node.back_up_value(value) # 執行節點的Backup操作,更新節點的各類數值 # must invert return value * -1 # 一定要符號取反# 管理隊列數據,一旦隊列中有數據,就統一傳給神經網路,獲得預測結果async def prediction_worker(self): """For better performance, queueing prediction requests and predict together in this worker. speed up about 45sec -> 15sec for example. """ q = self.queue margin = 10 # avoid finishing before other searches starting. while self.running_simulation_num > 0 or margin > 0: if q.empty(): if margin > 0: margin -= 1 await asyncio.sleep(1e-3) continue item_list = [q.get_nowait() for _ in range(q.qsize())] # type: list[QueueItem] features = np.asarray([item.feature for item in item_list]) # action_probs, value = self.forward(features) for p, v, item in zip(action_probs, value, item_list): item.future.set_result((p, v))async def push_queue(self, features): future = self.loop.create_future() item = QueueItem(features, future) await self.queue.put(item) return future

最後看看葉子節點的Select、Expand和Backup的實現。

# Select,選擇Q+U最大的節點def select_new(self, c_puct): return max(self.child.items(), key=lambda node: node[1].get_Q_plus_U_new(c_puct))# 返回節點的Q+Udef get_Q_plus_U_new(self, c_puct): """Calculate and return the value for this node: a combination of leaf evaluations, Q, and this nodes prior adjusted for its visit count, u c_puct -- a number in (0, inf) controlling the relative impact of values, Q, and prior probability, P, on this nodes score. """ U = c_puct * self.P * np.sqrt(self.parent.N) / ( 1 + self.N) return self.Q + U# 參數是所有合法走子moves,和神經網路預測的概率向量action_probs#@profiledef expand(self, moves, action_probs): tot_p = 1e-8 action_probs = action_probs.flatten() for action in moves: # 模擬執行每一個合法走子,得到相應的局面(棋子位置) in_state = GameBoard.sim_do_action(action, self.state) # 從概率向量中得到當前走子對應的概率 mov_p = action_probs[label2i[action]] # 創建新節點,傳入父節點(因為是擴展當前節點,所以當前節點是新節點的父節點)、概率、棋盤狀態 new_node = leaf_node(self, mov_p, in_state) self.child[action] = new_node # 將新節點添加到當前節點的子節點集合中 tot_p += mov_p for a, n in self.child.items(): n.P /= tot_p# 更新節點的各項參數def back_up_value(self, value): self.N += 1 # 計數加一 self.W += value # 更新總行動價值 self.v = value self.Q = self.W / self.N # 更新平均行動價值 self.U = c_PUCT * self.P * np.sqrt(self.parent.N) / ( 1 + self.N) # 更新U

以上,就是自對弈訓練神經網路的全部內容了,關於中國象棋的實現部分請看項目代碼。

最後

我來說說訓練情況,因為是從白板一塊開始訓練,剛開始都是亂下,從亂下的經驗當中學會下棋是需要大量對弈才行的。解的空間是很稀疏的,相當於100個數據,有99個是負例,只有1個正例。論文中訓練了700K次的mini-batch,國際象棋開源項目chess-alpha-zero也訓練了10K次。我呢,訓練不到4K次,模型剛剛學會用象和士防守,總之仍然下棋很爛。如果您有條件可以再多訓練試試,我自從收到信用卡扣款400美元通知以後就把aws下線了:D 貧窮限制了我的想像力O(∩_∩)O

參考資料

  • 深入淺出看懂AlphaGo元
  • 深入淺出看懂AlphaGo如何下棋
  • 圍棋開源項目AlphaGOZero-python-tensorflow
  • TensorFlow多GPU並行計算實例---MNIST
  • 國際象棋開源項目chess-alpha-zero
  • FEN文件格式
  • 著法表示
  • 中國象棋通用引擎協議 版本:3.0
  • 五子棋開源項目AlphaZero_Gomoku
  • 黑白棋開源項目reversi-alpha-zero
  • 中國象棋開源項目IntelliChess
  • 中國象棋UI項目ChineseChess

今天的分享就到這裡,請多指教~

推薦閱讀:

TAG:AlphaGo | 強化學習 | AlphaZero |