標籤:

莫煩老師,DQN代碼學習筆記

詳情請見莫煩老師DQN主頁:DQN 演算法更新 (Tensorflow) - 強化學習 Reinforcement Learning | 莫煩Python

莫煩老師代碼(沒有我繁瑣注釋代碼直通車):MorvanZhou/Reinforcement-learning-with-tensorflow

參考文獻:Playing Atari with Deep Reinforcement Learning

cs.toronto.edu/~vmnih/d

第一次在網上寫文章,不知怎麼註明參考出處,如涉及侵權問題請評論告訴我。

本人初入強化學習,看莫煩老師的課受益匪淺,再次由衷的感謝老師的無私奉獻,筆芯?~

由於本人不懂的地方太多,所以注釋的比較多,當然也加入了老師的注釋。供以後學習參考,和其他小白一起進步。

建議參考DQN演算法的英文流程,我也不知道這種中文注釋能在哪裡更方便的上傳,就在這裡發好啦,這裡排版沒準亂如果真要參考,就拷到編輯器上吧

廢話不多說:


-----------這裡是run_this.py文件嘿嘿嘿-----------------------------------------------------

#更新的步驟

from maze_env import Maze

from RL_brain import DeepQNetwork#引入了自己寫的maze_env,RL_brain模塊中class maze,class DeepQNetwork

def run_maze():

step = 0#為了記錄當前走的第幾步,因為先要存儲一些記憶,當記憶庫中有一些東西的時候才去學習

for episode in range(300):

# initial observation

observation = env.reset()#環境給出初始坐標

while True:

# fresh env更新環境

env.render()

# RL choose action based on observation根據觀測值選擇一個動作

action = RL.choose_action(observation)

# RL take action and get next observation and reward選擇動作後得到觀測值,獎勵,是否終結done的信息

observation_, reward, done = env.step(action)

RL.store_transition(observation, action, reward, observation_)##重要:存儲記憶:現在這步觀測值,採取的動作。會得到的獎勵,採取行動後下一步觀測值

# 控制學習起始時間和頻率 (先累積一些記憶再開始學習)

if (step > 200) and (step % 5 == 0):#當步數大於兩百的時候才開始學習,每五步學習一次

RL.learn()

# swap observation

observation = observation_#更新觀測值

# break while loop when end of this episode

if done:

break

step += 1

# end of game

print(game over)

env.destroy()

if __name__ == "__main__":

# maze game

env = Maze()#引入環境

RL = DeepQNetwork(env.n_actions, env.n_features,

learning_rate=0.01,

reward_decay=0.9,

e_greedy=0.9,

replace_target_iter=200, # 每 200 步替換一次 target_net 的參數

memory_size=2000,# 記憶上限

output_graph=False # 是否輸出 tensorboard 文件

)

env.after(100, run_maze)

env.mainloop()

RL.plot_cost() # 觀看神經網路的誤差曲線


--------------這裡是RL_brain.py,核心啊,看了好幾天哭------------------------------------

import numpy as np

import tensorflow as tf

np.random.seed(1)

tf.set_random_seed(1)

# Deep Q Network off-policy

class DeepQNetwork:

def __init__(

self,

n_actions,#輸出多少個action的值

n_features,#接受多少個觀測值的相關特徵

learning_rate=0.01,#NN中learning_rate學習速率

reward_decay=0.9,#Q-learning中reward衰減因子

e_greedy=0.9,

replace_target_iter=300,#更新Q現實網路參數的步驟數

memory_size=500,#存儲記憶的數量

batch_size=32,#每次從記憶庫中取的樣本數量

e_greedy_increment=None,

output_graph=False,

):

self.n_actions = n_actions#由maze得4

self.n_features = n_features#由maze得2

self.lr = learning_rate

self.gamma = reward_decay

self.epsilon_max = e_greedy#

self.replace_target_iter = replace_target_iter#隔多少步後將target net 的參數更新為最新的參數

self.memory_size = memory_size#整個記憶庫的容量,即RL.store_transition(observation, action, reward, observation_)有多少條

self.batch_size = batch_size#隨機梯度下降SGD會用到

self.epsilon_increment = e_greedy_increment#表示不斷擴大epsilon,以便有更大的概率拿到好的值

self.epsilon = 0 if e_greedy_increment is not None else self.epsilon_max#如果e_greedy_increment沒有值,則self.epsilon設置為self.epsilon_max=0.9

# total learning step

self.learn_step_counter = 0#用這個記錄學習了多少步,可以讓self.epsilon根據這個步數來不斷提高

