強化學習快速上手:編寫自定義通用gym環境類+主流開源強化學習框架調用

前言

相信很多同學接觸強化學習都是從使用OpenAI提供的gym示例開始,跟著講義一步步開發自己的演算法程序。這個過程雖然能夠幫助我們熟悉強化學習的理論基礎,卻有著陡峭的學習曲線,需要耗費大量的時間精力。對於那些僅僅是想藉助強化學習解決問題的同學來說,這個過程顯得過於漫長了。熟悉機器學習那一套的同學都知道,通常我們僅僅需要將數據組織成合適的格式,調用scikit-learn中的演算法就可以了。類似地,由於強化學習的演算法大多已經有成熟的開源框架可用,我們僅僅需要定義具體的研究對象即可。


預備

  1. 強化學習基本知識:智能體agent與環境environment、狀態states、動作actions、回報rewards等等,網上都有相關教程,不再贅述。
  2. gym安裝:openai/gym 注意,直接調用pip install gym只會得到最小安裝。如果需要使用完整安裝模式,調用pip install gym[all]。
  3. 主流開源強化學習框架推薦如下。以下只有前三個原生支持gym的環境,其餘的框架只能自行按照各自的格式編寫環境,不能做到通用。並且前三者提供的強化學習演算法較為全面,PyBrain提供了較基礎的如Q-learning、Sarsa、Natural AC演算法。Tensorlayer基於tensorflow開發,提供了更為基本的api。由於我始終未能安裝上coach……本文將以前兩個為例。
    1. OpenAI提供的Baselines:openai/baselines 3.2k stars
    2. Tensorforce:reinforceio/tensorforce 1.2k stars
    3. Intel提供的Coach:NervanaSystems/coach 0.6k stars,pip安裝可以使用pip install rl-coach,而不是pip install coach
    4. PyBrain:pybrain/pybrain 2.5k stars
    5. Tensorlayer:tensorlayer/tensorlayer 3.1k stars

編寫自定義環境

本文所有代碼可下載:xiaotanzhuying/Reinforcement_Learning_in_Action

參考gym中經典的CartPole環境代碼CartPole.py,我們逐步構建自定義環境。

我們將要創建的是一個在二維水平面上移動的小車。該二維區域長寬各20單位。區域中心為坐標原點,也是小車要到達的目的地。

動作:小車每次只能選擇不動或向四周移動一個單位。

狀態:小車的橫縱坐標。

獎勵:小車到達目的地周圍有+10的獎勵;每移動一個單位有-0.1的獎勵,這表示我們希望小車盡量以較少的時間(移動次數)到達終點;小車移動到區域外的獎勵為-50。

首先新建一個Car2D.py的文件,需要import的包如下:

import gymfrom gym import spacesimport numpy as np

我們聲明一個Car2DEnv的類,它是gym.Env的子類。它一般包括的內容如下:

class Car2DEnv(gym.Env): metadata = { render.modes: [human, rgb_array], video.frames_per_second: 2 } def __init__(self): self.action_space = None self.observation_space = None pass def step(self, action): return self.state, reward, done, {} def reset(self): return self.state def render(self, mode=human): return None def close(self): return None

必須實現的內容:

__init__():將會初始化動作空間與狀態空間,便於強化學習演算法在給定的狀態空間中搜索合適的動作。gym提供了spaces方法,詳細內容可以help查看。;

step():用於編寫智能體與環境交互的邏輯,它接受action的輸入,給出下一時刻的狀態、當前動作的回報、是否結束當前episode及調試信息。輸入action由__init__()函數中的動作空間給定。我們規定當action為0表示小車不動,當action為1,2,3,4時分別是向上、下、左、右各移動一個單位。據此可以寫出小車坐標的更新邏輯;

reset():用於在每輪開始之前重置智能體的狀態。

不是必須實現的但有助於調試演算法的內容:

metadata、render()、close()是與圖像顯示有關的,我們不涉及這一部分,感興趣的同學可以自行編寫相關內容。

完整的Car2D.py代碼如下

# -*- coding: utf-8 -*-import gymfrom gym import spacesimport numpy as npclass Car2DEnv(gym.Env): metadata = { render.modes: [human, rgb_array], video.frames_per_second: 2 } def __init__(self): self.xth = 0 self.target_x = 0 self.target_y = 0 self.L = 10 self.action_space = spaces.Discrete(5) # 0, 1, 2,3,4: 不動,上下左右 self.observation_space = spaces.Box(np.array([self.L, self.L]), np.array([self.L, self.L])) self.state = None def step(self, action): assert self.action_space.contains(action), "%r (%s) invalid"%(action, type(action)) x, y = self.state if action == 0: x = x y = y if action == 1: x = x y = y + 1 if action == 2: x = x y = y - 1 if action == 3: x = x - 1 y = y if action == 4: x = x + 1 y = y self.state = np.array([x, y]) self.counts += 1 done = (np.abs(x)+np.abs(y) <= 1) or (np.abs(x)+np.abs(y) >= 2*self.L+1) done = bool(done) if not done: reward = -0.1 else: if np.abs(x)+np.abs(y) <= 1: reward = 10 else: reward = -50 return self.state, reward, done, {} def reset(self): self.state = np.ceil(np.random.rand(2)*2*self.L)-self.L self.counts = 0 return self.state def render(self, mode=human): return None def close(self): return None if __name__ == __main__: env = Car2DEnv() env.reset() env.step(env.action_space.sample()) print(env.state) env.step(env.action_space.sample()) print(env.state)


