用Python打造在線盈利的項目-代理服務
前言
這是Python學習系列文章的中篇,當你學完這個系列的文章之後,我希望你能學到以下知識點:
- 學會從靜態UML圖入手來初步整理開源項目各個模塊的依賴關係
- 學會從數據流來進一步分析各個模塊是如何合作的
- 學會socks5協議的tcp與udp部分
- 學會非同步io的系統是怎麼寫的
- 學會狀態機是什麼,以及狀態機用於解決什麼問題
- 根據用戶場景來設計介面或者命令
- 學會對稱加密中有一大類是流加密
內容目錄
- 1.回顧過去,藉助已有開源框架實現RequestRelayer服務
- 1.1 情景回顧
- 1.2 重新認識RequestRelayer
- 2.RequestRelayer模塊分解
- 2.1 都有哪些模塊組成
- 2.2 模塊之間的依賴關係
- 3.RequestRelayer工作流
- 3.1 tcp類型的加密中轉服務
- 3.2 udp類型的加密中轉服務
- 4.為RequestRelayer添加新功能
- 4.1 用戶場景分析
- 4.2 為每個用戶場景添加對應的功能
1. 回顧過去,藉助已有開源框架實現RequestRelayer服務
1.1 情景回顧
經過一段時間的忙碌,我還有事兒沒完成,繼續分享一下之前我在這個Live(用Python打造在線盈利的項目)中沒有完成的事情。這篇文章將講解使用Python實現RequestRelayer的細節,如下圖紅色矩形模塊所示。
想要在短期內上線一款服務,我們一般會藉助一些開源的項目,這個RequestRelayer也不例外,因此我們會在這個項目的基礎上為RequestRelayer添加功能。在這裡我就用RequestRelayer代替這個項目的名字,這個項目很優秀,如果大家感興趣,那麼就去看看他們每日的輸出。為了定製符合RequestRelayer的功能,我們就有必要熟悉和理解我們選擇的開源項目的源代碼。因此本篇文章將對我們選擇的開源項目進行源碼分析,了解它的模塊組成部分,工作流,數據流之後,我們將在合適的位置為這個開源的項目添加適合我們業務場景的功能。
1.2 重新認識RequestRelayer
RequestRelayer共有2部分子系統,分別是下圖黃色部分的local與紅色部分的server。local相當於上圖的RequestRelayer Client,server相當於上圖的RequestRelayer,不過切記,local與server共同組成了RequestRelayer。local運行在用戶的PC機上(命令行版本),server運行在遠程的vps上(也就是一台運行linux OS的PC機)。
有上圖可知,我們需要學習local與server兩個子系統,以便我們日後能夠為具體的業務實現自己的功能。
2. RequestRelayer模塊分解
2.1 都有哪些模塊組成
好,在分析別人的項目的源碼之前,我們一般需要將別人的項目中涉及重要功能的static class diagram繪製出來,好消息是我已經繪製了,請看下圖:
這個圖簡要的將項目中各個功能模塊的從屬關係展現出來,是我們了解別人項目源碼的第一步。通過這個圖,我們要逼迫自己做到以下幾點來理解這個開源項目:
- 這個項目由哪幾個功能模塊組成,每個模塊都提供了什麼功能
由上圖我們可以知道組成這個開源項目的功能模塊不多,對我們而言是好事,因為我們可以在短期內學完這個開源項目。我們再來看看每個功能模塊都提供了哪些功能:
local: 完成與socks5客戶端握手,建立連接,傳輸3個階段;加密轉發來自socks5客戶端的信息到server模塊,常見的socks5客戶端有chrome瀏覽器。local是需要用戶填寫好配置文件後,在PC端用Python啟動。
server: 解密來自local的信息,轉發到target server,並接收加密target server返回的信息,再將加密的信息返回給local。
daemon: 守護者,linux裡面有一種進程,是運行在後端,你沒法看到它的用戶界面的。而這個模塊的功能就是使得local和server運行在後端,用戶沒法直接干預它們。
shell: 外殼,主要用來檢測Python版本,解析配置文件,列印local和server的使用說明以及異常處理。
manager: 多用戶管理模塊,這個模塊主要用於管理多個用戶使用加密中轉服務(包括添加用戶,移除用戶,查看當前活躍用戶等介面),我們將在這裡添加一些介面來掛起流量用超的用戶以及每個月恢復掛起的用戶。
eventloop: 事件循環,這個模塊非常重要,因為它實現了非同步IO功能,通過它我們可以添加多個socket,並且監聽每個socket的狀態,然後根據狀態去處理對應的事件。
asyncdns: 非同步dns,這個模塊主要的功能是將域名非同步的解析成ip,Python中提供了域名解析介面,但卻是同步的。
tcprelay: tcp轉發器,這個模塊基於tcp連接,創建了一個socket,這個socket用於接收信息,並且創建TCPRelayHandler。
TCPRelayHandler: tcp轉發處理,這個模塊創建了2個socket,分別是_local_sock和_server_socket,同時這裡面維護了一個狀態機以及針對每一個狀態的處理函數。
udprelay: udp轉發器,這個模塊基於udp連接
lru_cache: least recently used緩存
encrypt: 提供加密解密方法,用於加/解密信息
2.2 模塊之間的依賴關係
local和server均為上層模塊,需要藉助很多下層模塊來提供完整的加密中轉服務。由於local和server的工作方式有一些區別,同時它們都會依賴相同的下層模塊,因此我們需要從local和server方面分析各功能模塊的依賴關係。通過上圖我們知道,local和server是程序的入口點,因此我們應該從local和server開始分析這個開源項目的模塊依賴關係。
local依賴的模塊
local需要將socks5客戶端的信息加密轉發到server,因此需要用到通信功能,在上圖模塊中能代表通信功能的模塊有tcprelay,TCPRelayHandler,udprelay;此外需要監聽每個通信socket的狀態,因此也需要eventloop來協助完成;在通信過程中還會涉及加密功能,因此會依賴encrypt模塊;在通信過程中需要緩存一些ip信息以及socket信息,為什麼要緩存,原因是省時間,因此我們需要用到lru_cache模塊;在和其它計算機通信之前,我們有時需要解析這台計算機的域名,最終獲得該計算機的ip進行通信連接,因此我們需要用到asyncdns模塊;local啟動的時候還需要讀取配置文件和檢查Python的版本,這個時候我們需要依賴shell模塊;倘如我們需要使local能夠像守護者進程一樣啟動,那麼我們就需要藉助daemon模塊。我相信寫到這裡你的思維已經凌亂了,那我們按照以下方式再捋一捋思路:
根據上圖,local做的事情主要是,讀取配置文件(包含了用戶的本地埠,本地ip,遠程埠,遠程ip,加密方法等,與shell模塊有關),根據配置文件的選項來決定是否通過daemon的方式運行local(與daemon模塊有關),創建1個tcprelay,1個udprelay,1個asyncdns,1個eventloop,並把tcprelay,udprelay,asyncdns這些實例加入到eventloop中,由eventloop監聽它們的網路讀寫狀態,這個時候local就運行起來了,eventloop進入死循環狀態,不停的監聽加入到它的socket,其中TCPRelayHandler會根據socks5客戶端發起請求由tcprelay創建,encrypt也隨著TCPRelayHandler的創建而創建,lru_cache也將隨著udprelay,asyncdns的創建而創建。最後我們發現,local只是把所有模塊的集聚地,它負責把所有模塊初始化,最後由eventloop接管所有網路讀寫狀態。
server依賴的模塊
server需要接收並解密local發送過來的信息,並轉發給target server,再將target server返回的信息進行加密並返回給local,與local不同的是,server需要在線添加或移除用戶的通信節點,因此除了與local所依賴的模塊一樣之外,還必須引入manager模塊,如下圖所示。同樣,當server啟動之後,eventloop將接管所有網路讀寫狀態。
3. RequestRelayer工作流
分析完開源項目的源碼之後,接著就要想想這個開源項目是如何工作的,以及我們要在現有的工作流上添加新的功能,下圖展示了RequestRelayer的工作流。
上圖分為3大部分,分別是SS Controller,local,server,我們來一一解釋。
- SS Controller
這個是web網站,使用Python開發的,之後我會單獨寫一篇文章,來告訴大家如何開發這個網站,目前我們只需要知道它是用來管理server,以及供用戶註冊購買自助開通賬號的網頁版應用,server會暴露一些api供SS Controller調用。
- local
這個是RequestRelayer的一部分,上圖有很多個local,每一個local是運行在不同的用戶PC機上,圖中有n個用戶,每個用戶運行自己的local,socks5客戶端會與local通信。注意每一個用戶都有自己的eventloop。
- server
這個是RequestRelayer的一部分,上圖只有一個server,這個server會為每一個用戶開通與之對應的通信節點,所有這些通信節點會由唯一一個eventloop輪詢監聽它們的網路讀寫狀態,圖中有n個用戶,每一個用戶與左邊的local一一對應。
好了,我們來把這三部分串起來:準備一台有公網IP的PC,使用Python運行server,這時我們將暴露一些restful api可供SS Controller使用了,最常用的api分別是add_port(為某個用戶開通服務),remove_port(移除到期用戶所使用的服務),detach_port(掛起流量用超的用戶所對應的服務節點),attach_port(恢復服務期限內流量用超用戶所對應的服務節點)。部署好server之後我們還需要一個具有公網的PC,部署SS Controller,這個SS Controller通過對server暴露的restful api排列組合,實現上圖ssManager模塊來操作server。當用戶通過SS Controller付費自助開通server上的服務之後,用戶需要在他自己的PC機上,為local配置好(本地ip,本地埠,遠程ip,遠程埠,密碼,加密方式),並啟動local。最後用戶打開socks5客戶端,比如chrome瀏覽器,填寫本地ip和本地埠,進而使用server上已開通的服務。上圖左邊部分與右邊部分之間通信是完全加密的(對傳輸的信息解密,就需要讓local與server知道密碼和加密方式)。
以上便是感性的認識,有點類似3個代表思想,下面我將結合上圖刨根問底將這個工作流所涉及的節點標示出來。請看下圖:
用戶訪問網站,以Paypal的方式購買服務,此時會調用ssManager模塊,該模塊會通過udp連接,對server發出加密請求,讓server為該用戶開通加密中轉服務,請求會被eventloop監聽到,然後由manager接收,並且對接收到的請求進行解密,再解析請求的命令,調用開通服務的介面,完成之後,server會添加一個綠色的節點,比如用戶1的連接sock,這個綠色的節點同時提供2種類型的連接通道,一種是tcp類型的通道,由tcprelay提供服務,另外一種是udp類型的通道,由udprelay提供服務。因此我們需要分2條線來逐一分析每種類型通信過程中所涉及的模塊。
當用戶自助開通加密中轉服務之後,緊接著用戶需要使用該服務,用戶可以選擇使用tcp類型的服務,也可以選擇udp類型的服務,下面將針對每一種類型的服務所涉及的模塊以及工作流一一分析。
3.1 tcp類型的加密中轉服務
上圖就是tcp類型的加密中轉服務的工作流,由四個子系統組成,分別是chrome,local,server,target server(比如google server)。local與chrome運行在用戶的PC端,使用tcprelay來監聽來自chrome的連接;server運行在具有公網IP的PC機上,等待來自上圖local的連接;target server代表其它公司的server,比如說google的web server或者facebook的web server,chrome決定server訪問哪個target server;比如在chrome里輸入http://www.google.com,那麼target server就是google的web server。這個過程的詳細步驟如下所描述。
1、chrome發起請求,通知tcprelay創建TCPRelayHandler,並將TCPRelayHandler中的_local_sock加入到eventloop,這一步主要是建立TCP連接,連接chrome與TCPRelayHandler._local_sock,這一步過後,local.TCPRelayHandler將進入INIT狀態
- local.TCPRelay._server_socket的處理邏輯如下
def handle_event(self, sock, fd, event): conn = self._server_socket.accept() TCPRelayHandler(self, self._fd_to_handlers, self._eventloop, conn[0], self._config, self._dns_resolver, self._is_local)
2、chrome向TCPRelayHandler._local_sock發起請求,請求內容為socks5定義的第一階段內容(認證協商)。此時會由chrome發出所有可接受的認證方法到TCPRelayHandler._local_sock
- local.TCPRelayHandler._local_sock的處理邏輯如下
#chrome會發送以下格式的請求+----+----------+----------+|VER | NMETHODS | METHODS |+----+----------+----------+| 1 | 1 | 1 to 255 |+----+----------+----------+#處理邏輯如下def _handle_stage_init(self, data): _check_auth_method(self, data)def _check_auth_method(self, data): # VER, NMETHODS, and at least 1 METHODS if len(data) < 3: logging.warning(method selection header too short) raise BadSocksHeader socks_version = common.ord(data[0]) nmethods = common.ord(data[1]) if socks_version != 5: logging.warning(unsupported SOCKS protocol version + str(socks_version)) raise BadSocksHeader if nmethods < 1 or len(data) != nmethods + 2: logging.warning(NMETHODS and number of METHODS mismatch) raise BadSocksHeader noauth_exist = False for method in data[2:]: if common.ord(method) == METHOD_NOAUTH: noauth_exist = True break if not noauth_exist: logging.warning(none of SOCKS METHODs requested by client is supported) raise NoAcceptableMethods
3、TCPRelayHandler._local_sock將支持的認證方式返回給chrome,完成socks5第一階段。完成這一步之後local.TCPRelayHandler將進入ADDR狀態
- local.TCPRelayHandler._local_sock的處理邏輯如下
#在此處將**socks5**定義的方法X00 NO AUTHENTICATION REQUIRED返回,同時切換到ADDR狀態def _handle_stage_init(self, data): self._write_to_sock(bx05 0, self._local_sock) self._stage = STAGE_ADDR#返回的格式如下+----+--------+|VER | METHOD |+----+--------+| 1 | 1 |+----+--------+
4、 chrome接著向TCPRelayHandler._local_sock發起建立連接請求,這個階段屬於socks5的第2個階段(建立連接)
- local.TCPRelayHandler._local_sock的處理邏輯如下
#建立連接請求格式如下+----+-----+-------+------+----------+----------+-------+|VER | CMD | RSV | ATYP | DST.ADDR | DST.PORT | DATA |+----+-----+-------+------+----------+----------+-------+| 1 | 1 | 1 | 1 | Variable | 2 | m |+----+-----+-------+------+----------+----------+-------+比如訪問www.google.com的報文如下VER CMD RSV ATYP 域名長度 域名 80埠0x05 0x01 0x00 0x03 0x0a bgoogle.com 0x00 0x50#在此處處理請求的數據CMD_CONNECT = 0x01def _handle_stage_addr(self, data): if self._is_local: cmd = common.ord(data[1]) if cmd == CMD_CONNECT: # just trim VER CMD RSV data = data[3:] #執行這一步之後的data的格式如下 +------+----------+----------+------+ | ATYP | DST.ADDR | DST.PORT | DATA | +------+----------+----------+------+ | 1 | n | 2 | m | +------+----------+----------+------+
5、TCPRelayHandler._local_sock響應來自chrome發起的建立連接請求,完成socks5第2階段(建立連接)。完成這一步後將進入DNS狀態
- local.TCPRelayHandler._local_sock的處理邏輯如下
def _handle_stage_addr(self, data): self._stage = STAGE_DNS # forward address to remote # 響應格式如下 +----+-----+-------+------+----------+----------+ |VER | REP | RSV | ATYP | BND.ADDR | BND.PORT | +----+-----+-------+------+----------+----------+ | 1 | 1 | X00 | 1 | Variable | 2 | +----+-----+-------+------+----------+----------+ self._write_to_sock((bx05x00x00x01 bx00x00x00x00x10x10), self._local_sock) data_to_send = self._encryptor.encrypt(data) self._data_to_write_to_remote.append(data_to_send)
6、local根據用戶配置的遠程伺服器地址發起非同步域名解析並直接返回,這裡的遠程伺服器地址就是指上圖運行著server的PC
- local.DNSResolver._sock的處理邏輯如下
def _handle_stage_addr(self, data): # notice here may go into _handle_dns_resolved directly # self._chosen_server[0]來自json配置文件,也就是上圖server的域名或者IP self._dns_resolver.resolve(self._chosen_server[0], self._handle_dns_resolved)
7、 得到server所運行的PC的公網IP
- local.DNSResolver._sock的處理邏輯如下
def _handle_dns_resolved(self, result, error): ip = result[1] remote_addr = ip #self._chosen_server[1]來自json配置的server監聽的遠程埠 remote_port = self._chosen_server[1]
8、 local中的TCPRelayHandler根據用戶配置的遠程埠以及第7步得到的IP,創建_remote_sock並向上圖server.tcprelay建立TCP連接,並將_remote_sock加入到eventloop進行監聽,完成這一步了就進入了local的TCPRelayHandler就進入了CONNECTING狀態。同時server新建一個TCPRelayHandler,把TCPRelayHandler._local_sock加入到eventloop中,緊接著進入INIT狀態
- local.DNSResolver._sock的處理邏輯如下
def _handle_dns_resolved(self, result, error): remote_sock = self._create_remote_socket(remote_addr, remote_port) remote_sock.connect((remote_addr, remote_port)) self._loop.add(remote_sock, eventloop.POLL_ERR | eventloop.POLL_OUT, self._server) self._stage = STAGE_CONNECTING #打開上行通道讀(_local_sock)和寫(_remote_sock)的閥門,讓數據流通 #由於remote_sock.connect是非同步操作,那麼就需要有CONNECTING狀態 self._update_stream(STREAM_UP, WAIT_STATUS_READWRITING) #打開下行通道讀(_remote_sock)的閥門,讓數據只允許讀進來,不允許寫(_local_sock) self._update_stream(STREAM_DOWN, WAIT_STATUS_READING)
- server.TCPRelay._server_socket的處理邏輯如下
# eventloop會觸發這個函數,這個函數屬於TCPRelaydef handle_event(self, sock, fd, event): # handle events and dispatch to handlers conn = self._server_socket.accept() TCPRelayHandler(self, self._fd_to_handlers, self._eventloop, conn[0], self._config, self._dns_resolver, self._is_local)
9、 local._remote_sock將_data_to_write_to_remote中緩衝的已加密的數據發送給server._local_sock,並且進入STREAM狀態。在server端,server._local_sock接收加密的數據,並解密數據,對數據解析,並停止對server._local_sock讀取數據,緊接著server.TCPRelayHandler進入DNS狀態
- local.TCPRelayHandler._remote_sock的處理邏輯如下
def handle_event(self, sock, event): # order is important if sock == self._remote_sock: if event & eventloop.POLL_OUT: self._on_remote_write() def _on_remote_write(self): # handle remote writable event self._stage = STAGE_STREAM data = b.join(self._data_to_write_to_remote) self._data_to_write_to_remote = [] self._write_to_sock(data, self._remote_sock)
- server.TCPRelayHandler._local_sock的處理邏輯如下
def _on_local_read(self): data = self._local_sock.recv(BUF_SIZE) data = self._encryptor.decrypt(data) if (not is_local and self._stage == STAGE_INIT): self._handle_stage_addr(data) def _handle_stage_addr(self, data): header_result = parse_header(data) addrtype, remote_addr, remote_port, header_length = header_result # pause reading self._update_stream(STREAM_UP, WAIT_STATUS_WRITING) self._stage = STAGE_DNS if len(data) > header_length: self._data_to_write_to_remote.append(data[header_length:]) # notice here may go into _handle_dns_resolved directly self._dns_resolver.resolve(remote_addr, self._handle_dns_resolved)
10、 注意了,這一步發生在server端,非同步dns,並停止從TCPRelayHandler._local_sock讀取數據
- server.DNSResolver._sock的處理邏輯如下
def handle_event(self, sock, event): self._on_local_read() def _on_local_read(self): if (not is_local and self._stage == STAGE_INIT): self._handle_stage_addr(data) def _handle_stage_addr(self, data): self._stage = STAGE_DNS self._dns_resolver.resolve(remote_addr, self._handle_dns_resolved)
11、 TCPRelayHandler通過target server的IP以及第10步中獲得的埠向target server發起TCP連接,並將TCPRelayHandler中的_remote_sock加入eventloop中,此時可以讀取TCPRelayHandler._local_sock中數據,完成這一步之後server.TCPRelayHandler將進入CONNECTING狀態
- server.DNSResolver._sock的處理邏輯如下
def _handle_dns_resolved(self, result, error): # else do connect remote_sock = self._create_remote_socket(remote_addr, remote_port) remote_sock.connect((remote_addr, remote_port)) self._loop.add(remote_sock, eventloop.POLL_ERR | eventloop.POLL_OUT, self._server) self._stage = STAGE_CONNECTING self._update_stream(STREAM_UP, WAIT_STATUS_READWRITING) self._update_stream(STREAM_DOWN, WAIT_STATUS_READING)
12、 若target server響應之後,server端將發送數據給target server。這一步使得server.TCPRelayHandler進入STREAM狀態
- server.TCPRelayHandler._remote_sock的處理邏輯如下
def handle_event(self, sock, event): self._on_remote_write()def _on_remote_write(self): # handle remote writable event self._stage = STAGE_STREAM if self._data_to_write_to_remote: data = b.join(self._data_to_write_to_remote) self._data_to_write_to_remote = [] self._write_to_sock(data, self._remote_sock) else: self._update_stream(STREAM_UP, WAIT_STATUS_READING)
13、 通過第9步之後,local的TCPRelayHandler就進入了STREAM狀態,在這個狀態下,local接收到chrome發送過來的數據,然後進行簡單的加密轉發。chrome發送出來的數據將由local._local_sock接收
- local.TCPRelayHandler._local_sock的處理邏輯如下
def handle_event(self, sock, event): self._on_local_read() def _on_local_read(self): data = self._local_sock.recv(BUF_SIZE)
14、 local.TCPRelayHandler._local_sock將接收到的數據加密,並通過local.TCPRelayHandler._remote_sock轉發給server端
- local.TCPRelayHandler._local_sock與local.TCPRelayHandler._remote_sock的處理邏輯如下
def _on_local_read(self): data = self._local_sock.recv(BUF_SIZE) self._handle_stage_stream(data) def _handle_stage_stream(self, data): data = self._encryptor.encrypt(data) self._write_to_sock(data, self._remote_sock)
- server.TCPRelayHandler._local_sock的處理邏輯如下
def _on_local_read(self): data = self._local_sock.recv(BUF_SIZE) if not is_local: data = self._encryptor.decrypt(data) self._handle_stage_stream(data) def _handle_stage_stream(self, data): self._write_to_sock(data, self._remote_sock)
15、 這一步發生在server端與target server端。在這個過程中,對第15步中獲得的數據解密,並通過_remote_sock將解密之後的數據發送給target server
- server.TCPRelayHandler._remote_sock的處理邏輯如下
def _on_local_read(self): if not is_local: data = self._encryptor.decrypt(data) self._handle_stage_stream(data) def _handle_stage_stream(self, data): self._write_to_sock(data, self._remote_sock)
16、 target server將請求的數據返回給server端,由server端的_remote_sock接收
- server.TCPRelayHandler._remote_sock的處理邏輯如下
def _on_remote_read(self): data = self._remote_sock.recv(BUF_SIZE)
17、 將target server返回的數據進行加密,通過server中TCPRelayHandler的_local_sock,返回給local中TCPRelayHandler的_remote_sock
- local.TCPRelayHandler._remote_sock的處理邏輯如下
def _on_remote_read(self): data = self._remote_sock.recv(BUF_SIZE)
- server.TCPRelayHandler._local_sock的處理邏輯如下
def _on_remote_read(self): data = self._encryptor.encrypt(data) self._write_to_sock(data, self._local_sock)
18、 local中TCPRelayHandler對server返回的數據進行解密,最終返回給chrome
- local.TCPRelayHandler._remote_sock的處理邏輯如下
def _on_remote_read(self): data = self._encryptor.decrypt(data) self._write_to_sock(data, self._local_sock)
注意:local與server共同完成了以上18步,它們各自的TCPRelayHandler均有自己的狀態,互相獨立,但是又有一點聯繫,因為server.TCPRelay.TCPRelayHandler需要由local.TCPRelay.TCPRelayHandler來驅動(這一步發生在第9步,local.TCPRelay.TCPRelayHandler將進入STREAM狀態,同時將數據發送給server,而server.TCPRelay.TCPRelayHandler也將開始自己的狀態切換)。為了讓TCPRelayHandler區別對待每一個狀態,我們需要定義以下幾個不同的狀態。每個狀態均可被local與server共用,只不過server端不需要INIT狀態。那麼我們來看看這些狀態都有哪些?
STAGE_INIT = 0STAGE_ADDR = 1STAGE_UDP_ASSOC = 2STAGE_DNS = 3STAGE_CONNECTING = 4STAGE_STREAM = 5STAGE_DESTROYED = -1
以上就是所有狀態了,需要注意的是server不考慮狀態INIT(下圖紅色節點所示),並且直接從ADDR這個狀態開始,而local有INIT這個狀態,並且從INIT這個狀態開始,如下圖所示:
ADDR,DNS,CONNECTING,STREAM,這些狀態只要處理過程出現錯誤,就會轉移到DESTROY狀態。既然要切換狀態,那麼肯定要有一個驅動狀態變化的源頭,在這裡驅動狀態變化的源頭就是eventloop。它的執行步驟如下:
eventloop->TCPRelay.handle_event ->TCPRelayHandler(圖中的1,執行這一步後進入INIT狀態) ->TCPRelayHandler.handle_event ->_handle_stage_init(圖中的2,執行這一步後進入ADDR狀態,如果是server,那麼這一步是不會執行的,直接進入下一步) ->_handle_stage_addr(圖中的3,執行這一步之後進入DNS狀態) ->_handle_dns_resolved(圖中的4,執行這一步之後進入CONNECTING狀態) ->_on_remote_write(圖中的5,執行這一步之後進入STREAM狀態)
以上便是相關的代碼。那麼問題來了:eventloop是如何獲得發生連接的TCPRelay?而取到TCPRelay之後,又是如何獲得活躍的TCPRelayHandler?eventloop與TCPRelay有什麼樣的關係?TCPRelay與TCPRelayHandler又有什麼樣的關係?帶著這4個問題我們來看看下圖:
1、 每個TCPRelayHandler都會有2個socket,分別是_local_sock與_remote_sock。每個TCPRelay都會關聯多個TCPRelayHandler,這個關聯關係通過_fd_to_handlers字典建立聯繫。其中的fd就是file descriptor的簡稱,每一個socket都會有唯一的fd。也就是說每個TCPRelayHandler都會被作為值,分2次添加到_fd_to_handlers,而鍵分別是_local_sock與_remote_sock的fd,偽代碼如下:
tcprelay = TCPRelay()#創建10個TCPRelayHandler,一共有10對_local_sock與_remote_sock#每對_local_sock與_remote_sock擁有相同的TCPRelayHandler#每一個socket都有唯一的fd,因此tcprelay._fd_to_handlers的鍵不會重複for i in range(10): handler = TCPRelay.Create_TCPRelayHandler() tcprelay._fd_to_handlers[handler._local_sock.fd] = handler tcprelay._fd_to_handlers[handler._remote_sock.fd] = handler#注意_fd_to_handlers是TCPRelay的成員變數
2、 local和server均有一個eventloop存在(有且只有一個),而eventloop里維護了一個字典成員變數self._fdmap = {} # (f, handler),這個變數維護了eventloop與TCPRelay的關係,請看如下偽代碼:
#遍歷所有TCPRelay的_server_socket,遍歷所有TCPRelayHandler的_remote_sock & _local_sockfor sock in [TCPRelay1 TCPRelay2 ... ] [TCPRelayHandler1 TCPRelayHandler2 ...]: fd = sock.fd f = sock #根據sock獲得對應的TCPRelay實例,這裡有2種情況 #1.如果sock == _server_socket,那麼handler = TCPRelay #2.如果sock == _remote_sock || sock == _local_sock,那麼handler = TCPRelayHandler._server #而self與self._server是指向同一個TCPRelay實例 handler = get_TCPRelay_by_sock(sock) self._fdmap[fd]=(f,handler)
3、 了解eventloop與TCPRelay的關係以及TCPRelay與TCPRelayHandler的關係之後,讓我們來了解一下eventloop是如何驅動TCPRelayHandler狀態變化的。
#########################################################eventloop啟動時,僅僅會監聽TCPRelay._server_socket,此時沒有任何的TCPRelayHandler。當chrome向local發送連接建立請求時,此時eventloop監聽到TCPRelay._server_socket可讀,那麼eventloop將根據以下偽代碼獲得TCPRelay._server_socket對應的TCPRelay實例,並調用TCPRelay.handle_event函數將執行的上下文切換到TCPRelay。TCPRelay中會創建一個TCPRelayHandler實例(此時這個實例會進入INIT狀態)。把TCPRelayHandler._local_sock添加到eventloop中,為TCPRelayHandler._local_sock與TCPRelay建立映射,使得eventloop可以通過TCPRelayHandler._local_sock獲得創建這個TCPRelayHandler的TCPRelay,進而通過TCPRelay._fd_to_handlers獲取TCPRelayHandler,從而可以將程序執行的上下文切換到TCPRelayHandler,並調用TCPRelayHandler.handle_event來根據當前狀態選擇執行TCPRelayHandler所定義的函數,最終使得某個TCPRelayHandler進入最終的STREAM狀態。sock = self._server_socketfd = sock.fdsock_handler = eventloop._fdmap[fd]handler = sock_handler[1]handler.handle_event(sock, fd, event)#以下是對應代碼的分析#每一個TCPRelay負責監聽一個埠,而負責監聽這個埠的socket是self._server_socket#因此需要定義一個TCPRelay的類#這個類定義了一個方法叫handle_events,並且會由eventloop中的一個dict變數_fdmap建立_server_socket->handle_events映射#如果eventloop發現_server_socket可讀,說明發送端調用connect函數向_server_socket發送連接請求,那麼eventloop就會通過_fdmap獲取handle_events#緊接著eventloop會調用handle_events,通過accept接收來自發送端的連接請求,並得到conn連接(注意TCP的3次握手發生在connect與accept之間),進入以下代碼的第1步,創建TCPRelayHandler#在創建TCPRelayHandler過程中,self._local_sock記錄了發送端的socket(來自conn[0]),此時self._local_sock將用於連接發送端#注意,得到self._local_sock之後,我們也必須把它加入eventloop中,用_fdmap建立_local_sock->TCPRelay.handle_events映射#也就是說一個TCPRelay會管理很多個TCPRelayHandler,而每個TCPRelayHandler都會維護一個_local_sock與_remote_sock,eventloop會為_local_sock->TCPRelay.handle_events與_remote_sock->TCPRelay.handle_events建立映射#那麼問題來了,當來自sock != self._server_socket時,我如何知道選擇哪一個TCPRelayHandler?#這個時候就要藉助TCPRelay里的dict變數_fd_to_handlers,根據sock對應的fd來獲取TCPRelayHandler#然後將處理轉發給TCPRelayHandler的handle_event#我們來總結一下這個過程:eventloop監聽到可讀寫的socket,然後根據_fdmap取得TCPRelay實例,並調用TCPRelay.handle_event,#如果這個socket是_server_socket那麼就在TCPRelay中完成調用,如果socket不是_server_socket,那麼就根據通過socket從_fd_to_handlers#取出TCPRelayHandler實例,並調用TCPRelayHandler.handle_event方法#每次調用TCPRelayHandler.handle_event的方法時會根據不同的情況調用2個函數_on_local_read與_on_remote_write來切換狀態########################################################### define TCPRelay classclass TCPRelay(object): def handle_event(self, sock, fd, event): # handle events and dispatch to handlers # eventloop監聽到與TCPRelay對應的socket變化了,調用此函數 if sock == self._server_socket: conn = self._server_socket.accept() ######################################################### # 1.創建TCPRelayHandler,讓處理進入INIT狀態 ######################################################### TCPRelayHandler(self, self._fd_to_handlers, self._eventloop, conn[0], self._config, self._dns_resolver, self._is_local) else: # handler is TCPRelayHandler instance handler = self._fd_to_handlers.get(fd, None) # 將處理轉移給TCPRelayHandler的handle_event函數,2,3,4,5步都在這個函數里完成,接著看下面的代碼 handler.handle_event(sock, event)#define TCPRelayHandler classclass TCPRelayHandler(object): def handle_event(self, sock, event): # handle all events in this handler and dispatch them to methods # order is important if sock == self._remote_sock: if event & eventloop.POLL_OUT: ######################################################### # 5.在這裡切換到STREAM狀態 ######################################################### self._on_remote_write() elif sock == self._local_sock: if event & (eventloop.POLL_IN | eventloop.POLL_HUP): self._on_local_read() def _on_local_read(self): # handle all local read events and dispatch them to methods for # each stage is_local = self._is_local data = None if self._stage == STAGE_STREAM: self._handle_stage_stream(data) return elif is_local and self._stage == STAGE_INIT: ######################################################### # 2.socks5協議中的協商認證方式,處理完成以後將切換到ADDR狀態 ######################################################### self._handle_stage_init(data) elif (is_local and self._stage == STAGE_ADDR) or (not is_local and self._stage == STAGE_INIT): ######################################################### # 3. 完成socks5協議中的建立連接階段,並且停止讀取信息同時切換到DNS狀態,緊接著進行域名解析,最終由eventloop監測dns socket變化,調用_handle_dns_resolved切換到CONNECTING狀態 ######################################################### self._handle_stage_addr(data) ######################################################### # 4.切換到CONNECTING狀態 ######################################################### def _handle_dns_resolved(self, result, error): ip = result[1] self._stage = STAGE_CONNECTING remote_addr = ip if self._is_local: remote_port = self._chosen_server[1] else: remote_port = self._remote_address[1] remote_sock = self._create_remote_socket(remote_addr,remote_port) remote_sock.connect((remote_addr, remote_port)) self._loop.add(remote_sock, eventloop.POLL_ERR | eventloop.POLL_OUT, self._server)
3.2 udp類型的加密中轉服務
udp類型的加密中轉服務分為2部分:1.TCPRelayHandler處理socks5協議(只有在local端才會處理);2.UDPRelay轉發(local端與server端都會處理)。因此我們需要根據這2部分來理解udp類型的加密中轉服務。
1、TCPRelayHandler處理socks5協議(只有在local端才會處理) 注意這個階段的邏輯只會在local端才會執行,server端肯定不會執行。因此請把以下發生的場景切換到local上下文。
socks5的認證協商階段
def _handle_stage_init(self, data): #+----+----------+----------+ #|VER | NMETHODS | METHODS | #+----+----------+----------+ #| 1 | 1 | 1 to 255 | #+----+----------+----------+ self._check_auth_method(data) self._write_to_sock(bx05 0, self._local_sock) self._stage = STAGE_ADDR
socks5的UDP assosiate階段
def _handle_stage_addr(self, data): #+----+-----+-------+------+----------+----------+ #|VER | CMD | RSV | ATYP | DST.ADDR | DST.PORT | #+----+-----+-------+------+----------+----------+ #| 1 | 1 | 1 | 1 | Variable | 2 | #+----+-----+-------+------+----------+----------+ #DST.ADDR | DST.PORT都沒有被用到 cmd = common.ord(data[1]) if cmd == CMD_UDP_ASSOCIATE: logging.debug(UDP associate) if self._local_sock.family == socket.AF_INET6: header = bx05x00x00x04 else: header = bx05x00x00x01 addr, port = self._local_sock.getsockname()[:2] addr_to_send = socket.inet_pton(self._local_sock.family, addr) port_to_send = struct.pack(>H, port) #+----+-----+-------+------+----------+----------+ #|VER | REP | RSV | ATYP | BND.ADDR | BND.PORT | #+----+-----+-------+------+----------+----------+ #| 1 | 1 | 1 | 1 | Variable | 2 | #+----+-----+-------+------+----------+----------+ #|<--------header--------->|addr_to_send|port_to_send| #注意socks5客戶端會建立一個UDP類型的socket並且使用|addr_to_send|port_to_send|來向local發送消息 #同時注意local啟動的時候會在port_to_send上建立TCP與UDP的socket #因此socks5客戶端拿到|addr_to_send|port_to_send|之後,就可以向local的UDP socket發送數據了 self._write_to_sock(header + addr_to_send + port_to_send, self._local_sock) self._stage = STAGE_UDP_ASSOC # just wait for the client to disconnect return
注意這個階段將會把TCPRelayHandler切換到STAGE_UDP_ASSOC狀態,在這個狀態下將不會接收任何TCP連接發來的請求,久而久之LRUCache就會把這個狀態下的TCPRelayHandler銷毀。
2、UDPRelay轉發(local端與server端都會處理)
這一階段都會發生在local與server端,如下圖所示。在這個圖中主要分為上行傳輸與下行傳輸。在邏輯上,socks5客戶端,local.UDPRelay,server.UDPRelay,udp目標伺服器都有自己的udpsock,並且它們的udpsock是一一對應傳輸數據的,只不過實際實現的時候需要藉助_server_sock來幫忙轉發數據。
有了上圖為基礎,我們把整個過程分為以下2個階段,分別是上行傳輸與下行傳輸。
上行傳輸
上行傳輸是socks5客戶端將數據轉發給udp目標伺服器,如下圖所示
傳輸過程為:sock5客戶端.udpsock1將數據發送給local.UDPRelay._server_sock,local.UDPRelay._server_sock獲取數據,對數據進行加密,並且由local.UDPRelay.udpsock1將數據轉發給server.UDPRelay._server_sock,server.UDPRelay._server_sock獲取數據,對數據進行解密,並由server.UDPRelay.udpsock1將數據轉發給udp目標伺服器.udpsock1。這個過程涉及的代碼如下:
def _handle_server(self): server = self._server_socket #1.local端獲取socks5發送過來的數據,格式如下: # +----+------+------+----------+----------+----------+ # |RSV | FRAG | ATYP | DST.ADDR | DST.PORT | DATA | # +----+------+------+----------+----------+----------+ #~~~~~~~~~~~~~~~~~~~ #4.server端接收到local端發送過來的數據,格式如下: # +------+----------+----------+----------+ # | ATYP | DST.ADDR | DST.PORT | DATA | # +------+----------+----------+----------+ data, r_addr = server.recvfrom(BUF_SIZE) key = None iv = None if self._is_local: frag = common.ord(data[2]) if frag != 0: logging.warn(UDP drop a message since frag is not 0) return else: #local端將RSV,FRAG剔除,剔除之後數據格式如下 # +------+----------+----------+----------+ # | ATYP | DST.ADDR | DST.PORT | DATA | # +------+----------+----------+----------+ data = data[3:] else: data, key, iv = encrypt.dencrypt_all(self._password, self._method, data) header_result = parse_header(data) addrtype, dest_addr, dest_port, header_length = header_result if self._is_local: server_addr, server_port = self._get_a_server() else: server_addr, server_port = dest_addr, dest_port addrs = self._dns_cache.get(server_addr, None) if addrs is None: addrs = socket.getaddrinfo(server_addr, server_port, 0, socket.SOCK_DGRAM, socket.SOL_UDP) self._dns_cache[server_addr] = addrs af, socktype, proto, canonname, sa = addrs[0] key = client_key(r_addr, af) #2.local端,為socks5客戶端所對應的udpsock建立映射關係 #~~~~~~~~~~~~ #5.server端,為local端所對應的udpsock建立映射關係 client = self._cache.get(key, None) if not client: client = socket.socket(af, socktype, proto) client.setblocking(False) self._cache[key] = client self._client_fd_to_server_addr[client.fileno()] = r_addr self._sockets.add(client.fileno()) self._eventloop.add(client, eventloop.POLL_IN, self) if self._is_local: key, iv, m = encrypt.gen_key_iv(self._password, self._method) data = encrypt.encrypt_all_m(key, iv, m, self._method, data) data = data[header_length:] #3.local端,使用udpsock將數據發送到server端 #~~~~~~~~~~~~~~ #6.server端,使用udpsock將數據發送到udp目標伺服器 client.sendto(data, (server_addr, server_port))
下行傳輸
下行傳輸是udp目標伺服器向socks5客戶端傳輸數據。如下圖所示:
對應的代碼如下:
def _handle_client(self, sock): #1.server端,接收來自udp目標伺服器發來的數據 #~~~~~~~~~~~~ #4.local端,接收來自server端發來的數據 data, r_addr = sock.recvfrom(BUF_SIZE) if not self._is_local: data = pack_addr(r_addr[0]) + struct.pack(>H, r_addr[1]) + data response = encrypt.encrypt_all(self._password, self._method, 1, data) else: data = encrypt.encrypt_all(self._password, self._method, 0, data) header_result = parse_header(data) addrtype, dest_addr, dest_port, header_length = header_result response = bx00x00x00 + data #2.server端,獲取與local對應的udp地址和埠 #~~~~~~~~~~~~~~ #5.local端,獲取與socks5客戶端對應的udp地址和埠 client_addr = self._client_fd_to_server_addr.get(sock.fileno()) #3.server端,使用_server_socket將數據發送到local端 #~~~~~~~~~~~~~~ #6.local端,使用_server_socket將數據發送到socks5客戶端 self._server_socket.sendto(response, client_addr)
4. 為RequestRelayer添加新功能
4.1 用戶場景分析
在為RequestRelayer添加新的功能之前,我們需要分析用戶的使用場景有哪些。只有了解了用戶的使用場景,我們才能清楚的知道至少需要為RequestRelayer添加哪些功能,而不是盲目的為RequestRelayer添加功能。為了操作多台RequestRelayer伺服器,我們需要藉助於web應用。原因是我們迫切需要在任何時候,任何設備上都可以以管理員的身份登錄到web應用對每一個RequestRelayer伺服器進行操作。因此我們要實現自己的一個web應用(也就是我們之前反覆提到的SS Controller)來管理所有的RequestRelayer,而我們應該根據用戶的使用場景來思考哪些功能是SS Controller應該提供的。以下便是一些用戶使用場景,可以供大家參考:
- 某台運行RequestRelayer服務的機器宕機了,之前為每一個用戶開通的服務將無法使用了,過了一段時間這台機器恢復正常了,並重新啟動了RequestRelayer服務,作為管理員,我希望能重新恢復該RequestRelayer服務的所有用戶。另外一個用戶場景是將某台RequestRelayer伺服器上已開通的用戶服務轉移到另外一台RequestRelayer伺服器上。因此SS Controller需要提供以下介面,用於向指定伺服器,向RequestRelayer服務請求,在指定埠上開通加密轉發服務,並且指定密碼。
ssInfo=(server_ip,server_port,user_pwd)def addAccountOnSS(ssInfo):
- 在SS Controller中註冊的用戶或者管理員能夠自助開通RequestRelayer服務,因此我們需要一個介面,指定伺服器IP地址以及用戶密碼
ssInfo=(server_ip,user_pwd)def createAccountOnSS(ssInfo):
- 在SS Controller中,管理員可以關閉某個用戶已開通的RequestRelayer服務
ssInfo=(server_ip,server_port)def remove_account_on_ss(ssInfo):
- 用戶把流量用超了,此時RequestRelayer將暫時停止為該用戶服務,直到下一個月的時候,如果用戶還在服務期限內,那麼將會調用以下介面來讓RequestRelayer恢復該用戶的服務
ssInfo=(server_ip,server_port)coresponse to ss attach interfacedef restore_account_on_ss(ssInfo):
- 返回某個RequestRelayer服務的所有用戶
ssInfo=server_ipdef get_account_list_on_ss(ssInfo):
- 返回某個RequestRelayer服務的所有用戶的剩餘流量
ssInfo=server_ipdef auto_retrieve_all_available_transfer_on_ss(ssInfo):
- 獲取某個RequestRelayer上某個用戶當前剩餘的流量
ssInfo=(server_ip,server_port)def get_current_available_transfer_on_ss(ssInfo):
- 更新某個RequestRelayer上某個用戶的剩餘流量
ssInfo=(server_ip,server_port,available_transfer)def update_current_available_transfer_on_ss(ssInfo):
- 某個RequestRelayer crash了,然後需要重啟這個crash的RequestRelayer,重啟之後,需要重新為這個crash的RequestRelayer重新恢復crash之前所服務的所有用戶
ssInfo=(server_ip,port_password_dict)def auto_restart_on_ss(ssInfo):
4.2 為每個用戶場景添加對應的功能
根據以上列出的場景,我們需要相應的為RequestRelayer添加對應的介面來對接以上的應用場景。以下便是RequestRelayer需要添加的新的命令。
- 1-獲取RequestRelayer上正在服務的所有用戶
- 2-為用戶在RequestRelayer上創建該用戶的服務
- 3-恢復RequestRelayer crash之前所服務的所有用戶
- 4-獲取RequestRelayer上所有用戶的剩餘流量
- 5-獲取RequestRelayer上某個用戶的剩餘流量
- 6-更新RequestRelayer上某個用戶的剩餘流量
針對以上幾個命令我們需要在manager添加對應的if-else判斷
def handle_event(self, sock, fd, event): if sock == self._control_socket and event == eventloop.POLL_IN: encryptedData, self._control_client_addr = sock.recvfrom(BUF_SIZE) data = encrypt.encrypt_all(self._config[manager_password], self._config[manager_method], 0, encryptedData) parsed = self._parse_command(data) if parsed: command, config = parsed a_config = self._config.copy() if config: # let the command override the configuration file a_config.update(config) if server_port not in a_config: logging.error(can not find server_port in config) else: if command == add: self.add_port(a_config) self._send_control_data(bok) elif command == remove: self.remove_port(a_config) self._send_control_data(bok) elif command == attach: self.attach_port(a_config) self._send_control_data(bok) elif command == ping: self._send_control_data(bpong) elif command == 1: logging.info("1 command") port_list = {} for k,v in self._relays.items(): port_list[k] = v[2] self.send_data(port_list) elif command == 2: logging.info("2 command") a_config[server_port]=dispatch_port_mgr.dispatch_port() self.add_port(a_config) self._send_control_data(str(a_config[server_port])) elif command == 3: logging.info("3 command") ports_arr = a_config[valid_portpwds] del a_config[valid_portpwds] for port, password_transfer in ports_arr.items(): my_config = a_config.copy() my_config[server_port] = int(port) my_config[password] = password_transfer[0] self._available_transfer[my_config[server_port]]=long(password_transfer[1]) self.add_port(my_config) self._send_control_data(bok) elif command == 4: logging.info("4 command") r={} for k, v in self._available_transfer.items(): r[k] = v self.send_data(r) elif command == 5: logging.info("5 transfer") self._send_control_data(str(self._available_transfer[a_config[server_port]])) elif command == 6: logging.info("6 command") self._available_transfer[a_config[server_port]] = a_config[available_transfer] self._send_control_data(bok) else: logging.error(unknown command %s, command)
推薦閱讀: