標籤:

【Python】socket-Part7-實現多客戶端"並發"

之前寫socket都是簡單使用socket模塊,沒有實現多客戶端「並發「:如果現在有1個client和server建立了conn,第2個client是無法也去建立的……

附一個早前的簡單socket實現:

田田田田:【Python】Socket-Part4-TCP值得注意的地方

之前以為這是socket.accept()這個函數的自帶特性……今天寫完兩個例子才發現原因不在那裡:conn,addr=socket.accept()這一句完全可以多次執行,執行一次就accept一個新的conn。

真正的原因是自己代碼寫的不夠好,之前是把accept這一句放在while循環體裡面……(這樣accept的執行次數就是確定的,但實際上有多少個客戶端會連過來是程序員在寫代碼時根本無法預計的,所以不合適,用事件觸發的思想比較合適)

今天給socket實現多客戶端"並發",4種方案:

1、直接利用現成的socketserver模塊

2、自己在socket的基礎上加入threads

3、利用select 模塊,實現IO多路復用(始終1個線程,不涉及多線程)

4、利用selectors模塊,實現IO多路復用(內置了select/poll/epoll3中IO復用方式,視操作系統決定用哪一種,其實在我的windows上就是select因為windows只有selec,API有點區別)

下面是代碼和心得。(後台處理的代碼有省略)

1、直接利用現成的socketserver模塊

這個模塊的代碼值得好好看看,現在只是大概看看,它的實現是這樣:拿conn的過程用了select模塊,然後到數據處理的過程開了子進程。

千萬不要在創建socket這個過程上去開多線程,雖然一開始我就是這麼做的(⊙︿⊙)

(【Python】socketmodule.h和socketmodule.c 這個有機會再往深入看,總之socket對下都是通過調用操作系統的系統服務,也是這個調用過程涉及了IO、阻塞這些概念……)

import socketserverimport refrom calculate_bakstage import generate_responseimport timeclass Myserver(socketserver.BaseRequestHandler): def handle(self): print(conn is:,self.request) print(addr is:,self.client_address) while True: # 內層循環 try: data_recv = self.request.recv(1024) if not data_recv: break data_recv = data_recv if data_recv.startswith(GET): f = open("html_practicing/params.html", rb) self.request.sendall(f.read()) print(已經響應GET請求) …… except Exception as e: print(e) print(exception occured ,go to accpet next ) breakif __name__==__main__: s=socketserver.ThreadingTCPServer((127.0.0.1,80),Myserver) s.serve_forever()

2、自己在socket的基礎上加入threads

"""1、用threading模塊自己實現socket的多線程,而不用現成的socketserver2、這裡的多線程開在每一個conn上,而不是用多線程開了socket對象本身上。在server上全程只有1個socket對象。每accept一個conn,就順帶著開3個子進程,用於接收數據和後台處理。現成的socketserver也是這個思路:socket只有1個,用select而不是while循環去監測新的conn。在監測並拿到一個新的conn以後,再對conn開一個子線程。關鍵代碼如下:class ThreadingMixIn:…… def process_request(self, request, client_address): #Start a new thread to process the request. t = threading.Thread(target = self.process_request_thread, args = (request, client_address)) t.daemon = self.daemon_threads t.start()3、這裡3個線程沒有在共同操作一個數據,因此沒有加鎖,也沒有用到queue.Queue"""import socket,threadingimport refrom calculate_bakstage import generate_responsedef serve(): print(server begining) s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) print(s) #s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) #這一句保證了埠地址可重用 s.bind((127.0.0.1,80)) s.listen(5) s.set while True: #外層循環 conn,addr=s.accept() print(Accept a conn,conn is:,conn) conn.settimeout(10.0)#設置超時時間,因為1個conn上開了多個子線程,而且有的子線程就是一直在wait狀態,很浪費資源。客戶端長時間沒有動作,該斷就斷。 for i in range(3): # 每當accetp()一個新的conn,另開3個子線程來處理任務 t=threading.Thread(target=handle,args=(conn,)) t.start()#這些t就不要join了,這樣子線程處理這個conn的任務,而主線程就可以繼續進入while的下一輪,去accept新的conndef handle(conn): #handle是後台的處理邏輯 while True: # 內層循環 try: print(開始從操作系統接收數據……) data_recv = conn.recv(1024) if not data_recv: break data_recv = data_recv.decode(utf-8) if data_recv.startswith(GET): f = open("html_practicing/params.html", rb) conn.sendall(f.read()) print(已經響應GET請求) …… print(處理正常結束) except Exception as e: print(e) print(exception occured…… ) breakif __name__==__main__: serve()

3、利用select 模塊,實現IO多路復用(始終1個線程,不涉及多線程)

