標籤:

如何一步步構建加密聊天應用

譯文聲明

本文是翻譯文章,文章原作者spec,文章來源:0x00sec.org

原文地址:0x00sec.org/t/encrypted

一、前言

前一陣子我正在編寫一個加密聊天客戶端以及伺服器,想通過開發過程了解關於加密方法的更多知識,也了解如何在網路協議中實現加密協議(如基於TLS的SSH或者HTTPs協議)。現在這個工作已基本完成,我想與大家一起分享下我學到的一些經驗及知識。

本文可以分為兩部分,第一部分介紹了一些相關的基本概念,第二部分會深入分析代碼,將這些概念與具體程序結合起來(@oaktree也建議我採用這種組織結構)。

話不多說,開始步入正題。

二、基本概念

我們的目標是創建一個共享密鑰,以便加密及解密客戶端和伺服器交互的消息。

為了創建聊天室,客戶端必須能夠以某種安全的方式將消息發送給伺服器,反之亦然。對稱加密(symmetric encryption)機制採用了相同的密鑰來加密及解密消息,我們可以考慮採用這種方式來實現。然而如果採用這種方式,那麼通信雙方必須就通過某種方式來獲取相同的密鑰。

請注意:加密密鑰實際上是非常大的數字

那麼我們怎麼樣才能讓兩台計算機使用相同的大數呢?我們不能直接發送這個數字,這樣做會在第一步就破壞掉整個加密體系,因為這樣竊聽者就可以竊取密鑰,解開加密信息。相反,我們會使用Diffie-Hellman密鑰交換演算法來完成這個任務。

第一步:DH密鑰交換演算法

這種方法是創建共享密鑰的絕佳方法

Diffie-Hellman(簡稱為DH)演算法可以讓兩個實體在若干次交互後獲得相同的數字,並且該過程沒有直接公開這個數字。許多DH實現方法會在有限循環中讓整數對p取模(p是一個素數),然而我們也可以選擇使用其他方式(比如橢圓曲線演算法,大家在瀏覽加密網頁時也用到了這種技術)。

我們可以拋開複雜的數學概念,直觀地來理解這個密鑰交換過程背後的原理,並且我們也不需要掌握太高超的數學知識就能理解這個過程。維基百科上有一張示意圖可以描述這個過程:

在上圖中,「common paint」包含一個非常大的素數p(至少為2048位)以及小素數g,這兩個數都可以公開,不會對我們的安全性造成影響。「secret colors」是每個成員自己生成的隨機數,不能對外公布。這種方法之所以能實現密鑰共享,原因在於如下等式成立:

(g^a % p)^b % p = (g^b % p)^a % p

其中,a以及b為「secret colors」,%為取模操作(即除法運算後取餘數)。大家可能會問,為什麼這兩個表達式會相等?這是個數學問題,以我的水平可能很難解釋清楚。我想這就相當於在問為什麼2 + 2 = 4,不同的是看起來沒有那麼直白,但事實的確如此。如果有人能簡單明了地解釋這個等式,歡迎發表高見,讓我能更好地理解。我還沒有專門學習過相關理論,希望有人能幫忙解釋一下。

如果這裡你還沒有完全理解,沒關係,後面我們會在代碼中再詳細解釋一下。現在重要的是我們可以通過這種方法獲得相同的數字:一旦完成這個任務,我們就可以繼續執行下一個任務。

第二步:AES加密

AES的全稱為Advanced Encryption Standard(高級加密標準),由於加密速度較快、安全性較高,AES現在已經成為廣泛使用的一種對稱加密演算法。除非使用該演算法的系統本身正在泄露數據,否則想攻破這種演算法,只能採用暴力破解法。我們會以Cipher Block Chaining)(加密塊鏈,CBC)模式來使用AES-256演算法,這種模式需要使用256位的密鑰(因此我們需要通過SHA-256演算法將共享大數的長度標準化為256位),加密序列中的每個加密塊都需要依賴前一個加密塊才能進行加解密。

在這種模式下,我們必須使用一個初始化向量(initialization vector,IV)來作為加密鏈中的初始塊,以便處理後面的每個數據塊。

在CBC模式下,每個明文塊會先使用前一個密文塊進行異或(XOR)處理,然後再加密。這樣一來,每個加密塊都需要依賴之前所有的加密塊,直到追溯到第一個加密塊為止。

這也是IV的作用所在,因為如果我們還沒有開始加密,我們就沒辦法找到起點之前的一個塊來異或處理。