# initialize zero memory [s, a, r, s_]

self.memory = np.zeros((self.memory_size, n_features * 2 + 2))

#self.memory存儲記憶的表

#行(高度)為存儲記憶的數量

#列為(observation, action, reward, observation_)的長度

#對於一條記憶信息來說observation和observation_都有n_features的長度

#而action,reward都各自有一個單值信息

#則總列數為n_features+2+n_features

#創建 [target_net, evaluate_net]神經網路

self._build_net()

# 替換 target net 的參數

t_params = tf.get_collection(target_net_params)#tf.get_collection(key,scope=None)返回具有給定名稱的集合中的值列表

#如果未將值添加到該集合,則為空列表。該列表按照收集順序包含這些值。

e_params = tf.get_collection(eval_net_params)

self.replace_target_op = [tf.assign(t, e) for t, e in zip(t_params, e_params)]#tf.assign(ref,value,validate_shape=None,use_locking=None,name=None)

#該操作在賦值後輸出一個張量,該張量保存ref的新值。函數完成了將value賦值給ref的作用

#zip()函數用於將可迭代的對象作為參數,將對象中對應的元素打包成一個個元組,然後返回由這些元組組成的列表。

self.sess = tf.Session()

if output_graph:

# $ tensorboard --logdir=logs

tf.summary.FileWriter("logs/", self.sess.graph)

self.sess.run(tf.global_variables_initializer())

self.cost_his = []#為記錄每一步的誤差的一個cost表

def _build_net(self):#搭建網路,q_next, q_eval 包含所有 action 的值

# ------------------ build evaluate_net ------------------預測值網路具備最新參數,最後輸出q_eval

self.s = tf.placeholder(tf.float32, [None, self.n_features], name=s) # input輸入當前狀態,作為NN的輸入

self.q_target = tf.placeholder(tf.float32, [None, self.n_actions], name=Q_target) # for calculating loss輸入Q現實為了後面誤差計算反向傳遞

#NN輸出q_predict

with tf.variable_scope(eval_net):

#首先對圖層進行配置,w,b初始化,第一層網路的神經元數n_l1 #表示沒有[],()的換行

c_names, n_l1, w_initializer, b_initializer =

[eval_net_params, tf.GraphKeys.GLOBAL_VARIABLES], 10,

tf.random_normal_initializer(0., 0.3), tf.constant_initializer(0.1)

#c_name作為一個存儲變數的集合,其名稱為eval_net_params[],將q估計的參數都放入這個集合中

#Variable()構造函數或get_variable()自動將新變數添加到圖形集合:GraphKeys.GLOBAL_VARIABLES(默認)。

#這個方便函數返回該集合的內容。

#用於調用參數,將q估計的參數都放在collection這個集合當中

#tf.random_normal_initializer(mean=0.0, stddev=1.0, seed=None, dtype=tf.float32)

#返回一個生成具有正態分布的張量的初始化器

#tf.constant_initializer可以簡寫為tf.Constant()初始化為常數

#tf.zeros_initializer()也可以簡寫為tf.Zeros()

#tf.ones_initializer(), 也可以簡寫為tf.Ones()

# 第一層. collections is used later when assign to target net

with tf.variable_scope(l1):

w1 = tf.get_variable(w1, [self.n_features, n_l1], initializer=w_initializer, collections=c_names)

b1 = tf.get_variable(b1, [1, n_l1], initializer=b_initializer, collections=c_names)

#創建一個變數對於get_variable(),來說,如果已經創建的變數對象,就把那個對象返回,如果沒有創建變數對象的話,就創建一個新的。

#tf.get_variable(name, shape=None, dtype=None,

# initializer=None, regularizer=None,

# trainable=True, collections=None, 這個collection表示The new variable is added to these collections

# caching_device=None, partitioner=None,

# validate_shape=True, custom_getter=None)

l1 = tf.nn.relu(tf.matmul(self.s, w1) + b1)#python有廣播功能,l1輸出維度[None,n_l1]

# 第二層. collections is used later when assign to target net

with tf.variable_scope(l2):

w2 = tf.get_variable(w2, [n_l1, self.n_actions], initializer=w_initializer, collections=c_names)

b2 = tf.get_variable(b2, [1, self.n_actions], initializer=b_initializer, collections=c_names)

self.q_eval = tf.matmul(l1, w2) + b2#L2輸出q估計維度[None,self_action]

with tf.variable_scope(loss):

self.loss = tf.reduce_mean(tf.squared_difference(self.q_target, self.q_eval))#基於Q估計與Q現實,構造loss-function

with tf.variable_scope(train):

self._train_op = tf.train.RMSPropOptimizer(self.lr).minimize(self.loss)#進行訓練

# ------------------ build target_net ------------------目標值網路預測Qtarget參數是以前的,最後輸出q_next

self.s_ = tf.placeholder(tf.float32, [None, self.n_features], name=s_) # input s_表示下一狀態,用下一狀態計算q_target

with tf.variable_scope(target_net):

# c_names(collections_names) are the collections to store variables 將q現實的參數都放入這個集合中

c_names = [target_net_params, tf.GraphKeys.GLOBAL_VARIABLES]

# first layer. collections is used later when assign to target net

with tf.variable_scope(l1):#這裡與前面的網路結構要相同,只不過存儲的參數不同,因為後面不會訓練這個target網路,只是單獨更新估計網路

w1 = tf.get_variable(w1, [self.n_features, n_l1], initializer=w_initializer, collections=c_names)

b1 = tf.get_variable(b1, [1, n_l1], initializer=b_initializer, collections=c_names)

l1 = tf.nn.relu(tf.matmul(self.s_, w1) + b1)

# second layer. collections is used later when assign to target net

with tf.variable_scope(l2):

w2 = tf.get_variable(w2, [n_l1, self.n_actions], initializer=w_initializer, collections=c_names)

b2 = tf.get_variable(b2, [1, self.n_actions], initializer=b_initializer, collections=c_names)

self.q_next = tf.matmul(l1, w2) + b2

def store_transition(self, s, a, r, s_):#存儲記憶,類里定義的函數語法規定第一個要為self

if not hasattr(self, memory_counter):#hasattr(object, name)判斷一個對象裡面是否有name屬性或者name方法,返回BOOL值,初始不存在這個索引項,創建

#判斷self對象有name特性返回True, 否則返回False。即沒有這個索引值memory_counter,則令self.memory_counter=0

self.memory_counter = 0

transition = np.hstack((s, [a, r], s_))#numpy.hstack(tup)參數tup可以是元組,列表,或者numpy數組,返回結果為按順序堆疊numpy的數組(按列堆疊一個)。

# replace the old memory with new memory

index = self.memory_counter % self.memory_size# 總 memory 大小是固定的, 如果超出總大小, 取index為語數,舊 memory 就被新 memory 替換

self.memory[index, :] = transition#將memory中的index這一行用transition這個一行的數組代替

self.memory_counter += 1#

def choose_action(self, observation):#選擇動作

# to have batch dimension when feed into tf placeholder

observation = observation[np.newaxis, :]#因為observation加入時是一維的數值

#np.newaxis 為 numpy.ndarray(多維數組)增加一個軸,多加入了一個行軸

if np.random.uniform() < self.epsilon:#np.random.uniform生成均勻分布的隨機數,默認0-1,大概率選擇actions_value最大下的動作

# forward feed the observation and get q value for every actions

actions_value = self.sess.run(self.q_eval, feed_dict={self.s: observation})

action = np.argmax(actions_value)

else:

action = np.random.randint(0, self.n_actions)#小概率隨機選擇actions_value下的一個動作,np.random.randint用於生成一個指定範圍內的整數

return action

def learn(self):#如何學習, 更新參數的. 這裡涉及了 target_net 和 eval_net 的交互使用.

# check to replace target parameters

if self.learn_step_counter % self.replace_target_iter == 0:#提前檢測是否替換 target_net 參數,self.learn_step_counter記錄了步數

#隔self.replace_target_iter後將target net 的參數更新為最新的參數

self.sess.run(self.replace_target_op)

print(
target_params_replaced
)

# sample batch memory from all memory調用記憶庫中的記憶,從 memory 中隨機抽取 batch_size 這麼多記憶,下面都是

if self.memory_counter > self.memory_size:#如果需要記憶的步數超過記憶庫容量

sample_index = np.random.choice(self.memory_size, size=self.batch_size)#從給定的一維陣列self.memory_size生成一個隨機樣本,size為Output shape.

else:

sample_index = np.random.choice(self.memory_counter, size=self.batch_size)#步數未超過記憶總容量,則最多在self.memory_counter個記憶值中選擇32個索引數值

batch_memory = self.memory[sample_index, :]#上面都是從多少個(一維)數中選出多少個數(一維),這裡,抽取記憶表self.memory中前sample_index行