"""1、用select模塊I/O多路復用方式,實現socket的多線程,而不用現成的socketserver2、這裡有個概念需要澄清一下:之前寫過一個、最簡單的、單線程環境下的socket對象同時只能accept1個客戶端的conn,如果有第2個客戶端,需要等第1個conn結束以後才能得到服務。但這並不意味著,1個socket對象就真的只能accpet1個conn。實際上conn.accept()是可以多次執行的。之前那個效果,其實是自己的代碼的問題:在外層循環里只有一個conn.accept()(那時不知道有select這種事件監聽的方式,那麼也只能想到把accept寫在while True里。如果在while True里寫accpet2次,這樣同時可以為2個client服務,但是第3個要等;在while True里寫accpet3次,這樣同時可以為3個client服務,但是第4個要等……總之寫在while循環里,accept的執行次數是確定的。而實際上一台伺服器會有多少個客戶端想要同時連接,是一個程序員無法預知的時間,所以要實現必須借用到事件觸發。3、select的原理:select沒有創建多線程,只不過是每隔一定時間去執行select,去探測是否有新的client進來:如果有,轉入處理,有多少個處理多少個(遍歷);如果沒有,不動。4、一個問題:客戶端與服務端建立連接以後,第二次POST發到的是同一個socket?答:所以說即使要開多線程,也不要在1個埠上開多線程,多線程開在拿到conn以後的處理上,這樣保證伺服器到一個客戶端的連接只有一個conn,5、一個現象:可以看到剛開始服務端的s=socket.socket(),fd是208。然後從conn,addr=s.accept()後,拿到的conn也是一個socket對象,fd就是196,如果再有一個新的client進來,拿到的conn,fd又是208……"""import socket,selectimport refrom calculate_bakstage import generate_responseimport timedef serve(): #創建socket s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.bind((127.0.0.1,80)) print(s) s.listen(5) inputs =[s,] s.setblocking(True) while True: print(while True^) #用select監聽inputs列表裡對的所有,觸發方式是高電平觸發 r,w,e=select.select(inputs,[],[],5)#5表示每5s執行一次,如果沒有client in,那麼r就為空,for就會跳過,只列印end #如果檢測到有變化,就進入下面的for print(r) for obj in r: print(obj:,obj) if obj==s:#obj要麼是s要麼是con,是s就s.accept(),是conn就conn.recv() print(conn) conn,addr=obj.accept() print(conn:,conn) inputs.append(conn) else: print(data_recv) time.sleep(1) data_recv = obj.recv(1024)#好奇怪這裡會變成非阻塞的,而且setblocing(True)以後還是不生效,一直收空,只好關閉了 print(recv……) if not data_recv: print(kong) obj.close() inputs.remove(obj)#沒辦法了,如果收空就關閉並停止select data_recv = data_recv.decode(utf-8) if data_recv.startswith(GET): f = open("html_practicing/params.html", rb) obj.sendall(f.read()) print(已經響應GET請求) …… print(finish)if __name__==__main__: serve()

4、利用selectors模塊,實現IO多路復用

#!/usr/bin/python#coding=utf-8import selectors,socketimport refrom calculate_bakstage import generate_responseimport timedef accept(sock,mask): conn,addr=sock.accept() print(accept) #conn.setblocking(False)按默認的為True好像也不影響,畢竟是select()已經監聽到變化,不會阻塞了 sel.register(conn,selectors.EVENT_READ,read)def read(conn,mask): try: data_recv =conn.recv(1024) if not data_recv: conn.close() sel.unregister(conn) data_recv = data_recv.decode(utf-8) print(data_recv.__repr__()) print(type(data_recv)) if data_recv.startswith(GET): f = open("html_practicing/params.html", rb) conn.sendall(f.read()) print(已經響應GET請求) …… except Exception as e: print(e) print(exception occured ,go to accpet next )if __name__==__main__: #首先定義一個socket對象 s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) s.bind((127.0.0.1,80)) s.listen(5) #s.setblocking(False) #接著創造一個selectors.DefaultSelector對象,用於監聽 sel = selectors.DefaultSelector() #把socket對象加入監聽列表裡,加入同時需要與一個函數綁定, sel.register(s,selectors.EVENT_READ,accept) """ #這裡的accept就是一個回調函數。註冊這一步就是在sel這個調用者這裡提前寫好,把s和accpet的對應關係寫入事件隊列。這樣程序運行到後面,當s進數據這樣一個事件發生,就會觸發sel去調用accept這個函數(而不是別的什麼函數) #如果沒有提前註冊,事件發生時sel根本不知道調用哪個函數,或者說不知道要調用的函數在哪裡。 #這裡稱作"回調"其實已經超越了狹義上「回調函數作為另一個函數的參數,在那個函數中被執行",這是從廣義上來講的,"先註冊-再調用",所以叫回調。「回調函數作為另一個函數的參數,在那個函數中被執行",應該只是在代碼實現上的一種表現,但也可能是另外的表現,比如本例。 為什麼需要"先註冊-再調用"? 回調函數與觸發事件的對應關係提前在調用者那裡聲明好(註冊過程),之後在調用者發生事件時再去執行回調函數(發生的事件時無法提前預先知道的,因此執行哪個回調函數也是程序員無法用代碼寫死的。因此只能對所有的事件都寫一個處理邏輯:如果發生事件1,就調用函數1;如果發生事件2,就調用函數2……如果沒有時間發生,就不調用任何函數。 """ while True: print(while True) #selcet()方法拿到有動靜的對象,以key,mask的格式封裝在events裡面 events=sel.select() for key,mask in events: #key.fileobj和key.data就是剛剛註冊的兩個東西:fd和一個函數 print(key.data:,key.data)#<function accept at 0x0069D6A8> print(key.fileobj:,key.fileobj)#<socket.socket fd=208, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(127.0.0.1, 80)> fn=key.data#<function accept at 0x0069D6A8> fn(key.fileobj,mask)#accept(s,1)#這裡是典型的回調,通過調用fn去調用不同的key.fileobj

最後,再次總結一下,最重要的幾點是:

1、socket和conn都是socket.socket()對象,但是fd不同。

2、select模塊並沒有開多線程,但是也實現了任意個客戶端的並發,它是通過輪詢的方式:固定間隔去問操作系統:所監聽的socket和conn等對象是否有新的事件需要處理?如果沒有,就先轉去處理別的事情而不是一直阻塞,不浪費手上的CPU資源。

所以沒有IO阻塞,所以會看到並發的效果。

3、多線程不要開在socket對象上,而要開在conn對象 上。


推薦閱讀:

【強烈推薦】十三個鮮為人知的大數據學習網站
python與numpy使用的一些小tips[6]
Python文件處理
草根學Python(十二)元類
用不到 50 行的 Python 代碼構建最小的區塊鏈

TAG:Python | Socket |