(感謝@pry0cc提供如上解釋)

與加密密鑰不同的是,這個值可以對外公開。這種方法基本上可以讓針對密鑰的逆向分析無功而返,因為即便使用相同的密鑰對「hello」進行兩次加密,如果IV值不同,那麼我們會得到完全不同的加密結果。但我們必須小心謹慎,重複使用相同的IV會降低通信的安全性。大家可以參考WEP中的安全缺陷了解更多細節。

此外,我們還需要加入一些額外的字元(即「填充」數據),使待加密數據的大小為AES塊大小的整數倍。如果在消息尾部添加空格符,對端處理起來也比較方便,因此我們可以採用這種填充策略。這樣一來,第一個塊為IV,後面的塊為我們提供的消息(包含填充數據)。

基本概念就這麼多。現在我們可以在伺服器和客戶端之間來回發送加密數據。客戶端可以將加密的聊天消息提交給伺服器,伺服器會解密消息,使用其他客戶端的密鑰重新加密消息然後再廣播消息。如果網路中有人在竊聽,那麼他所收到的每個消息看起來完全不同,沒辦法破譯其中內容。

三、代碼實現

現在我們來看一下客戶端以及服務端的具體代碼。

整個工程所包含的文件如下:

客戶端以及服務端包含許多相同代碼,其中大部分為消息的加密以及解密代碼。然而,由於服務端負責整個流程中的大部分工作,因此我們先從服務端開始。

這裡我只會介紹一些重要內容,略過一些無關緊要的代碼(使用…來替代),大家可以訪問Github獲取完整代碼。

server.py

# Inside Server Class def __init__(self, host=127.0.0.1, port=DEFAULT_PORT): ... self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.dh_params = M2DH.gen_params(DH_SIZE, 2) ...

首先我們需要創建一個新的socket以便伺服器進行通信。

self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

這裡socket.AF_INET表明我們使用的是IPv4地址(如0x00sec.org74.125.136.94),socket.SOCK_STREAM表明使用的是TCP協議。

接下來,我們使用m2crypto庫的DH模塊生成DH密鑰交換演算法所需的參數。

self.dh_params = M2DH.gen_params(DH_SIZE, 2)

請記住,這裡我們需要創建兩個對外公開的值:素數p以及素數q。當我們指定所需素數的大小(以bit為單位,至少為2048位)以及所需的生成器後,這個函數就會返回包含p以及g的一個類。該函數使用OpennSSL來完成這個任務,OpenSSL是「專為TLS(傳輸層安全)協議以及SSL(安全套接層)協議設計的功能強大的商業級工具包」,許多程序都用到了這個庫,比如OpenVPN。

現在伺服器可以監聽新客戶端的連接請求,然後嘗試進行DH密鑰交換,監聽客戶端發來的加密數據。

while True: connection, address = self.socket.accept() client = Client(self, connection, address) ... self.clients.append(client) ... threading.Thread(target=self.listen, args=(client, )).start()

我們使用同一個套接字來接受(accept)下一個連入請求(該函數為阻塞型函數),將新的連接初始化為新的客戶端。Client類的初始化函數如下所示:

# Inside Client Class (this is the client object the server uses) def __init__(self, server, connection, address): ... self.key = self.dh(server.dh_params)

Client類接受3個參數:server(該客戶端所屬的伺服器)、connection(socket連接)以及address(地址信息,格式為(ip_address, port)),然後調用dh()函數來生成共享密鑰。

# Inside Client Class def dh(self, dh_params): """ Perform Diffie-Hellman Key Exchange with a client. :param dh_params: p and g generated by DH :return shared_key: shared encryption key for AES """ # p: shared prime p = DH.b2i(dh_params.p) # g: primitive root modulo g = DH.b2i(dh_params.g) # a: randomized private key a = DH.gen_private_key() # Generate public key from p, g, and a public_key = DH.gen_public_key(g, a, p) # Create a DH message to send to client as bytes dh_message = bytes(DH(p, g, public_key)) self.connection.sendall(dh_message) # Receive public key from client as bytes ... client_key = DH.b2i(response) # Calculate shared key with newly received client key shared_key = DH.get_shared_key(client_key, a, p) return shared_key

首先,該方法會使用DH.b2i函數將位元組轉換為整數(b2i函數位於dhke.py中),生成參數p以及g,然後再生成一個隨機的私鑰a(同樣是一個整數)。伺服器的公鑰採用這幾個參數生成,公式為g^a % p,所使用的函數為DH.gen_public_key(),該函數的定義如下:

def gen_public_key(g, private, p): # g^private % p return pow(g, private, p)

現在,伺服器可以使用pg以及public_key來構造一則公開消息,發送給客戶端。

注意:客戶端需要這3個參數才能生成自己的公鑰

接下來這行代碼可以生成一個新的DH對象,並將其轉換為位元組:

dh_message = bytes(DH(p, g, public_key))

我們可以查看DH類的__bytes__ 方法,了解如何將這3個變數編碼為位元組:

def __bytes__(self): """ Convert DH message to bytes. :return: packaged DH message as bytes +-------+-----------+------------+ | Prime | Generator | Public Key | | 1024 | 16 | 1024 | +-------+-----------+------------+ """ prm = self.package(self.p, LEN_PRIME) gen = self.package(self.g, LEN_GEN) pbk = self.package(self.pk, LEN_PK) return prm + gen + pbk

由於我們需要規定標準的消息格式,以便客戶端對消息進行解封裝處理,因此我們約定前1024個位元組為p,後面的16個位元組為g,最後的1024個位元組為public keypackage方法可以將整數變數轉化為位元組並添加填充數據,直到長度滿足要求為止。

def package(i, length): """ Package an integer as a bytes object of length "length". :param i: integer to be package :param length: desired length of the bytes object :return: bytes representation of the integer """ # Convert i to hex and remove 0x from the left i_hex = hex(i)[2:] # Make the length of i_hex a multiple of 2 if len(i_hex) % 2 != 0: i_hex = 0 + i_hex # Convert hex string into bytes i_bytes = binascii.unhexlify(i_hex) # Check to make sure bytes to not exceed the max length len_i = len(i_bytes) if len_i > length: raise InvalidDH("Length Exceeds Maximum of {}".format(length)) # Generate padding for the remaining space on the left i_padding = bytes(length - len_i) return i_padding + i_bytes

消息封裝完畢後,我們將其發給客戶端,等待客戶端返回公鑰數據:

self.connection.sendall(dh_message)try: response = self.connection.recv(LEN_PK)except ConnectionError: print("Key Exchange with {} failed".format(self.address[0])) return Noneclient_key = DH.b2i(response)

然後我們可以將響應數據從位元組轉化為整數,再與我們自己的私鑰和公開素數p一起提交給DH.get_shared_key()方法:

shared_key = DH.get_shared_key(client_key, a, p)

get_shared_key()函數可以計算(client_key ^ a) % p公式,將結果轉化為十六進位字元串,然後提交給sha256函數,以標準化結果的長度。

def get_shared_key(public, private, p): """ Calculate a shared key from a foreign public key, a local private key, and a shared prime. :param public: public key as an integer :param private: private key as an integer :param p: prime number :return: shared key as a 256-bit bytes object """ s = pow(public, private, p) s_hex = hex(s)[2:] # Make the length of s_hex a multiple of 2 if len(s_hex) % 2 != 0: s_hex = 0 + s_hex # Convert hex to bytes s_bytes = binascii.unhexlify(s_hex) # Hash and return the hex result return sha256(s_bytes).digest()

現在我們終於生成了一個共享密鑰!這個過程非常有趣,當然實際環境會比理論實驗更加複雜。接下來我們還需要做些什麼呢?

生成共享密鑰後,我們才剛完成伺服器上客戶端的初始化工作。

client = Client(self, connection, address)...# Add client to list of clients on serverself.clients.append(client)...# Listen for incoming messages from clientthreading.Thread(target=self.listen, args=(client, )).start()

服務端會啟用新的線程(self.listen),接收客戶端發過來的加密消息,解壓這些消息,然後將消息(再次加密後)廣播給伺服器上其他所有客戶端。

四、總結

整個過程大概就是這樣,原理及代碼已經介紹清楚。後面我們還可以看一下伺服器如何處理消息、客戶端的工作流程等,感謝大家閱讀本文。


推薦閱讀:

Is MAC enough——關於BB84量子密鑰分發協議(三)
《Crick 4 *4 *4 遺傳密碼錶》的起草origin與修訂evolution (一)
《Crick 4 *4 *4 遺傳密碼錶》的起草origin與修訂evolution (三)
怎樣才能證明我的密碼是安全的?怎麼才能說明安全?

TAG:密碼學 |