q_next, q_eval = self.sess.run( #運行這兩個神經網路

[self.q_next, self.q_eval],

feed_dict={#前面store_transition(observation, action, reward, observation_)在run_this里都是參數

self.s_: batch_memory[:, -self.n_features:], # fixed params,q_next由目標值網路用記憶庫中倒數n_features個列(observation_)的值做輸入

self.s: batch_memory[:, :self.n_features], # newest params,q_eval由預測值網路用記憶庫中正數n_features個列(observation)的值做輸入

})

# change q_target w.r.t q_evals action

q_target = q_eval.copy()#淺拷貝,父目錄複製,子目錄會隨原始改變而變化,

#q_next, q_eval 包含所有 action 的值,而我們需要的只是已經選擇好的 action 的值, 其他的並不需要.

# 所以我們將其他的 action 值全變成 0, 將用到的 action 誤差值 反向傳遞迴去, 作為更新憑據.

# 這是我們最終要達到的樣子, 比如 q_target - q_eval = [1, 0, 0] - [-1, 0, 0] = [2, 0, 0]

# q_eval = [-1, 0, 0] 表示這一個記憶中有我選用過 action 0, 而 action 0 帶來的 Q(s, a0) = -1, 所以其他的 Q(s, a1) = Q(s, a2) = 0.

# q_target = [1, 0, 0] 表示這個記憶中的 r+gamma*maxQ(s_) = 1, 而且不管在 s_ 上我們取了哪個 action,我們都需要對應上 q_eval 中的 action 位置, 所以將 1 放在了 action 0 的位置.

# 下面也是為了達到上面說的目的, 不過為了更方面讓程序運算, 達到目的的過程有點不同.

# 是將 q_eval 全部賦值給 q_target, 這時 q_target-q_eval 全為 0,

# 不過 我們再根據 batch_memory 當中的 action 這個 column 來給 q_target 中的對應的 memory-action 位置來修改賦值.

# 使新的賦值為 reward + gamma * maxQ(s_), 這樣 q_target-q_eval 就可以變成我們所需的樣子.

# 具體在下面還有一個舉例說明.

batch_index = np.arange(self.batch_size, dtype=np.int32)#返回一個長度為self.batch_size的索引值列表aray([0,1,2,...,31])

eval_act_index = batch_memory[:, self.n_features].astype(int)#返回一個長度為32的動作列表,從記憶庫batch_memory中的標記的第2列,self.n_features=2

#即RL.store_transition(observation, action, reward, observation_)中的action,注意從0開始記,所以eval_act_index得到的是action那一列

reward = batch_memory[:, self.n_features + 1]#返回一個長度為32獎勵的列表,提取出記憶庫中的reward

q_target[batch_index, eval_act_index] = reward + self.gamma * np.max(q_next, axis=1)#返回一個32*4的ndarray數組形式,q_next由target網路輸出(樣本數*4),前面從記憶庫中取了32個輸入到網路中

#np.max(q_next, axis=1)取出q_next這個32*4中每行中取出最大的一個,返回1*32

#這個相當於將q_target按[batch_index, eval_act_index]索引計算出相應位置的q—_target值

#這裡q_target複製了q_eval可以實現下面相減之後得0

"""

假如在這個 batch 中, 我們有2個提取的記憶, 根據每個記憶可以生產3個 action 的值:

q_eval =

[[1, 2, 3],

[4, 5, 6]]

q_target = q_eval =

[[1, 2, 3],

[4, 5, 6]]

然後根據 memory 當中的具體 action 位置來修改 q_target 對應 action 上的值:q_target[batch_index, eval_act_index] = reward + self.gamma * np.max(q_next, axis=1)

比如在:

記憶 0 的 q_target 計算值是 -1, 而且我用了 action 0;

記憶 1 的 q_target 計算值是 -2, 而且我用了 action 2:

q_target =

[[-1, 2, 3],

[4, 5, -2]]

所以 (q_target - q_eval) 就變成了:

[[(-1)-(1), 0, 0],

[0, 0, (-2)-(6)]]

"""

#最後我們將這個 (q_target - q_eval) 當成誤差, 反向傳遞會神經網路.所有為 0 的 action 值是當時沒有選擇的 action, 之前有選擇的 action 才有不為0的值.

#我們只反向傳遞之前選擇的 action 的值,

# train eval network

_, self.cost = self.sess.run([self._train_op, self.loss],feed_dict={self.s: batch_memory[:, :self.n_features],self.q_target: q_target})

self.cost_his.append(self.cost)

# increasing epsilon提高選擇正確的概率,直到self.epsilon_max

self.epsilon = self.epsilon + self.epsilon_increment if self.epsilon < self.epsilon_max else self.epsilon_max

self.learn_step_counter += 1

def plot_cost(self):#展示學習曲線

import matplotlib.pyplot as plt

plt.plot(np.arange(len(self.cost_his)), self.cost_his)#arange函數用於創建等差數組,arange返回的是一個array類型的數據

#在for循環中,用到range,而range返回的是list。

plt.ylabel(Cost)

plt.xlabel(training steps)

plt.show()


---------為代碼完整性,這裡是maze_env.py沒注釋,二維迷宮設定------------------------

import numpy as np

import time

import sys

if sys.version_info.major == 2:

import Tkinter as tk

else:

import tkinter as tk

UNIT = 40 # pixels

MAZE_H = 4 # grid height

MAZE_W = 4 # grid width

class Maze(tk.Tk, object):

def __init__(self):

super(Maze, self).__init__()

self.action_space = [u, d, l, r]

self.n_actions = len(self.action_space)

self.n_features = 2

self.title(maze)

self.geometry({0}x{1}.format(MAZE_H * UNIT, MAZE_H * UNIT))

self._build_maze()

def _build_maze(self):

self.canvas = tk.Canvas(self, bg=white,

height=MAZE_H * UNIT,

width_=MAZE_W * UNIT)

# create grids

for c in range(0, MAZE_W * UNIT, UNIT):

x0, y0, x1, y1 = c, 0, c, MAZE_H * UNIT

self.canvas.create_line(x0, y0, x1, y1)

for r in range(0, MAZE_H * UNIT, UNIT):

x0, y0, x1, y1 = 0, r, MAZE_H * UNIT, r

self.canvas.create_line(x0, y0, x1, y1)

# create origin

origin = np.array([20, 20])

# hell

hell1_center = origin + np.array([UNIT * 2, UNIT])

self.hell1 = self.canvas.create_rectangle(

hell1_center[0] - 15, hell1_center[1] - 15,

hell1_center[0] + 15, hell1_center[1] + 15,

fill=black)

# hell

# hell2_center = origin + np.array([UNIT, UNIT * 2])

# self.hell2 = self.canvas.create_rectangle(

# hell2_center[0] - 15, hell2_center[1] - 15,

# hell2_center[0] + 15, hell2_center[1] + 15,

# fill=black)

# create oval

oval_center = origin + UNIT * 2

self.oval = self.canvas.create_oval(

oval_center[0] - 15, oval_center[1] - 15,

oval_center[0] + 15, oval_center[1] + 15,

fill=yellow)

# create red rect

self.rect = self.canvas.create_rectangle(

origin[0] - 15, origin[1] - 15,

origin[0] + 15, origin[1] + 15,

fill=red)

# pack all

self.canvas.pack()

def reset(self):

self.update()

time.sleep(0.1)

self.canvas.delete(self.rect)

origin = np.array([20, 20])

self.rect = self.canvas.create_rectangle(

origin[0] - 15, origin[1] - 15,

origin[0] + 15, origin[1] + 15,

fill=red)

# return observation

return (np.array(self.canvas.coords(self.rect)[:2]) - np.array(self.canvas.coords(self.oval)[:2]))/(MAZE_H*UNIT)

def step(self, action):

s = self.canvas.coords(self.rect)

base_action = np.array([0, 0])

if action == 0: # up

if s[1] > UNIT:

base_action[1] -= UNIT

elif action == 1: # down

if s[1] < (MAZE_H - 1) * UNIT:

base_action[1] += UNIT

elif action == 2: # right

if s[0] < (MAZE_W - 1) * UNIT:

base_action[0] += UNIT

elif action == 3: # left

if s[0] > UNIT:

base_action[0] -= UNIT

self.canvas.move(self.rect, base_action[0], base_action[1]) # move agent

next_coords = self.canvas.coords(self.rect) # next state

# reward function

if next_coords == self.canvas.coords(self.oval):

reward = 1

done = True

elif next_coords in [self.canvas.coords(self.hell1)]:

reward = -1

done = True

else:

reward = 0

done = False

s_ = (np.array(next_coords[:2]) - np.array(self.canvas.coords(self.oval)[:2]))/(MAZE_H*UNIT)

return s_, reward, done

def render(self):

# time.sleep(0.01)

self.update()


推薦閱讀:

R語言學習筆記(八):功效分析
《Go程序設計語言》筆記四
《一生的讀書計劃》
如何創建一個server伺服器
筆記-ubuntu14.04安裝SciPy

TAG:筆記 |