莫煩老師,DQN代碼學習筆記
詳情請見莫煩老師DQN主頁:DQN 演算法更新 (Tensorflow) - 強化學習 Reinforcement Learning | 莫煩Python
莫煩老師代碼(沒有我繁瑣注釋代碼直通車):MorvanZhou/Reinforcement-learning-with-tensorflow
參考文獻:Playing Atari with Deep Reinforcement Learning
https://www.cs.toronto.edu/~vmnih/docs/dqn.pdf
第一次在網上寫文章,不知怎麼註明參考出處,如涉及侵權問題請評論告訴我。
本人初入強化學習,看莫煩老師的課受益匪淺,再次由衷的感謝老師的無私奉獻,筆芯?~
由於本人不懂的地方太多,所以注釋的比較多,當然也加入了老師的注釋。供以後學習參考,和其他小白一起進步。
建議參考DQN演算法的英文流程,我也不知道這種中文注釋能在哪裡更方便的上傳,就在這裡發好啦,這裡排版沒準亂如果真要參考,就拷到編輯器上吧
廢話不多說:
-----------這裡是run_this.py文件嘿嘿嘿-----------------------------------------------------
#更新的步驟
from maze_env import Maze
from RL_brain import DeepQNetwork#引入了自己寫的maze_env,RL_brain模塊中class maze,class DeepQNetwork
def run_maze():
step = 0#為了記錄當前走的第幾步,因為先要存儲一些記憶,當記憶庫中有一些東西的時候才去學習
for episode in range(300):
# initial observation
observation = env.reset()#環境給出初始坐標
while True:
# fresh env更新環境
env.render()
# RL choose action based on observation根據觀測值選擇一個動作
action = RL.choose_action(observation)
# RL take action and get next observation and reward選擇動作後得到觀測值,獎勵,是否終結done的信息
observation_, reward, done = env.step(action)
RL.store_transition(observation, action, reward, observation_)##重要:存儲記憶:現在這步觀測值,採取的動作。會得到的獎勵,採取行動後下一步觀測值
# 控制學習起始時間和頻率 (先累積一些記憶再開始學習)
if (step > 200) and (step % 5 == 0):#當步數大於兩百的時候才開始學習,每五步學習一次
RL.learn()
# swap observation
observation = observation_#更新觀測值
# break while loop when end of this episode
if done:
break
step += 1
# end of game
print(game over)
env.destroy()
if __name__ == "__main__":
# maze game
env = Maze()#引入環境
RL = DeepQNetwork(env.n_actions, env.n_features,
learning_rate=0.01,
reward_decay=0.9,
e_greedy=0.9,
replace_target_iter=200, # 每 200 步替換一次 target_net 的參數
memory_size=2000,# 記憶上限
output_graph=False # 是否輸出 tensorboard 文件
)
env.after(100, run_maze)
env.mainloop()
RL.plot_cost() # 觀看神經網路的誤差曲線
--------------這裡是RL_brain.py,核心啊,看了好幾天哭------------------------------------
import numpy as np
import tensorflow as tf
np.random.seed(1)
tf.set_random_seed(1)
# Deep Q Network off-policy
class DeepQNetwork:
def __init__(
self,
n_actions,#輸出多少個action的值
n_features,#接受多少個觀測值的相關特徵
learning_rate=0.01,#NN中learning_rate學習速率
reward_decay=0.9,#Q-learning中reward衰減因子
e_greedy=0.9,
replace_target_iter=300,#更新Q現實網路參數的步驟數
memory_size=500,#存儲記憶的數量
batch_size=32,#每次從記憶庫中取的樣本數量
e_greedy_increment=None,
output_graph=False,
):
self.n_actions = n_actions#由maze得4
self.n_features = n_features#由maze得2
self.lr = learning_rate
self.gamma = reward_decay
self.epsilon_max = e_greedy#
self.replace_target_iter = replace_target_iter#隔多少步後將target net 的參數更新為最新的參數
self.memory_size = memory_size#整個記憶庫的容量,即RL.store_transition(observation, action, reward, observation_)有多少條
self.batch_size = batch_size#隨機梯度下降SGD會用到
self.epsilon_increment = e_greedy_increment#表示不斷擴大epsilon,以便有更大的概率拿到好的值
self.epsilon = 0 if e_greedy_increment is not None else self.epsilon_max#如果e_greedy_increment沒有值,則self.epsilon設置為self.epsilon_max=0.9
# total learning step
self.learn_step_counter = 0#用這個記錄學習了多少步,可以讓self.epsilon根據這個步數來不斷提高
# initialize zero memory [s, a, r, s_]
self.memory = np.zeros((self.memory_size, n_features * 2 + 2))
#self.memory存儲記憶的表
#行(高度)為存儲記憶的數量
#列為(observation, action, reward, observation_)的長度
#對於一條記憶信息來說observation和observation_都有n_features的長度
#而action,reward都各自有一個單值信息
#則總列數為n_features+2+n_features
#創建 [target_net, evaluate_net]神經網路
self._build_net()
# 替換 target net 的參數
t_params = tf.get_collection(target_net_params)#tf.get_collection(key,scope=None)返回具有給定名稱的集合中的值列表
#如果未將值添加到該集合,則為空列表。該列表按照收集順序包含這些值。
e_params = tf.get_collection(eval_net_params)
self.replace_target_op = [tf.assign(t, e) for t, e in zip(t_params, e_params)]#tf.assign(ref,value,validate_shape=None,use_locking=None,name=None)
#該操作在賦值後輸出一個張量,該張量保存ref的新值。函數完成了將value賦值給ref的作用
#zip()函數用於將可迭代的對象作為參數,將對象中對應的元素打包成一個個元組,然後返回由這些元組組成的列表。
self.sess = tf.Session()
if output_graph:
# $ tensorboard --logdir=logs
tf.summary.FileWriter("logs/", self.sess.graph)
self.sess.run(tf.global_variables_initializer())
self.cost_his = []#為記錄每一步的誤差的一個cost表
def _build_net(self):#搭建網路,q_next, q_eval 包含所有 action 的值
# ------------------ build evaluate_net ------------------預測值網路具備最新參數,最後輸出q_eval
self.s = tf.placeholder(tf.float32, [None, self.n_features], name=s) # input輸入當前狀態,作為NN的輸入
self.q_target = tf.placeholder(tf.float32, [None, self.n_actions], name=Q_target) # for calculating loss輸入Q現實為了後面誤差計算反向傳遞
#NN輸出q_predict
with tf.variable_scope(eval_net):
#首先對圖層進行配置,w,b初始化,第一層網路的神經元數n_l1 #表示沒有[],()的換行
c_names, n_l1, w_initializer, b_initializer =
[eval_net_params, tf.GraphKeys.GLOBAL_VARIABLES], 10,
tf.random_normal_initializer(0., 0.3), tf.constant_initializer(0.1)
#c_name作為一個存儲變數的集合,其名稱為eval_net_params[],將q估計的參數都放入這個集合中
#Variable()構造函數或get_variable()自動將新變數添加到圖形集合:GraphKeys.GLOBAL_VARIABLES(默認)。
#這個方便函數返回該集合的內容。
#用於調用參數,將q估計的參數都放在collection這個集合當中
#tf.random_normal_initializer(mean=0.0, stddev=1.0, seed=None, dtype=tf.float32)
#返回一個生成具有正態分布的張量的初始化器
#tf.constant_initializer可以簡寫為tf.Constant()初始化為常數
#tf.zeros_initializer()也可以簡寫為tf.Zeros()
#tf.ones_initializer(), 也可以簡寫為tf.Ones()
# 第一層. collections is used later when assign to target net
with tf.variable_scope(l1):
w1 = tf.get_variable(w1, [self.n_features, n_l1], initializer=w_initializer, collections=c_names)
b1 = tf.get_variable(b1, [1, n_l1], initializer=b_initializer, collections=c_names)
#創建一個變數對於get_variable(),來說,如果已經創建的變數對象,就把那個對象返回,如果沒有創建變數對象的話,就創建一個新的。
#tf.get_variable(name, shape=None, dtype=None,
# initializer=None, regularizer=None,
# trainable=True, collections=None, 這個collection表示The new variable is added to these collections
# caching_device=None, partitioner=None,
# validate_shape=True, custom_getter=None)
l1 = tf.nn.relu(tf.matmul(self.s, w1) + b1)#python有廣播功能,l1輸出維度[None,n_l1]
# 第二層. collections is used later when assign to target net
with tf.variable_scope(l2):
w2 = tf.get_variable(w2, [n_l1, self.n_actions], initializer=w_initializer, collections=c_names)
b2 = tf.get_variable(b2, [1, self.n_actions], initializer=b_initializer, collections=c_names)
self.q_eval = tf.matmul(l1, w2) + b2#L2輸出q估計維度[None,self_action]
with tf.variable_scope(loss):
self.loss = tf.reduce_mean(tf.squared_difference(self.q_target, self.q_eval))#基於Q估計與Q現實,構造loss-function
with tf.variable_scope(train):
self._train_op = tf.train.RMSPropOptimizer(self.lr).minimize(self.loss)#進行訓練
# ------------------ build target_net ------------------目標值網路預測Qtarget參數是以前的,最後輸出q_next
self.s_ = tf.placeholder(tf.float32, [None, self.n_features], name=s_) # input s_表示下一狀態,用下一狀態計算q_target
with tf.variable_scope(target_net):
# c_names(collections_names) are the collections to store variables 將q現實的參數都放入這個集合中
c_names = [target_net_params, tf.GraphKeys.GLOBAL_VARIABLES]
# first layer. collections is used later when assign to target net
with tf.variable_scope(l1):#這裡與前面的網路結構要相同,只不過存儲的參數不同,因為後面不會訓練這個target網路,只是單獨更新估計網路
w1 = tf.get_variable(w1, [self.n_features, n_l1], initializer=w_initializer, collections=c_names)
b1 = tf.get_variable(b1, [1, n_l1], initializer=b_initializer, collections=c_names)
l1 = tf.nn.relu(tf.matmul(self.s_, w1) + b1)
# second layer. collections is used later when assign to target net
with tf.variable_scope(l2):
w2 = tf.get_variable(w2, [n_l1, self.n_actions], initializer=w_initializer, collections=c_names)
b2 = tf.get_variable(b2, [1, self.n_actions], initializer=b_initializer, collections=c_names)
self.q_next = tf.matmul(l1, w2) + b2
def store_transition(self, s, a, r, s_):#存儲記憶,類里定義的函數語法規定第一個要為self
if not hasattr(self, memory_counter):#hasattr(object, name)判斷一個對象裡面是否有name屬性或者name方法,返回BOOL值,初始不存在這個索引項,創建
#判斷self對象有name特性返回True, 否則返回False。即沒有這個索引值memory_counter,則令self.memory_counter=0
self.memory_counter = 0
transition = np.hstack((s, [a, r], s_))#numpy.hstack(tup)參數tup可以是元組,列表,或者numpy數組,返回結果為按順序堆疊numpy的數組(按列堆疊一個)。
# replace the old memory with new memory
index = self.memory_counter % self.memory_size# 總 memory 大小是固定的, 如果超出總大小, 取index為語數,舊 memory 就被新 memory 替換
self.memory[index, :] = transition#將memory中的index這一行用transition這個一行的數組代替
self.memory_counter += 1#
def choose_action(self, observation):#選擇動作
# to have batch dimension when feed into tf placeholder
observation = observation[np.newaxis, :]#因為observation加入時是一維的數值
#np.newaxis 為 numpy.ndarray(多維數組)增加一個軸,多加入了一個行軸
if np.random.uniform() < self.epsilon:#np.random.uniform生成均勻分布的隨機數,默認0-1,大概率選擇actions_value最大下的動作
# forward feed the observation and get q value for every actions
actions_value = self.sess.run(self.q_eval, feed_dict={self.s: observation})
action = np.argmax(actions_value)
else:
action = np.random.randint(0, self.n_actions)#小概率隨機選擇actions_value下的一個動作,np.random.randint用於生成一個指定範圍內的整數
return action
def learn(self):#如何學習, 更新參數的. 這裡涉及了 target_net 和 eval_net 的交互使用.
# check to replace target parameters
if self.learn_step_counter % self.replace_target_iter == 0:#提前檢測是否替換 target_net 參數,self.learn_step_counter記錄了步數
#隔self.replace_target_iter後將target net 的參數更新為最新的參數
self.sess.run(self.replace_target_op)
print(
target_params_replaced
)
# sample batch memory from all memory調用記憶庫中的記憶,從 memory 中隨機抽取 batch_size 這麼多記憶,下面都是
if self.memory_counter > self.memory_size:#如果需要記憶的步數超過記憶庫容量
sample_index = np.random.choice(self.memory_size, size=self.batch_size)#從給定的一維陣列self.memory_size生成一個隨機樣本,size為Output shape.
else:
sample_index = np.random.choice(self.memory_counter, size=self.batch_size)#步數未超過記憶總容量,則最多在self.memory_counter個記憶值中選擇32個索引數值
batch_memory = self.memory[sample_index, :]#上面都是從多少個(一維)數中選出多少個數(一維),這裡,抽取記憶表self.memory中前sample_index行
q_next, q_eval = self.sess.run( #運行這兩個神經網路
[self.q_next, self.q_eval],
feed_dict={#前面store_transition(observation, action, reward, observation_)在run_this里都是參數
self.s_: batch_memory[:, -self.n_features:], # fixed params,q_next由目標值網路用記憶庫中倒數n_features個列(observation_)的值做輸入
self.s: batch_memory[:, :self.n_features], # newest params,q_eval由預測值網路用記憶庫中正數n_features個列(observation)的值做輸入
})
# change q_target w.r.t q_evals action
q_target = q_eval.copy()#淺拷貝,父目錄複製,子目錄會隨原始改變而變化,
#q_next, q_eval 包含所有 action 的值,而我們需要的只是已經選擇好的 action 的值, 其他的並不需要.
# 所以我們將其他的 action 值全變成 0, 將用到的 action 誤差值 反向傳遞迴去, 作為更新憑據.
# 這是我們最終要達到的樣子, 比如 q_target - q_eval = [1, 0, 0] - [-1, 0, 0] = [2, 0, 0]
# q_eval = [-1, 0, 0] 表示這一個記憶中有我選用過 action 0, 而 action 0 帶來的 Q(s, a0) = -1, 所以其他的 Q(s, a1) = Q(s, a2) = 0.
# q_target = [1, 0, 0] 表示這個記憶中的 r+gamma*maxQ(s_) = 1, 而且不管在 s_ 上我們取了哪個 action,我們都需要對應上 q_eval 中的 action 位置, 所以將 1 放在了 action 0 的位置.
# 下面也是為了達到上面說的目的, 不過為了更方面讓程序運算, 達到目的的過程有點不同.
# 是將 q_eval 全部賦值給 q_target, 這時 q_target-q_eval 全為 0,
# 不過 我們再根據 batch_memory 當中的 action 這個 column 來給 q_target 中的對應的 memory-action 位置來修改賦值.
# 使新的賦值為 reward + gamma * maxQ(s_), 這樣 q_target-q_eval 就可以變成我們所需的樣子.
# 具體在下面還有一個舉例說明.
batch_index = np.arange(self.batch_size, dtype=np.int32)#返回一個長度為self.batch_size的索引值列表aray([0,1,2,...,31])
eval_act_index = batch_memory[:, self.n_features].astype(int)#返回一個長度為32的動作列表,從記憶庫batch_memory中的標記的第2列,self.n_features=2
#即RL.store_transition(observation, action, reward, observation_)中的action,注意從0開始記,所以eval_act_index得到的是action那一列
reward = batch_memory[:, self.n_features + 1]#返回一個長度為32獎勵的列表,提取出記憶庫中的reward
q_target[batch_index, eval_act_index] = reward + self.gamma * np.max(q_next, axis=1)#返回一個32*4的ndarray數組形式,q_next由target網路輸出(樣本數*4),前面從記憶庫中取了32個輸入到網路中
#np.max(q_next, axis=1)取出q_next這個32*4中每行中取出最大的一個,返回1*32
#這個相當於將q_target按[batch_index, eval_act_index]索引計算出相應位置的q—_target值
#這裡q_target複製了q_eval可以實現下面相減之後得0
"""
假如在這個 batch 中, 我們有2個提取的記憶, 根據每個記憶可以生產3個 action 的值:
q_eval =
[[1, 2, 3],
[4, 5, 6]]
q_target = q_eval =
[[1, 2, 3],
[4, 5, 6]]
然後根據 memory 當中的具體 action 位置來修改 q_target 對應 action 上的值:q_target[batch_index, eval_act_index] = reward + self.gamma * np.max(q_next, axis=1)
比如在:
記憶 0 的 q_target 計算值是 -1, 而且我用了 action 0;
記憶 1 的 q_target 計算值是 -2, 而且我用了 action 2:
q_target =
[[-1, 2, 3],
[4, 5, -2]]
所以 (q_target - q_eval) 就變成了:
[[(-1)-(1), 0, 0],
[0, 0, (-2)-(6)]]
"""
#最後我們將這個 (q_target - q_eval) 當成誤差, 反向傳遞會神經網路.所有為 0 的 action 值是當時沒有選擇的 action, 之前有選擇的 action 才有不為0的值.
#我們只反向傳遞之前選擇的 action 的值,
# train eval network
_, self.cost = self.sess.run([self._train_op, self.loss],feed_dict={self.s: batch_memory[:, :self.n_features],self.q_target: q_target})
self.cost_his.append(self.cost)
# increasing epsilon提高選擇正確的概率,直到self.epsilon_max
self.epsilon = self.epsilon + self.epsilon_increment if self.epsilon < self.epsilon_max else self.epsilon_max
self.learn_step_counter += 1
def plot_cost(self):#展示學習曲線
import matplotlib.pyplot as plt
plt.plot(np.arange(len(self.cost_his)), self.cost_his)#arange函數用於創建等差數組,arange返回的是一個array類型的數據
#在for循環中,用到range,而range返回的是list。
plt.ylabel(Cost)
plt.xlabel(training steps)
plt.show()
---------為代碼完整性,這裡是maze_env.py沒注釋,二維迷宮設定------------------------
import numpy as np
import time
import sys
if sys.version_info.major == 2:
import Tkinter as tk
else:
import tkinter as tk
UNIT = 40 # pixels
MAZE_H = 4 # grid height
MAZE_W = 4 # grid width
class Maze(tk.Tk, object):
def __init__(self):
super(Maze, self).__init__()
self.action_space = [u, d, l, r]
self.n_actions = len(self.action_space)
self.n_features = 2
self.title(maze)
self.geometry({0}x{1}.format(MAZE_H * UNIT, MAZE_H * UNIT))
self._build_maze()
def _build_maze(self):
self.canvas = tk.Canvas(self, bg=white,
height=MAZE_H * UNIT,
width_=MAZE_W * UNIT)
# create grids
for c in range(0, MAZE_W * UNIT, UNIT):
x0, y0, x1, y1 = c, 0, c, MAZE_H * UNIT
self.canvas.create_line(x0, y0, x1, y1)
for r in range(0, MAZE_H * UNIT, UNIT):
x0, y0, x1, y1 = 0, r, MAZE_H * UNIT, r
self.canvas.create_line(x0, y0, x1, y1)
# create origin
origin = np.array([20, 20])
# hell
hell1_center = origin + np.array([UNIT * 2, UNIT])
self.hell1 = self.canvas.create_rectangle(
hell1_center[0] - 15, hell1_center[1] - 15,
hell1_center[0] + 15, hell1_center[1] + 15,
fill=black)
# hell
# hell2_center = origin + np.array([UNIT, UNIT * 2])
# self.hell2 = self.canvas.create_rectangle(
# hell2_center[0] - 15, hell2_center[1] - 15,
# hell2_center[0] + 15, hell2_center[1] + 15,
# fill=black)
# create oval
oval_center = origin + UNIT * 2
self.oval = self.canvas.create_oval(
oval_center[0] - 15, oval_center[1] - 15,
oval_center[0] + 15, oval_center[1] + 15,
fill=yellow)
# create red rect
self.rect = self.canvas.create_rectangle(
origin[0] - 15, origin[1] - 15,
origin[0] + 15, origin[1] + 15,
fill=red)
# pack all
self.canvas.pack()
def reset(self):
self.update()
time.sleep(0.1)
self.canvas.delete(self.rect)
origin = np.array([20, 20])
self.rect = self.canvas.create_rectangle(
origin[0] - 15, origin[1] - 15,
origin[0] + 15, origin[1] + 15,
fill=red)
# return observation
return (np.array(self.canvas.coords(self.rect)[:2]) - np.array(self.canvas.coords(self.oval)[:2]))/(MAZE_H*UNIT)
def step(self, action):
s = self.canvas.coords(self.rect)
base_action = np.array([0, 0])
if action == 0: # up
if s[1] > UNIT:
base_action[1] -= UNIT
elif action == 1: # down
if s[1] < (MAZE_H - 1) * UNIT:
base_action[1] += UNIT
elif action == 2: # right
if s[0] < (MAZE_W - 1) * UNIT:
base_action[0] += UNIT
elif action == 3: # left
if s[0] > UNIT:
base_action[0] -= UNIT
self.canvas.move(self.rect, base_action[0], base_action[1]) # move agent
next_coords = self.canvas.coords(self.rect) # next state
# reward function
if next_coords == self.canvas.coords(self.oval):
reward = 1
done = True
elif next_coords in [self.canvas.coords(self.hell1)]:
reward = -1
done = True
else:
reward = 0
done = False
s_ = (np.array(next_coords[:2]) - np.array(self.canvas.coords(self.oval)[:2]))/(MAZE_H*UNIT)
return s_, reward, done
def render(self):
# time.sleep(0.01)
self.update()
推薦閱讀:
※R語言學習筆記(八):功效分析
※《Go程序設計語言》筆記四
※《一生的讀書計劃》
※如何創建一個server伺服器
※筆記-ubuntu14.04安裝SciPy
TAG:筆記 |