如何一步步構建加密聊天應用
譯文聲明
本文是翻譯文章,文章原作者spec,文章來源:http://0x00sec.org
原文地址:https://0x00sec.org/t/encrypted-chat-part-i/5839一、前言
前一陣子我正在編寫一個加密聊天客戶端以及伺服器,想通過開發過程了解關於加密方法的更多知識,也了解如何在網路協議中實現加密協議(如基於TLS的SSH或者HTTPs協議)。現在這個工作已基本完成,我想與大家一起分享下我學到的一些經驗及知識。
本文可以分為兩部分,第一部分介紹了一些相關的基本概念,第二部分會深入分析代碼,將這些概念與具體程序結合起來(@oaktree也建議我採用這種組織結構)。
話不多說,開始步入正題。
二、基本概念
我們的目標是創建一個共享密鑰,以便加密及解密客戶端和伺服器交互的消息。
為了創建聊天室,客戶端必須能夠以某種安全的方式將消息發送給伺服器,反之亦然。對稱加密(symmetric encryption)機制採用了相同的密鑰來加密及解密消息,我們可以考慮採用這種方式來實現。然而如果採用這種方式,那麼通信雙方必須就通過某種方式來獲取相同的密鑰。
請注意:加密密鑰實際上是非常大的數字。
那麼我們怎麼樣才能讓兩台計算機使用相同的大數呢?我們不能直接發送這個數字,這樣做會在第一步就破壞掉整個加密體系,因為這樣竊聽者就可以竊取密鑰,解開加密信息。相反,我們會使用Diffie-Hellman密鑰交換演算法來完成這個任務。
第一步:DH密鑰交換演算法
這種方法是創建共享密鑰的絕佳方法。
Diffie-Hellman(簡稱為DH)演算法可以讓兩個實體在若干次交互後獲得相同的數字,並且該過程沒有直接公開這個數字。許多DH實現方法會在有限循環中讓整數對p
取模(p
是一個素數),然而我們也可以選擇使用其他方式(比如橢圓曲線演算法,大家在瀏覽加密網頁時也用到了這種技術)。
我們可以拋開複雜的數學概念,直觀地來理解這個密鑰交換過程背後的原理,並且我們也不需要掌握太高超的數學知識就能理解這個過程。維基百科上有一張示意圖可以描述這個過程:
在上圖中,「common paint」包含一個非常大的素數p
(至少為2048位)以及小素數g
,這兩個數都可以公開,不會對我們的安全性造成影響。「secret colors」是每個成員自己生成的隨機數,不能對外公布。這種方法之所以能實現密鑰共享,原因在於如下等式成立:
(g^a % p)^b % p = (g^b % p)^a % p
其中,a
以及b
為「secret colors」,%
為取模操作(即除法運算後取餘數)。大家可能會問,為什麼這兩個表達式會相等?這是個數學問題,以我的水平可能很難解釋清楚。我想這就相當於在問為什麼2 + 2 = 4
,不同的是看起來沒有那麼直白,但事實的確如此。如果有人能簡單明了地解釋這個等式,歡迎發表高見,讓我能更好地理解。我還沒有專門學習過相關理論,希望有人能幫忙解釋一下。
如果這裡你還沒有完全理解,沒關係,後面我們會在代碼中再詳細解釋一下。現在重要的是我們可以通過這種方法獲得相同的數字:一旦完成這個任務,我們就可以繼續執行下一個任務。
第二步:AES加密
AES的全稱為Advanced Encryption Standard(高級加密標準),由於加密速度較快、安全性較高,AES現在已經成為廣泛使用的一種對稱加密演算法。除非使用該演算法的系統本身正在泄露數據,否則想攻破這種演算法,只能採用暴力破解法。我們會以Cipher Block Chaining)(加密塊鏈,CBC)模式來使用AES-256演算法,這種模式需要使用256位的密鑰(因此我們需要通過SHA-256演算法將共享大數的長度標準化為256位),加密序列中的每個加密塊都需要依賴前一個加密塊才能進行加解密。
在這種模式下,我們必須使用一個初始化向量(initialization vector,IV)來作為加密鏈中的初始塊,以便處理後面的每個數據塊。
在CBC模式下,每個明文塊會先使用前一個密文塊進行異或(XOR)處理,然後再加密。這樣一來,每個加密塊都需要依賴之前所有的加密塊,直到追溯到第一個加密塊為止。
這也是IV的作用所在,因為如果我們還沒有開始加密,我們就沒辦法找到起點之前的一個塊來異或處理。
(感謝@pry0cc提供如上解釋)
與加密密鑰不同的是,這個值可以對外公開。這種方法基本上可以讓針對密鑰的逆向分析無功而返,因為即便使用相同的密鑰對「hello」進行兩次加密,如果IV值不同,那麼我們會得到完全不同的加密結果。但我們必須小心謹慎,重複使用相同的IV會降低通信的安全性。大家可以參考WEP中的安全缺陷了解更多細節。
此外,我們還需要加入一些額外的字元(即「填充」數據),使待加密數據的大小為AES塊大小的整數倍。如果在消息尾部添加空格符,對端處理起來也比較方便,因此我們可以採用這種填充策略。這樣一來,第一個塊為IV,後面的塊為我們提供的消息(包含填充數據)。
基本概念就這麼多。現在我們可以在伺服器和客戶端之間來回發送加密數據。客戶端可以將加密的聊天消息提交給伺服器,伺服器會解密消息,使用其他客戶端的密鑰重新加密消息然後再廣播消息。如果網路中有人在竊聽,那麼他所收到的每個消息看起來完全不同,沒辦法破譯其中內容。
三、代碼實現
現在我們來看一下客戶端以及服務端的具體代碼。
整個工程所包含的文件如下:
客戶端以及服務端包含許多相同代碼,其中大部分為消息的加密以及解密代碼。然而,由於服務端負責整個流程中的大部分工作,因此我們先從服務端開始。
這裡我只會介紹一些重要內容,略過一些無關緊要的代碼(使用…來替代),大家可以訪問Github獲取完整代碼。
server.py
# Inside Server Class def __init__(self, host=127.0.0.1, port=DEFAULT_PORT): ... self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.dh_params = M2DH.gen_params(DH_SIZE, 2) ...
首先我們需要創建一個新的socket以便伺服器進行通信。
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
這裡socket.AF_INET
表明我們使用的是IPv4地址(如0x00sec.org
、74.125.136.94
),socket.SOCK_STREAM
表明使用的是TCP協議。
接下來,我們使用m2crypto
庫的DH
模塊生成DH密鑰交換演算法所需的參數。
self.dh_params = M2DH.gen_params(DH_SIZE, 2)
請記住,這裡我們需要創建兩個對外公開的值:素數p
以及素數q
。當我們指定所需素數的大小(以bit為單位,至少為2048位)以及所需的生成器後,這個函數就會返回包含p
以及g
的一個類。該函數使用OpennSSL來完成這個任務,OpenSSL是「專為TLS(傳輸層安全)協議以及SSL(安全套接層)協議設計的功能強大的商業級工具包」,許多程序都用到了這個庫,比如OpenVPN。
現在伺服器可以監聽新客戶端的連接請求,然後嘗試進行DH密鑰交換,監聽客戶端發來的加密數據。
while True: connection, address = self.socket.accept() client = Client(self, connection, address) ... self.clients.append(client) ... threading.Thread(target=self.listen, args=(client, )).start()
我們使用同一個套接字來接受(accept)下一個連入請求(該函數為阻塞型函數),將新的連接初始化為新的客戶端。Client類的初始化函數如下所示:
# Inside Client Class (this is the client object the server uses) def __init__(self, server, connection, address): ... self.key = self.dh(server.dh_params)
Client類接受3個參數:server(該客戶端所屬的伺服器)、connection(socket連接)以及address(地址信息,格式為(ip_address, port)
),然後調用dh()
函數來生成共享密鑰。
# Inside Client Class def dh(self, dh_params): """ Perform Diffie-Hellman Key Exchange with a client. :param dh_params: p and g generated by DH :return shared_key: shared encryption key for AES """ # p: shared prime p = DH.b2i(dh_params.p) # g: primitive root modulo g = DH.b2i(dh_params.g) # a: randomized private key a = DH.gen_private_key() # Generate public key from p, g, and a public_key = DH.gen_public_key(g, a, p) # Create a DH message to send to client as bytes dh_message = bytes(DH(p, g, public_key)) self.connection.sendall(dh_message) # Receive public key from client as bytes ... client_key = DH.b2i(response) # Calculate shared key with newly received client key shared_key = DH.get_shared_key(client_key, a, p) return shared_key
首先,該方法會使用DH.b2i
函數將位元組轉換為整數(b2i
函數位於dhke.py
中),生成參數p
以及g
,然後再生成一個隨機的私鑰a
(同樣是一個整數)。伺服器的公鑰採用這幾個參數生成,公式為g^a % p
,所使用的函數為DH.gen_public_key()
,該函數的定義如下:
def gen_public_key(g, private, p): # g^private % p return pow(g, private, p)
現在,伺服器可以使用p
、g
以及public_key
來構造一則公開消息,發送給客戶端。
注意:客戶端需要這3個參數才能生成自己的公鑰。
接下來這行代碼可以生成一個新的DH
對象,並將其轉換為位元組:
dh_message = bytes(DH(p, g, public_key))
我們可以查看DH
類的__bytes__
方法,了解如何將這3個變數編碼為位元組:
def __bytes__(self): """ Convert DH message to bytes. :return: packaged DH message as bytes +-------+-----------+------------+ | Prime | Generator | Public Key | | 1024 | 16 | 1024 | +-------+-----------+------------+ """ prm = self.package(self.p, LEN_PRIME) gen = self.package(self.g, LEN_GEN) pbk = self.package(self.pk, LEN_PK) return prm + gen + pbk
由於我們需要規定標準的消息格式,以便客戶端對消息進行解封裝處理,因此我們約定前1024個位元組為p
,後面的16個位元組為g
,最後的1024個位元組為public key
。package
方法可以將整數變數轉化為位元組並添加填充數據,直到長度滿足要求為止。
def package(i, length): """ Package an integer as a bytes object of length "length". :param i: integer to be package :param length: desired length of the bytes object :return: bytes representation of the integer """ # Convert i to hex and remove 0x from the left i_hex = hex(i)[2:] # Make the length of i_hex a multiple of 2 if len(i_hex) % 2 != 0: i_hex = 0 + i_hex # Convert hex string into bytes i_bytes = binascii.unhexlify(i_hex) # Check to make sure bytes to not exceed the max length len_i = len(i_bytes) if len_i > length: raise InvalidDH("Length Exceeds Maximum of {}".format(length)) # Generate padding for the remaining space on the left i_padding = bytes(length - len_i) return i_padding + i_bytes
消息封裝完畢後,我們將其發給客戶端,等待客戶端返回公鑰數據:
self.connection.sendall(dh_message)try: response = self.connection.recv(LEN_PK)except ConnectionError: print("Key Exchange with {} failed".format(self.address[0])) return Noneclient_key = DH.b2i(response)
然後我們可以將響應數據從位元組轉化為整數,再與我們自己的私鑰和公開素數p
一起提交給DH.get_shared_key()
方法:
shared_key = DH.get_shared_key(client_key, a, p)
get_shared_key()
函數可以計算(client_key ^ a) % p
公式,將結果轉化為十六進位字元串,然後提交給sha256
函數,以標準化結果的長度。
def get_shared_key(public, private, p): """ Calculate a shared key from a foreign public key, a local private key, and a shared prime. :param public: public key as an integer :param private: private key as an integer :param p: prime number :return: shared key as a 256-bit bytes object """ s = pow(public, private, p) s_hex = hex(s)[2:] # Make the length of s_hex a multiple of 2 if len(s_hex) % 2 != 0: s_hex = 0 + s_hex # Convert hex to bytes s_bytes = binascii.unhexlify(s_hex) # Hash and return the hex result return sha256(s_bytes).digest()
現在我們終於生成了一個共享密鑰!這個過程非常有趣,當然實際環境會比理論實驗更加複雜。接下來我們還需要做些什麼呢?
生成共享密鑰後,我們才剛完成伺服器上客戶端的初始化工作。
client = Client(self, connection, address)...# Add client to list of clients on serverself.clients.append(client)...# Listen for incoming messages from clientthreading.Thread(target=self.listen, args=(client, )).start()
服務端會啟用新的線程(self.listen
),接收客戶端發過來的加密消息,解壓這些消息,然後將消息(再次加密後)廣播給伺服器上其他所有客戶端。
四、總結
整個過程大概就是這樣,原理及代碼已經介紹清楚。後面我們還可以看一下伺服器如何處理消息、客戶端的工作流程等,感謝大家閱讀本文。
推薦閱讀:
※Is MAC enough——關於BB84量子密鑰分發協議(三)
※《Crick 4 *4 *4 遺傳密碼錶》的起草origin與修訂evolution (一)
※《Crick 4 *4 *4 遺傳密碼錶》的起草origin與修訂evolution (三)
※怎樣才能證明我的密碼是安全的?怎麼才能說明安全?
TAG:密碼學 |