調用強化學習框架

Baselines

由於Baselines與gym都是一家開發的,代碼上基本無縫銜接,直接import我們聲明的這個Car2DEnv就可以使用了。以DQN為例,以下是訓練+測試的代碼:

from baselines import deepqfrom Car2D import Car2DEnvenv = Car2DEnv()model = deepq.models.mlp([32, 16], layer_norm=True)act = deepq.learn( env, q_func=model, lr=0.01, max_timesteps=10000, print_freq=1, checkpoint_freq=1000)print(Finish!)#act.save("mountaincar_model.pkl")#act = deepq.load("mountaincar_model.pkl")while True: obs, done = env.reset(), False episode_reward = 0 while not done: env.render() obs, reward, done, _ = env.step(act(obs[None])[0]) episode_reward += reward print([episode_reward, env.counts])

訓練部分:調用了多層感知機mlp作為我們的簡化深度Q網路。deepq.learn()中lr表示學習率,max_timesteps表示本次訓練的總時間步steps達到10000步後結束(是的,你沒看錯!這不是episode而是時間步steps),print_freq表示每隔多少episode列印一次統計信息,checkpoint_freq表示每隔多少steps保存一次模型。最終將選擇已保存的模型中平均回報最高的模型,賦給變數act。act可以save為pkl文件,方便下次load。

測試部分:act接受當前狀態obs後給出action,將其傳給環境的step()函數,得到下一時間步的狀態、回報、是否結束。我們在Car2DEnv中有一個變數counts記錄了每輪從開始到結束的時間步數,表示小車需要的時間。循環列印每輪的總回報和時間步數。如果總回報為正且時間步數較少,則表明我們的演算法取得了較好的效果。

Tensorforce

Tensorforce通過調用方法OpenAIGym將已註冊的gym環境導入。框架設計與Baselines略有不同。以PPO演算法為例,直接看代碼:

# -*- coding: utf-8 -*-import numpy as npfrom tensorforce.agents import PPOAgentfrom tensorforce.execution import Runnerfrom tensorforce.contrib.openai_gym import OpenAIGymenv = OpenAIGym(Car2D-v0, visualize=False)# Network as list of layersnetwork_spec = [ dict(type=dense, size=32, activation=tanh), dict(type=dense, size=32, activation=tanh)]agent = PPOAgent( states_spec=env.states, actions_spec=env.actions, network_spec=network_spec, batch_size=4096, # BatchAgent keep_last_timestep=True, # PPOAgent step_optimizer=dict( type=adam, learning_rate=1e-3 ), optimization_steps=10, # Model scope=ppo, discount=0.99, # DistributionModel distributions_spec=None, entropy_regularization=0.01, # PGModel baseline_mode=None, baseline=None, baseline_optimizer=None, gae_lambda=None, # PGLRModel likelihood_ratio_clipping=0.2, summary_spec=None, distributed_spec=None)# Create the runnerrunner = Runner(agent=agent, environment=env)# Callback function printing episode statisticsdef episode_finished(r): print("Finished episode {ep} after {ts} timesteps (reward: {reward})".format(ep=r.episode, ts=r.episode_timestep, reward=r.episode_rewards[-1])) return True# Start learningrunner.run(episodes=1000, max_episode_timesteps=200, episode_finished=episode_finished)# Print statisticsprint("Learning finished. Total episodes: {ep}. Average reward of last 100 episodes: {ar}.".format( ep=runner.episode, ar=np.mean(runner.episode_rewards[-100:]))) while True: agent.reset() state, done = env.reset(), False episode_reward = 0 while not done: action = agent.act(state, deterministic = True) state, done, reward = env.execute(action) agent.observe(done, reward) episode_reward += reward print([episode_reward])

為了讓代碼能夠順利運行,我們需要額外配置一些東西:

1. 註冊自定義gym環境

首先找到gym環境的文件夾位置。我使用的是anaconda,路徑是D:AnacondaLibsite-packagesgymgymenvs。新建一個文件夾user並進入。將剛才我們編寫的Car2D.py放進去。並增加入口文件__init__.py,內容為:

from gym.envs.user.Car2D import Car2DEnv

回到D:AnacondaLibsite-packagesgymgymenvs。修改入口文件__init__.py(不放心的可以備份原文件),在其中增加內容:

# User# ----------------------------------------register( id=Car2D-v0, entry_point=gym.envs.user:Car2DEnv, max_episode_steps=100, reward_threshold=10.0,)

2. 修改 D:AnacondaLibsite-packages ensorforceexecution
unner.py,將該文件最後兩行注釋掉,如下(不放心的同學也可以備份一下)

#self.agent.close() #self.environment.close()

原因是這將導致agent和environment在訓練完成後被清空,使得測試部分無法進行。

配置好這兩項內容後就可以愉快地運行代碼啦!Tensorforce中使用runner()進行訓練,對相關參數有興趣的同學可以閱讀官方文檔相關內容。代碼中agent.observe()僅僅是用於更新agent.episode信息,不會更新訓練好的模型參數。需要說明的是,由於tensorforce通過導入的形式調用自定義環境,我們自定義的內容如env.counts的正確調用形式是env.gym.env.counts。


後記

由於本人目前正在學習強化學習ing,難免有疏漏和錯誤。歡迎大家提出批評意見,共同進步!


推薦閱讀:

TAG:強化學習ReinforcementLearning | 強化學習 | openai |