[Academia] Chapter 6: Python Code for the Ant Colony Optimization Part
Author: Xiao Yang
School: Guangdong University of Technology. Year: second-year master's student. Major: Industrial Engineering. Main research interests: reinforcement learning, deep learning. About the author: a 2016-intake graduate student in Industrial Engineering at Guangdong University of Technology, supervised by Prof. Chen Qingxin (professor, doctoral supervisor, and editor-in-chief of the journal Industrial Engineering) and Dr. Zhang Ting (lecturer at Guangdong University of Technology, PhD from the University of Hong Kong). From the first year to the start of the second year of the master's program, the author carried out academic research for the first time, publishing one paper in the EI-indexed, domestic top engineering journal Systems Engineering - Theory & Practice, two international conference papers (International Conference on Networking, Sensing and Control 2018; Healthcare Operations Management SIG 2018 Conference), giving one poster presentation at an international conference (Manufacturing and Service Operations Management Conference 2018), and submitting one paper to the Journal of Cleaner Production, currently under first review. Current research interests focus on applications at the intersection of machine learning and operations research.
This series of articles records some personal reflections and summarizes the previous stage of my work. Since this is my first attempt at research and my experience is still limited, I hope these summaries will help me reflect, and I would welcome suggestions from more experienced readers.
Research topic:
[ Community home health care scheduling and routing problem ]
[ Community Home Health Care Scheduling and Routing Problem ]
------Contents------
Chapters 1-3: Topic selection, literature review methods, problem analysis, and model assumptions
Chapter 4: Markov decision process & chance-constrained model & multi-nurse stochastic routing
Chapter 5: Python code - the model and Q-Learning
Chapter 6: Python code - the ant colony optimization part
Chapter 7: Python code - module integration and the main program
Chapter 8: Abstract, introduction, and conclusion - writing and expression
------6 Python Code: Ant Colony Optimization and Program Integration------
(While writing, I found the ant colony optimization part had grown too long, so it gets its own article.)
This article summarizes the work from the angle of how the algorithm is implemented in Python. As the title suggests, the heuristic combines Q-learning and Ant Colony Optimization; its underlying idea is that of reinforcement learning, i.e., searching for an acceptable near-optimal solution through repeated trial and error. The flow of the algorithm is described below.
The whole program can be divided into four parts:
- Data Representation (DR)
- Best-Worst Ant Colony Optimization (BWACO)
- Q-Learning (QL)
- Results Exporting (RE)
All packages that will be needed:
import numpy as np
import copy
import pandas as pd
import math
import random
import time
import matplotlib.pyplot as plt
import xlwt
------6.1 Ant Colony Optimization: Implementation and Initialization------
Ant colony optimization is a bio-inspired heuristic whose core idea is to mimic the mechanism real ants use to find optimal paths. It was first proposed in 1991 (Colorni et al., 1991) and has since produced many applications (Dorigo and Stützle, 2010); the Best-Worst scheme for accelerating convergence first appeared in 2000 (Cordon et al., 2000). Interested readers can study the references:
- Colorni, A., Dorigo, M., Maniezzo, V., 1991. Distributed optimization by ant colonies, Proceedings of the first European conference on artificial life. Paris, France, pp. 134-142.
- Cordon, O., de Viana, I.F., Herrera, F., Moreno, L., 2000. A new ACO model integrating evolutionary computation concepts: The best-worst Ant System, From Ant Colonies to Artificial Ants: Second International Workshop on Ant Algorithms, Brussels, Belgium.
- Dorigo, M., Stützle, T., 2010. Ant colony optimization: overview and recent advances, Handbook of metaheuristics. Springer, pp. 227-263.
The ACO in this work has been modified to fit the specific problem. The flow is:
- Place n ants at the starting point and initialize the pheromone amount on every edge.
- Each ant constructs a route according to the transition probability function.
- Based on total waiting time, pick out the best and the worst routes; add a fixed amount of pheromone to every edge on the best route, subtract the same amount from every edge on the worst route, and evaporate a fixed percentage of pheromone on every edge according to the evaporation coefficient.
- Repeat steps 2-3 until the maximum iteration is reached, then output the best route.
*The transition probability function used in step 2 of the flow:
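The formula image from the original post is not reproduced here. Reconstructed from the calculate_transition_probability() code further below, the probability that an ant standing at job i moves next to job j in the feasible set F is

p_{ij} = \frac{[\tau_{ij}]^{\alpha}\,[\eta_{ij}]^{\beta}}{\sum_{k \in F}[\tau_{ik}]^{\alpha}\,[\eta_{ik}]^{\beta}}

where \tau_{ij} is the pheromone on edge (i, j) and \eta_{ij} is the heuristic term; note that in the code the heuristic is taken directly as the walking distance d_{ij} rather than the more common 1/d_{ij}.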
*The pheromone update equation used in step 3 of the flow:
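The update_pheromone() function itself is not listed in this chapter, so the following is only a sketch of the rule implied by step 3: every edge evaporates by the coefficient \rho, edges on the best route gain a fixed amount \Delta, and edges on the worst route lose the same amount,

\tau_{ij} \leftarrow (1-\rho)\,\tau_{ij} + \Delta\tau_{ij}, \qquad \Delta\tau_{ij} = \begin{cases} +\Delta & (i,j) \in \text{best route} \\ -\Delta & (i,j) \in \text{worst route} \\ 0 & \text{otherwise} \end{cases}

The exact deposit used by update_pheromone() may differ (for example, it may scale with distance, since distance_matrix is passed to it).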
We can now list all the ACO parameters that need tuning: the number of ants, the initial pheromone amount, α, β, and the evaporation coefficient ρ. So how is this part of the algorithm implemented? (After revising the code I found that explaining it clearly is not so easy, orz...)
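As an illustration only, a hypothetical set of values could be initialized as below; the actual tuned values are experiment-specific and are set in the main program of chapter 7.

import numpy as np

# Hypothetical parameter values for illustration; the tuned values are experiment-specific
aco_ant_number = 20           # number of ants (routes) built per call
aco_initial_pheromone = 1.0   # initial pheromone amount on every edge
aco_alpha = 1.0               # weight of the pheromone term in the transition rule
aco_beta = 2.0                # weight of the heuristic (distance) term
aco_rho = 0.1                 # evaporation coefficient

# Pheromone matrix over all locations, initialized uniformly (node_number is a placeholder)
node_number = 30
aco_pheromone_matrix = np.full((node_number, node_number), aco_initial_pheromone)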
Packages used by the functions:
import numpy as np
import pandas as pd
import copy
import math
import random
Treat the whole algorithm as one route-construction function, and first pin down its inputs and outputs.
Inputs:
- CCP parameters: ccp_acquaintance_increment, ccp_alpha, ccp_beta, ccp_waiting_limit, ccp_workload_limit
- ACO parameters: aco_alpha, aco_beta, aco_rho, aco_ant_number, aco_pheromone_matrix
- Experimental data constants: e_preference_matrix, e_distance_matrix, e_service_time_mean, e_walk_speed
- The nurse object selected by the QL algorithm (its action): ql_nurse
- All candidate targets satisfying the skill-matching rule: sd_matched_targets
Output: a nurse object that contains the best route.
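To connect these names with the function signature below, a hypothetical call would look roughly like this; note that the "output" is written into ql_nurse in place rather than returned, and depot is assumed to come from the data module as well.

# A sketch of how aco() is invoked (argument order follows the definition below)
aco(ccp_acquaintance_increment, ccp_alpha, ccp_beta,
    ccp_waiting_limit, ccp_workload_limit,
    aco_alpha, aco_beta, aco_rho, aco_ant_number, aco_pheromone_matrix,
    e_walk_speed, e_preference_matrix, e_distance_matrix, e_service_time_mean,
    ql_nurse, sd_matched_targets, depot)
# Afterwards ql_nurse.r holds the best route and ql_nurse.twt the total waiting time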
Here is the complete aco() function (the original post shows its flowchart first; the figure is not reproduced here):
def aco(acquaintance_increment, alpha_model_p, beta_model_p, waiting_limit, workload, alpha_aco_p, beta_aco_p, rho_aco_p, ant_num, pheromone_matrix, walk_speed, preference_matrix, distance_matrix, service_time_mean, nurse, sd_matched_targets, depot):
    # Output variables
    arrival_time_trace = []
    shortest_time = 0
    waiting_time = 0
    best_path = []
    # Temporal variables
    worst_path = []
    ccp_best_objective = 0
    ccp_worst_objective = 0
    for ant in range(ant_num):
        # Initialization: depot, time, waiting time, workload
        current_job = depot
        current_time = 0
        current_waiting = 0
        current_workload = 0
        # Lists for recording sub-arrival times and the path
        sub_arrival_time = []
        ant_path_table = [depot]
        # Initialize visiting list and preference matrix
        visiting_list = copy.deepcopy(sd_matched_targets)
        current_preference = copy.deepcopy(preference_matrix)
        # Build routes
        while current_workload <= workload:
            # Read out service time mean value and preference value
            st_mean = service_time_mean[nurse.s][current_job.lv]
            preference_factor = copy.deepcopy(current_preference[current_job.e][nurse.l])
            # Inspect waiting and record sub-arrival time
            if current_time < current_job.twb:
                current_waiting += (current_job.twb - current_time)
                current_time = copy.deepcopy(current_job.twb)
                sub_arrival_time.append(copy.deepcopy(current_time))
            else:
                sub_arrival_time.append(copy.deepcopy(current_time))
            # Predict the workload if the nurse went back to the depot from the current position,
            # then check whether overwork occurs
            current_workload = current_time + (preference_factor * st_mean) + beta_model_p + (distance_matrix[current_job.e][depot.e]) / walk_speed
            if current_workload >= workload:
                # Overwork predicted, stop routing:
                # set depot as the next target and record the arrival time
                ant_path_table.append(depot)
                current_time = copy.deepcopy(current_workload)
                sub_arrival_time.append(copy.deepcopy(current_time))
                break
            else:
                # Continue routing: add up the service time
                current_time += (preference_factor * st_mean) + alpha_model_p
                # Search for targets satisfying the time window constraint
                feasible_targets = collect_feasible_targets(visiting_list, distance_matrix, walk_speed, waiting_limit, current_job, current_time)
                # Count feasible targets, calculate transition probabilities and choose a target
                chosen_target = calculate_transition_probability(feasible_targets, current_time, distance_matrix, current_job, walk_speed, sub_arrival_time, ant_path_table, visiting_list, pheromone_matrix, alpha_aco_p, beta_aco_p, depot)
                if chosen_target.l == 0:
                    # No feasible target, stop routing
                    break
                else:
                    # Feasible target chosen, continue
                    current_job = chosen_target
                    # Revise preference
                    current_preference[chosen_target.e][nurse.l] -= acquaintance_increment
                    continue
        # Calculate fulfilled demands
        fulfilled_demand = copy.deepcopy(len(ant_path_table) - 2)
        # Record the best and worst solution according to the CCP objective
        if fulfilled_demand == 0:
            # No fulfilled demand
            if len(best_path) == 0:
                best_path = copy.deepcopy(ant_path_table)
                worst_path = copy.deepcopy(ant_path_table)
        else:
            # Record the current CCP objective: total waiting time
            ccp_objective = copy.deepcopy(current_waiting)
            if ant == 0:
                # First ant: record the best/worst CCP objective, working time,
                # waiting time, best route, and worst route
                ccp_best_objective = copy.deepcopy(ccp_objective)
                ccp_worst_objective = copy.deepcopy(ccp_objective)
                shortest_time = copy.deepcopy(current_time)
                waiting_time = copy.deepcopy(current_waiting)
                best_path = copy.deepcopy(ant_path_table)
                worst_path = copy.deepcopy(ant_path_table)
                arrival_time_trace = copy.deepcopy(sub_arrival_time)
            else:
                # Not the first ant
                if ccp_best_objective > ccp_objective:
                    # Found a new best route
                    ccp_best_objective = copy.deepcopy(ccp_objective)
                    shortest_time = copy.deepcopy(current_time)
                    waiting_time = copy.deepcopy(current_waiting)
                    best_path = copy.deepcopy(ant_path_table)
                    arrival_time_trace = copy.deepcopy(sub_arrival_time)
                elif ccp_worst_objective < ccp_objective:
                    # Found a new worst route
                    ccp_worst_objective = copy.deepcopy(ccp_objective)
                    worst_path = copy.deepcopy(ant_path_table)
                else:
                    continue
    # Update pheromone according to the Best-Worst rule
    update_pheromone(best_path, worst_path, pheromone_matrix, rho_aco_p, distance_matrix)
    # Update route: write the best solution back into the nurse object
    nurse.tt = copy.deepcopy(shortest_time)
    nurse.aT = copy.deepcopy(arrival_time_trace)
    nurse.twt = copy.deepcopy(waiting_time)
    for o in range(len(best_path)):
        nurse.r.append(best_path[o])
        if best_path[o].lv == 1:
            nurse.sd[0] += 1
        elif best_path[o].lv == 2:
            nurse.sd[1] += 1
        elif best_path[o].lv == 3:
            nurse.sd[2] += 1
    if sum(nurse.sd) != 0:
        nurse.avg_w = copy.deepcopy(nurse.twt / sum(nurse.sd))
If the code makes your head spin, don't worry; let's break the def aco() function apart. As said above, we have n ants, so n routes have to be constructed, which gives us the first for loop:
for ant in range(ant_num):
Each ant has to end its tour in time under the workload constraint (CCO; see the model in the previous chapter if anything is unclear), hence the while loop:
# Build routes
while current_workload <= workload:
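In symbols, the overwork test inside the loop (reconstructed from the assignment to current_workload in the code) predicts the workload the nurse would accumulate if she finished the current job i and then walked straight back to the depot:

\text{predicted workload} = t_{\text{now}} + f_{\text{pref}} \cdot \bar{s} + \beta_{\text{ccp}} + \frac{d_{i,\text{depot}}}{v}

where t_now is the current time, f_pref · s̄ is the preference-adjusted mean service time, β_ccp is the model parameter beta_model_p from the chance-constrained formulation, and v is the walking speed.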
Each if statement in the code corresponds to a branch in the flowchart. Note that in the overwork check (the CCO constraint), once the condition holds we need to jump out of the while loop and end the current route, which is why break is used. Next come the two functions that choose the next target to move to:
def collect_feasible_targets(visiting_list, distance_matrix, walk_speed, waiting_limit, current_job, current_time):
    ft = []
    for j in range(len(visiting_list)):
        distance = distance_matrix[current_job.e][visiting_list[j].e]
        travel = distance / walk_speed
        arrival = current_time + travel
        # The arrival time must be earlier than the upper bound of the time window,
        # and the arrival time plus the maximum waiting time must reach the lower bound
        if arrival < visiting_list[j].twe:
            if (arrival + waiting_limit) >= visiting_list[j].twb:
                ft.append(visiting_list[j])
                continue
        else:
            continue
    return ft
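In symbols, a candidate customer j is kept in the feasible list when (reconstructed from the two checks above)

t_{\text{now}} + \frac{d_{ij}}{v} < twe_j \quad \text{and} \quad t_{\text{now}} + \frac{d_{ij}}{v} + W \ge twb_j

where twb_j and twe_j are the lower and upper bounds of j's time window and W is the waiting limit ccp_waiting_limit.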
This function picks out all targets that satisfy the basic time-window requirements, so it computes the arrival time for every candidate target; the return value is a list. That list is then handed to the function that chooses the next target:
def calculate_transition_probability(feasible_targets, current_time, distance_matrix, current_job, walk_speed, sub_arrival_time, ant_path_table, visiting_list, pheromone_matrix, alpha_aco_p, beta_aco_p, depot):
    # Count feasible targets
    # =0: return depot as the next target
    # =1: return it as the next target
    # >=2: return the target chosen according to the probability transition function
    if len(feasible_targets) == 0:
        # No feasible targets, end routing
        current_time += (distance_matrix[current_job.e][depot.e]) / walk_speed
        sub_arrival_time.append(copy.deepcopy(current_time))  # record the arrival time back at the depot
        ant_path_table.append(depot)
        return depot
    elif len(feasible_targets) == 1:
        # Only one feasible target: choose it and update the route
        ant_path_table.append(feasible_targets[0])
        current_time += (distance_matrix[current_job.e][feasible_targets[0].e]) / walk_speed
        # Remove the chosen target from the visiting list
        for v in range(len(visiting_list)):
            if visiting_list[v].l == feasible_targets[0].l:
                visiting_list.remove(visiting_list[v])
                break
        return feasible_targets[0]
    else:
        # More than one feasible target: calculate transition probabilities
        pr = []
        pD = 0
        for pdd in range(len(feasible_targets)):
            yitaD = distance_matrix[current_job.e][feasible_targets[pdd].e]
            pD += pow((pheromone_matrix[current_job.e][feasible_targets[pdd].e]), alpha_aco_p) * pow(yitaD, beta_aco_p)
        for pt in range(len(feasible_targets)):
            yitaU = distance_matrix[current_job.e][feasible_targets[pt].e]
            pU = pow((pheromone_matrix[current_job.e][feasible_targets[pt].e]), alpha_aco_p) * pow(yitaU, beta_aco_p)
            pT = pU / pD
            pr.append([current_job, feasible_targets[pt], pT])
            if math.isnan(pT):
                print(pT)
        # Choose a target randomly and update the route
        target_index = choose_target_randomly(pr)
        ant_path_table.append(pr[target_index][1])
        current_time += (distance_matrix[current_job.e][pr[target_index][1].e]) / walk_speed
        # Remove the chosen target from the visiting list
        for v in range(len(visiting_list)):
            if visiting_list[v].l == pr[target_index][1].l:
                visiting_list.remove(visiting_list[v])
                break
        return pr[target_index][1]
Inside this function there are three cases to handle: the feasible set is empty, contains exactly one target, or contains more than one; each case is treated differently.
# Count feasible targets
# =0: return depot as the next target
# =1: return it as the next target
# >=2: return the target chosen according to the probability transition function
The return value is a Job object.
The calculate_transition_probability() function calls another small helper:
def choose_target_randomly(pr):
    p_coor = 0
    p_axi = [0]
    for pta in range(len(pr)):
        p_coor += pr[pta][2]
        p_axi.append(p_coor)
    # generate a random value
    ran_var = random.uniform(0, p_coor)
    search = (len(p_axi) // 2) - 1
    while True:
        if search == 0:
            return 0
        if ran_var <= p_axi[search]:
            if ran_var > p_axi[search - 1]:
                return (search - 1)
            else:
                search -= 1
        else:
            if ran_var <= p_axi[search + 1]:
                return search
            else:
                search += 1
This small routine selects the target to move to according to the transition probabilities of the candidates, similar to sampling. The idea is to add up the transition probabilities of all targets to form an interval, with each sub-interval representing one target; a random number is generated, and whichever sub-interval it falls into, the corresponding target is chosen. In the implementation, what is actually returned is an index.
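The same roulette-wheel idea can be written as a plain linear scan over the cumulative probabilities. This is an equivalent sketch for illustration only, not the author's implementation above (which searches the cumulative axis starting from the middle):

import random

def choose_target_linear(pr):
    # pr is the list of [current_job, target, probability] triples built in
    # calculate_transition_probability(); returns an index into pr
    total = sum(item[2] for item in pr)
    ran_var = random.uniform(0, total)
    cumulative = 0
    for index, item in enumerate(pr):
        cumulative += item[2]
        if ran_var <= cumulative:
            return index
    return len(pr) - 1  # guard against floating-point round-off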
All right, at this point the program is basically complete. The last step is to store the solution information back into the Nurse object that was passed in:
# update route
nurse.tt = copy.deepcopy(shortest_time)
nurse.aT = copy.deepcopy(arrival_time_trace)
nurse.twt = copy.deepcopy(waiting_time)
for o in range(len(best_path)):
    nurse.r.append(best_path[o])
    if best_path[o].lv == 1:
        nurse.sd[0] += 1
    elif best_path[o].lv == 2:
        nurse.sd[1] += 1
    elif best_path[o].lv == 3:
        nurse.sd[2] += 1
Then the system moves to the next state of the Markov chain, and the algorithm hands control back to Q-Learning, which computes the reward value and enters the next decision stage.