網路編程(九):Python裸寫非同步非阻塞網路框架

裸寫的意思就是不用任何第三方庫,這樣有助於網路編程入門的同學了解其中的奧秘。

相關的前置的網路編程的知識參見:

  • 網路編程(一):演進——從Apache到Nginx
  • 網路編程(二):戲說非阻塞網路編程
  • 網路編程(三):從libevent到事件通知機制
  • 網路編程(四):互聯網中TCP Socket伺服器的實現過程需要考慮哪些安全問題?
  • 網路編程(五):長連接&連接池的應用
  • 網路編程(六):埠那些事兒
  • 網路編程(七):CAP原理推導和應用
  • 網路編程(八):再談Sendfile(2)

好了,直接上代碼(之前的代碼是沒有太多注釋的感謝@康存化 給代碼加了詳盡的注釋):

# 從之前寫好的守護進程類中導入Daemonfrom daemon import Daemonimport socketimport selectimport time# 導入pdb,即Python debug模塊,方便調試import pdb# 設定這個變數,可以控制本文件在被import *# 的時候導入的變數、函數和類的範圍__all__ = ["nbNet"]#DEBUG = True# 這個文件的內容會在後面詳述from nbNetUtils import *# 我們住程序的conn_state的類原型class STATE: def __init__(self): self.state = "accept" # 初始狀態 self.have_read = 0 # 我們的程序在開始的時候總是要讀取10個位元組的頭 self.need_read = 5 self.have_write = 0 self.need_write = 0 # 讀寫緩衝區 self.buff_read = "" self.buff_write = "" # 用來後續存放sock對象 self.sock_obj = "" def printState(self): """ debug輸出函數 """ if DEBUG: dbgPrint("
- current state of fd: %d" % self.sock_obj.fileno()) dbgPrint(" - - state: %s" % self.state) dbgPrint(" - - have_read: %s" % self.have_read) dbgPrint(" - - need_read: %s" % self.need_read) dbgPrint(" - - have_write: %s" % self.have_write) dbgPrint(" - - need_write: %s" % self.need_write) dbgPrint(" - - buff_write: %s" % self.buff_write) dbgPrint(" - - buff_read: %s" % self.buff_read) dbgPrint(" - - sock_obj: %s" % self.sock_obj)class nbNetBase: """ non-blocking Net類,首先我們設定了一套通信的協議,以10個byte的 ASCII碼數字的頭來表示,後續數據的長度,例如: 0000000005HELLO 0000000001a 0000000012hello world
這樣做的好處在於,我們可以很容易的解析消息的結束位置。 並且能夠在這套框架下進行任何數據的傳輸,包括各種二進位數據,而不需要轉義。 """ def setFd(self, sock): """把sock放進全局的conn_state字典里""" dbgPrint("
-- setFd start!") tmp_state = STATE() tmp_state.sock_obj = sock self.conn_state[sock.fileno()] = tmp_state self.conn_state[sock.fileno()].printState() dbgPrint("
-- setFd end!") def accept(self, fd): """在fd上進行accept,並且把socket設置成非阻塞模式""" dbgPrint("
-- accept start!") sock_state = self.conn_state[fd] sock = sock_state.sock_obj conn, addr = sock.accept() # 把socket設置成非阻塞模式 conn.setblocking(0) return conn def close(self, fd): """關閉fd,從epoll中取消關注,清理conn_state里相關的數據""" try: # cancel of listen to event sock = self.conn_state[fd].sock_obj sock.close() except: dbgPrint("Close fd: %s abnormal" % fd) finally: self.epoll_sock.unregister(fd) self.conn_state.pop(fd) def read(self, fd): """ 讀取fd中的數據(非阻塞模式) 並且設置各個計數器的數值,以供後續處理 返回值是個字元串,表示下一步需要進行的處理,如: 「readcontent」、「process」、「readmore」 """ # pdb.set_trace() try: # 從conn_state字典中取出連接 sock_state = self.conn_state[fd] conn = sock_state.sock_obj if sock_state.need_read <= 0: raise socket.error # 進行一次非阻塞的讀取 one_read = conn.recv(sock_state.need_read) dbgPrint(" read func fd: %d, one_read: %s, need_read: %d" % (fd, one_read, sock_state.need_read)) # 如果什麼都沒有讀到,那應該是socket出錯了 if len(one_read) == 0: raise socket.error # 將讀到的數據放入buff_read, # 設定have_read(已經從socket中讀取的數量) # 設定need_read(還需從socket中要讀取的數量) sock_state.buff_read += one_read sock_state.have_read += len(one_read) sock_state.need_read -= len(one_read) sock_state.printState() # 如果已經讀取的數據是10個byte,那麼說明數據的10位元組頭已經讀取完畢, # 我們可以解析判斷後續的數據的長度了 if sock_state.have_read == 5: # 由於是ASCII的數據頭,我們需要用int()將它轉化成數字 header_said_need_read = int(sock_state.buff_read) if header_said_need_read <= 0: raise socket.error sock_state.need_read += header_said_need_read sock_state.buff_read = "" # 為了方便大家理解,這裡列印一些debug信息 sock_state.printState() return "readcontent" elif sock_state.need_read == 0: # 所有數據已經讀取完畢,轉入業務邏輯處理「process」 return "process" else: # 出去上述的所有情況,剩下的情況就是還需要讀取更多的數據 return "readmore" except (socket.error, ValueError), msg: # 進行一些異常處理 try: # errno等於11,即「EAGAIN」。是表示,還可以嘗試進行一次讀取 if msg.errno == 11: dbgPrint("11 " + msg) return "retry" except: pass # 除去上述的特殊情況,發生了任何錯誤,不要掙扎,直接把socket關閉 return "closing" def write(self, fd): """ 非阻塞的寫數據到socket中,返回值的涵義和上述的read一致 """ # 還是取出fd對應的sock_state結構體 sock_state = self.conn_state[fd] conn = sock_state.sock_obj last_have_send = sock_state.have_write try: # 非阻塞的發送數據,這裡send的返回值是已經成功發送的數據量 have_send = conn.send(sock_state.buff_write[last_have_send:]) sock_state.have_write += have_send sock_state.need_write -= have_send if sock_state.need_write == 0 and sock_state.have_write != 0: # 如果已經全部發送成功,返回「writecomplete」 sock_state.printState() dbgPrint("
write data completed!") return "writecomplete" else: return "writemore" except socket.error, msg: # 發生錯誤,直接關閉socket return "closing" def run(self): """ 這個函數是裝個狀態機的主循環所在 """ while True: # 這部分就是我們上面多次提到的epoll # poll()返回的epoll_list就是有事件發生的fd的list # 需要在循環中按照event的類型分別處理,一般分為以下幾種類型 # EPOLLIN :表示對應的文件描述符可以讀; # EPOLLOUT:表示對應的文件描述符可以寫; # EPOLLPRI:表示對應的文件描述符有緊急的數據可讀;一般不需要特殊處理 # EPOLLERR:表示對應的文件描述符發生錯誤;後面這兩種需要關閉socket # EPOLLHUP:表示對應的文件描述符被掛斷; epoll_list = self.epoll_sock.poll() for fd, events in epoll_list: sock_state = self.conn_state[fd] if select.EPOLLHUP & events: dbgPrint("EPOLLHUP") sock_state.state = "closing" elif select.EPOLLERR & events: dbgPrint("EPOLLERR") sock_state.state = "closing" # 調用狀態機 self.state_machine(fd) def state_machine(self, fd): """ 這裡的邏輯十分的簡單:「按照不同fd的state,調用不同的函數即可」 具體的對應表見nbNet的__init__() """ sock_state = self.conn_state[fd] self.sm[sock_state.state](fd)class nbNet(nbNetBase): def __init__(self, addr, port, logic): dbgPrint("
__init__: start!") # 初始化conn_state字典,這個字典將會保存每個連接的狀態 # 以及連接的讀寫內容。 self.conn_state = {} # 初始化監聽socket # socket.AF_INET指的是乙太網 # socket.SOCK_STREAM指的是TCP self.listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) # 開啟SO_REUSEADDR,這樣當監聽埠處於各種xxx_WAIT的狀態的時候 # 也能正常的listen、bind self.listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 綁定在制定的IP和埠上 self.listen_sock.bind((addr, port)) # 指定backlog數,這裡初學者可能會存在一個誤區,需要解釋一下: # 有些地方把listen的參數成為「積壓值」或者backlog, # 最大連接數是最大能處理的連接數(accept了丟一邊晾著是耍流氓) # 提高並發處理能力是門學問,不單是提高「最大連接數」,兩點結論: # 1. backlog不能提高「最大連接數」 # 2. backlog不宜設置過大 # 舉個栗子,假設我們的伺服器是一個非常受歡迎的飯店: # 最大連接數就是這個飯店最大能容納的顧客人數,backlog就是門外允許排隊的最大長度。 # 如果飯店點菜慢上菜也慢(伺服器的處理不能滿足要求),飯店將很快被被顧客坐滿(最大連接數上限) # ,門口排位如果無限排下去(backlog設置非常大),可能還不如告訴顧客現在人太多了, # 我們處理不過來,不過等會兒再來試試。 # 想要提高服務質量只能通過提高翻桌率(伺服器處理速度)來實現。 self.listen_sock.listen(5) # backlog # 將listen socket同樣放入conn_state self.setFd(self.listen_sock) # 初始化epoll的fd self.epoll_sock = select.epoll() # 這裡指定我們的listen socket只關注EPOLLIN,即connect過來的連接 # LT 是這裡的默認, 想要ET 需要改成"select.EPOLLIN | select.EPOLLET" self.epoll_sock.register(self.listen_sock.fileno(), select.EPOLLIN) # 業務邏輯處理函數 self.logic = logic # 狀態機的各個狀態的處理函數,這裡的self.sm是一個key是字元串,value是函數的字典 self.sm = { "accept": self.accept2read, "read": self.read2process, "write": self.write2read, "process": self.process, "closing": self.close, } dbgPrint("
__init__: end, register no: %s" % self.listen_sock.fileno()) def process(self, fd): """ 調用業務邏輯處理函數self.logic,然後將它返回的字元串當成是 Server對Client的回應 通過約定好調用的函數原型,就可以實現比較乾淨的業務邏輯和網路框架的分離 """ sock_state = self.conn_state[fd] response = self.logic(sock_state.buff_read) # 組裝給Client回應的10位元組協議頭 sock_state.buff_write = "%05d%s" % (len(response), response) sock_state.need_write = len(sock_state.buff_write) sock_state.state = "write" self.epoll_sock.modify(fd, select.EPOLLOUT) sock_state.printState() def accept2read(self, fd): """ 這個函數主要完成accept到等待數據讀取的狀態轉換 """ # accept一個連接之後,需要註冊,初始化state為read conn = self.accept(fd) self.epoll_sock.register(conn.fileno(), select.EPOLLIN) self.setFd(conn) self.conn_state[conn.fileno()].state = "read" # 現在accept 到 read的轉換完成了 # 需要明確的是,我們的listen socket還是處於等待連接到來 # 的accept狀態 dbgPrint("
-- accept end!") def read2process(self, fd): """ 這個函數主要完成read完所有請求到處理業務邏輯的狀態轉換 """ # pdb.set_trace() read_ret = "" try: read_ret = self.read(fd) except (Exception), msg: dbgPrint(msg) read_ret = "closing" if read_ret == "process": # 數據接收完成,轉換到process階段 self.process(fd) # readcontent、readmore、retry 都不用改變socket的state elif read_ret == "readcontent": pass elif read_ret == "readmore": pass elif read_ret == "retry": pass elif read_ret == "closing": self.conn_state[fd].state = "closing" # 發生錯誤直接關閉,做到快速失敗 self.state_machine(fd) else: raise Exception("impossible state returned by self.read") def write2read(self, fd): """ 這個函數主要完成write給client回應到等待數據讀取的狀態轉換。 這個情況就是我們經常聽到的「長連接」 """ try: write_ret = self.write(fd) except socket.error, msg: write_ret = "closing" if write_ret == "writemore": pass # 寫數據完成,重置各種計數器,開始等待新請求過來 elif write_ret == "writecomplete": sock_state = self.conn_state[fd] conn = sock_state.sock_obj self.setFd(conn) self.conn_state[fd].state = "read" self.epoll_sock.modify(fd, select.EPOLLIN) elif write_ret == "closing": # 發生錯誤直接關閉,做到快速失敗 dbgPrint(msg) self.conn_state[fd].state = "closing" # closing directly when error. self.state_machine(fd)if __name__ == "__main__": # 這個是我們演示用的「業務邏輯」,做的事情就是將請求的數據反轉 # 例如: # 收到:0000000005HELLO # 回應:0000000005OLLEH def logic(d_in): return(d_in[::-1]) # 監聽在0.0.0.0:9076 reverseD = nbNet("0.0.0.0", 9090, logic) # 狀態機開始運行,除非被kill,否則永不退出 reverseD.run()# >>study:/home/kang>telnet 127.0.0.1 50009# Trying 127.0.0.1...# Connected to 127.0.0.1.# Escape character is "^]".# 00005abcde# 00005edcba

上面的代碼里import 的deamon類是一個用Python純手工實現的daemonlize類

import sysimport osimport timeimport atexitfrom signal import SIGTERMclass Daemon: """ 通用的Daemonlize類,能將一個程序變成守護進程 使用方式:繼承Daemon類,然後重寫run()函數即可 """ def __init__(self, pidfile="nbMon.pid", stdin="/dev/null", stdout="nbMon.log", stderr="nbMon.log"): self.stdin = stdin self.stdout = stdout self.stderr = stderr self.pidfile = pidfile def daemonize(self): """ 雙重fork,具體原因參見Stevens寫的《UNIX環境高級編程》 書籍鏈接參見:http://book.douban.com/subject/1788421/ 還有: http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16 """ try: pid = os.fork() if pid > 0: # 退出第一個「爺爺進程」,因為後面還要第二次fork # 所以這個進程輩分是「爺爺」 sys.exit(0) except OSError, e: sys.stderr.write("fork #1 failed: %d (%s)
" % (e.errno, e.strerror)) sys.exit(1) # 需要執行一些操作避免可能從父進程繼承過來的影響守護進程的設定 # 改變當前工作目錄 os.chdir("/") # 設置sid,成為session Leader os.setsid() # 重設umask os.umask(0) # 第二次fork try: pid = os.fork() if pid > 0: # 父進程依然退出 sys.exit(0) except OSError, e: sys.stderr.write("fork #2 failed: %d (%s)
" % (e.errno, e.strerror)) sys.exit(1) # 重定向0、1、2三個fd(依次為標準輸入、標準輸出、錯誤輸出) # 這裡需要注意,有些不講究的程序或者文章,會直接將0、1、2關閉, # 這樣會造成一定的隱患,可能會導致後續操作打開的文件句柄佔用 # 0、1、2這三個一般認為有特殊含義的句柄,會導致一些莫名其妙的問題發生 # 所以這裡最好的建議是,將這三個fd重新定向到/dev/null,或者相應的日誌文件 # 重新定向之前flush一次,確保該列印出來的文字已經輸出 sys.stdout.flush() sys.stderr.flush() si = file(self.stdin, "r") so = file(self.stdout, "a+") se = file(self.stderr, "a+", 0) os.dup2(si.fileno(), sys.stdin.fileno()) os.dup2(so.fileno(), sys.stdout.fileno()) os.dup2(se.fileno(), sys.stderr.fileno()) """ 寫pid文件 """ # 這裡設定一個程序退出時回調,但必須知道的是,這個回調 # 在一些情況下並不保證一定會被執行,比如被kill -9 atexit.register(self.delpid) pid = str(os.getpid()) file(self.pidfile, "w+").write("%s
" % pid) def delpid(self): os.remove(self.pidfile) def start(self): """ 啟動守護進程 """ # 檢查pid文件是否存在,如果存在就認為程序還在運行 try: pf = file(self.pidfile, "r") pid = int(pf.read().strip()) pf.close() except IOError: pid = None if pid: message = "pidfile %s already exist. Daemon already running?
" sys.stderr.write(message % self.pidfile) sys.exit(1) # 開始變身守護進程,哈哈哈哈 self.daemonize() self.run() def stop(self): """ 停止守護進程 """ # 從pid文件中獲取進程id try: pf = file(self.pidfile, "r") pid = int(pf.read().strip()) pf.close() except IOError: pid = None if not pid: message = "pidfile %s does not exist. Daemon not running?
" sys.stderr.write(message % self.pidfile) return # 開始嘗試kill掉守護進程 try: while 1: os.kill(pid, SIGTERM) time.sleep(0.1) except OSError, err: err = str(err) if err.find("No such process") > 0: if os.path.exists(self.pidfile): os.remove(self.pidfile) else: print str(err) sys.exit(1) def restart(self): """ 重啟 """ self.stop() self.start() def run(self): """ 這個方法是空的,所以要想使用這個類,必須在子類中 重寫這個函數,這個函數應該寫的是程序的主邏輯循環。 後面這個函數將會在start()和restart()函數中被調用。 """

服務端開發群:365534424,本文僅授權51 Reboot相關賬號發布。

推薦閱讀:

10min手寫(一):伺服器內存監控系統
為什麼 x in range(1000000000000001) 的執行速度這麼快
如何看待將Python代碼轉換成Go代碼並進一步編譯的 Grumpy 項目?
基於ArcGIS的python編程:2.python基礎(一)

TAG:网络编程 | Python |