Elasticsearch內核解析 - 寫入篇
目前的Elasticsearch有兩個明顯的身份,一個是分散式搜索系統,另一個是分散式NoSQL資料庫,對於這兩種不同的身份,讀寫語義基本類似,但也有一點差異。
寫操作
- 實時性:
- 搜索系統的Index一般都是NRT(Near Real Time),近實時的,比如Elasticsearch中,Index的實時性是由refresh控制的,默認是1s,最快可到100ms,那麼也就意味著Index doc成功後,需要等待一秒鐘後才可以被搜索到。
- NoSQL資料庫的Write基本都是RT(Real Time),實時的,寫入成功後,立即是可見的。Elasticsearch中的Index請求也能保證是實時的,因為Get請求會直接讀內存中尚未Flush到存儲介質的TransLog。
- 可靠性:
- 搜索系統對可靠性要求都不高,一般數據的可靠性通過將原始數據存儲在另一個存儲系統來保證,當搜索系統的數據發生丟失時,再從其他存儲系統導一份數據過來重新rebuild就可以了。在Elasticsearch中,通過設置TransLog的Flush頻率可以控制可靠性,要麼是按請求,每次請求都Flush;要麼是按時間,每隔一段時間Flush一次。一般為了性能考慮,會設置為每隔5秒或者1分鐘Flush一次,Flush間隔時間越長,可靠性就會越低。
- NoSQL資料庫作為一款資料庫,必須要有很高的可靠性,數據可靠性是生命底線,決不能有閃失。如果把Elasticsearch當做NoSQL資料庫,此時需要設置TransLog的Flush策略為每個請求都要Flush,這樣才能保證當前Shard寫入成功後,數據能盡量持久化下來。
上面簡單介紹了下NoSQL資料庫和搜索系統的一些異同,我們會在後面有一篇文章,專門用來介紹Elasticsearch作為NoSQL資料庫時的一些局限和特點。
讀操作
下一篇《Elasticsearch內核解析 - 查詢篇》中再詳細介紹。
上面大概對比了下搜索和NoSQL在寫方面的特點,接下來,我們看一下Elasticsearch 6.0.0版本中寫入流程都做了哪些事情,希望能對大家有用。
寫操作的關鍵點
在考慮或分析一個分散式系統的寫操作時,一般需要從下面幾個方面考慮:
- 可靠性:或者是持久性,數據寫入系統成功後,數據不會被回滾或丟失。
- 一致性:數據寫入成功後,再次查詢時必須能保證讀取到最新版本的數據,不能讀取到舊數據。
- 原子性:一個寫入或者更新操作,要麼完全成功,要麼完全失敗,不允許出現中間狀態。
- 隔離性:多個寫入操作相互不影響。
- 實時性:寫入後是否可以立即被查詢到。
- 性能:寫入性能,吞吐量到底怎麼樣。
Elasticsearch作為分散式系統,也需要在寫入的時候滿足上述的四個特點,我們在後面的寫流程介紹中會涉及到上述四個方面。
接下來,我們一層一層剖析Elasticsearch內部的寫機制。
Lucene的寫
眾所周知,Elasticsearch內部使用了Lucene完成索引創建和搜索功能,Lucene中寫操作主要是通過IndexWriter類實現,IndexWriter提供三個介面:
public long addDocument(); public long updateDocuments(); public long deleteDocuments();
通過這三個介面可以完成單個文檔的寫入,更新和刪除功能,包括了分詞,倒排創建,正排創建等等所有搜索相關的流程。只要Doc通過IndesWriter寫入後,後面就可以通過IndexSearcher搜索了,看起來功能已經完善了,但是仍然有一些問題沒有解:
- 上述操作是單機的,而不是我們需要的分散式。
- 文檔寫入Lucene後並不是立即可查詢的,需要生成完整的Segment後才可被搜索,如何保證實時性?
- Lucene生成的Segment是在內存中,如果機器宕機或掉電後,內存中的Segment會丟失,如何保證數據可靠性 ?
- Lucene不支持部分文檔更新,但是這又是一個強需求,如何支持部分更新?
上述問題,在Lucene中是沒有解決的,那麼就需要Elasticsearch中解決上述問題。
Elasticsearch在解決上述問題時,除了我們在上一篇《Elasticsearch數據模型簡介》中介紹的幾種系統欄位外,在引擎架構上也引入了多重機制來解決問題。我們再來看Elasticsearch中的寫機制。
Elasticsearch的寫
Elasticsearch採用多Shard方式,通過配置routing規則將數據分成多個數據子集,每個數據子集提供獨立的索引和搜索功能。當寫入文檔的時候,根據routing規則,將文檔發送給特定Shard中建立索引。這樣就能實現分散式了。
此外,Elasticsearch整體架構上採用了一主多副的方式:
每個Index由多個Shard組成,每個Shard有一個主節點和多個副本節點,副本個數可配。但每次寫入的時候,寫入請求會先根據_routing規則選擇發給哪個Shard,Index Request中可以設置使用哪個Filed的值作為路由參數,如果沒有設置,則使用Mapping中的配置,如果mapping中也沒有配置,則使用_id作為路由參數,然後通過_routing的Hash值選擇出Shard(在OperationRouting類中),最後從集群的Meta中找出出該Shard的Primary節點。
請求接著會發送給Primary Shard,在Primary Shard上執行成功後,再從Primary Shard上將請求同時發送給多個Replica Shard,請求在多個Replica Shard上執行成功並返回給Primary Shard後,寫入請求執行成功,返回結果給客戶端。
這種模式下,寫入操作的延時就等於latency = Latency(Primary Write) + Max(Replicas Write)。只要有副本在,寫入延時最小也是兩次單Shard的寫入時延總和,寫入效率會較低,但是這樣的好處也很明顯,避免寫入後,單機或磁碟故障導致數據丟失,在數據重要性和性能方面,一般都是優先選擇數據,除非一些允許丟數據的特殊場景。
採用多個副本後,避免了單機或磁碟故障發生時,對已經持久化後的數據造成損害,但是Elasticsearch里為了減少磁碟IO保證讀寫性能,一般是每隔一段時間(比如5分鐘)才會把Lucene的Segment寫入磁碟持久化,對於寫入內存,但還未Flush到磁碟的Lucene數據,如果發生機器宕機或者掉電,那麼內存中的數據也會丟失,這時候如何保證?
對於這種問題,Elasticsearch學習了資料庫中的處理方式:增加CommitLog模塊,Elasticsearch中叫TransLog。
在每一個Shard中,寫入流程分為兩部分,先寫入Lucene,再寫入TransLog。
寫入請求到達Shard後,先寫Lucene文件,創建好索引,此時索引還在內存裡面,接著去寫TransLog,寫完TransLog後,刷新TransLog數據到磁碟上,寫磁碟成功後,請求返回給用戶。這裡有幾個關鍵點,一是和資料庫不同,資料庫是先寫CommitLog,然後再寫內存,而Elasticsearch是先寫內存,最後才寫TransLog,一種可能的原因是Lucene的內存寫入會有很複雜的邏輯,很容易失敗,比如分詞,欄位長度超過限制等,比較重,為了避免TransLog中有大量無效記錄,減少recover的複雜度和提高速度,所以就把寫Lucene放在了最前面。二是寫Lucene內存後,並不是可被搜索的,需要通過Refresh把內存的對象轉成完整的Segment後,然後再次reopen後才能被搜索,一般這個時間設置為1秒鐘,導致寫入Elasticsearch的文檔,最快要1秒鐘才可被從搜索到,所以Elasticsearch在搜索方面是NRT(Near Real Time)近實時的系統。三是當Elasticsearch作為NoSQL資料庫時,查詢方式是GetById,這種查詢可以直接從TransLog中查詢,這時候就成了RT(Real Time)實時系統。四是每隔一段比較長的時間,比如30分鐘後,Lucene會把內存中生成的新Segment刷新到磁碟上,刷新後索引文件已經持久化了,歷史的TransLog就沒用了,會清空掉舊的TransLog。
上面介紹了Elasticsearch在寫入時的兩個關鍵模塊,Replica和TransLog,接下來,我們看一下Update流程:
Lucene中不支持部分欄位的Update,所以需要在Elasticsearch中實現該功能,具體流程如下:
- 收到Update請求後,從Segment或者TransLog中讀取同id的完整Doc,記錄版本號為V1。
- 將版本V1的全量Doc和請求中的部分欄位Doc合併為一個完整的Doc,同時更新內存中的VersionMap。獲取到完整Doc後,Update請求就變成了Index請求。
- 加鎖。
- 再次從versionMap中讀取該id的最大版本號V2,如果versionMap中沒有,則從Segment或者TransLog中讀取,這裡基本都會從versionMap中獲取到。
- 檢查版本是否衝突(V1==V2),如果衝突,則回退到開始的「Update doc」階段,重新執行。如果不衝突,則執行最新的Add請求。
- 在Index Doc階段,首先將Version + 1得到V3,再將Doc加入到Lucene中去,Lucene中會先刪同id下的已存在doc id,然後再增加新Doc。寫入Lucene成功後,將當前V3更新到versionMap中。
- 釋放鎖,部分更新的流程就結束了。
介紹完部分更新的流程後,大家應該從整體架構上對Elasticsearch的寫入有了一個初步的映象,接下來我們詳細剖析下寫入的詳細步驟。
Elasticsearch寫入請求類型
Elasticsearch中的寫入請求類型,主要包括下列幾個:Index(Create),Update,Delete和Bulk,其中前3個是單文檔操作,後一個Bulk是多文檔操作,其中Bulk中可以包括Index(Create),Update和Delete。
在6.0.0及其之後的版本中,前3個單文檔操作的實現基本都和Bulk操作一致,甚至有些就是通過調用Bulk的介面實現的。估計接下來幾個版本後,Index(Create),Update,Delete都會被當做Bulk的一種特例化操作被處理。這樣,代碼和邏輯都會更清晰一些。
下面,我們就以Bulk請求為例來介紹寫入流程。
Elasticsearch寫入流程圖
- 紅色:Client Node。
- 綠色:Primary Node。
- 藍色:Replica Node。
註冊Action
在Elasticsearch中,所有action的入口處理方法都是註冊在ActionModule.java中,比如Bulk Request有兩個註冊入口,分別是Rest和Transport入口:
如果請求是Rest請求,則會在RestBulkAction中Parse Request,構造出BulkRequest,然後發給後面的TransportAction處理。
TransportShardBulkAction的基類TransportReplicationAction中註冊了對Primary,Replica等的不同處理入口:
這裡對原始請求,Primary Node請求和Replica Node請求各自註冊了一個handler處理入口。
Client Node
Client Node 也包括了前面說過的Parse Request,這裡就不再贅述了,接下來看一下其他的部分。
1. Ingest Pipeline
在這一步可以對原始文檔做一些處理,比如HTML解析,自定義的處理,具體處理邏輯可以通過插件來實現。在Elasticsearch中,由於Ingest Pipeline會比較耗費CPU等資源,可以設置專門的Ingest Node,專門用來處理Ingest Pipeline邏輯。
如果當前Node不能執行Ingest Pipeline,則會將請求發給另一台可以執行Ingest Pipeline的Node。
2. Auto Create Index
判斷當前Index是否存在,如果不存在,則需要自動創建Index,這裡需要和Master交互。也可以通過配置關閉自動創建Index的功能。
3. Set Routing
設置路由條件,如果Request中指定了路由條件,則直接使用Request中的Routing,否則使用Mapping中配置的,如果Mapping中無配置,則使用默認的_id欄位值。
在這一步中,如果沒有指定id欄位,則會自動生成一個唯一的_id欄位,目前使用的是UUID。
4. Construct BulkShardRequest
由於Bulk Request中會包括多個(Index/Update/Delete)請求,這些請求根據routing可能會落在多個Shard上執行,這一步會按Shard挑揀Single Write Request,同一個Shard中的請求聚集在一起,構建BulkShardRequest,每個BulkShardRequest對應一個Shard。
5. Send Request To Primary
這一步會將每一個BulkShardRequest請求發送給相應Shard的Primary Node。
Primary Node
Primary 請求的入口是在PrimaryOperationTransportHandler的messageReceived,我們來看一下相關的邏輯流程。
1. Index or Update or Delete
循環執行每個Single Write Request,對於每個Request,根據操作類型(CREATE/INDEX/UPDATE/DELETE)選擇不同的處理邏輯。
其中,Create/Index是直接新增Doc,Delete是直接根據_id刪除Doc,Update會稍微複雜些,我們下面就以Update為例來介紹。
2. Translate Update To Index or Delete
這一步是Update操作的特有步驟,在這裡,會將Update請求轉換為Index或者Delete請求。首先,會通過GetRequest查詢到已經存在的同_id Doc(如果有)的完整欄位和值(依賴_source欄位),然後和請求中的Doc合併。同時,這裡會獲取到讀到的Doc版本號,記做V1。
3. Parse Doc
這裡會解析Doc中各個欄位。生成ParsedDocument對象,同時會生成uid Term。在Elasticsearch中,_uid = type # _id,對用戶,_Id可見,而Elasticsearch中存儲的是_uid。這一部分生成的ParsedDocument中也有Elasticsearch的系統欄位,大部分會根據當前內容填充,部分未知的會在後面繼續填充ParsedDocument。
4. Update Mapping
Elasticsearch中有個自動更新Mapping的功能,就在這一步生效。會先挑選出Mapping中未包含的新Field,然後判斷是否運行自動更新Mapping,如果允許,則更新Mapping。
5. Get Sequence Id and Version
由於當前是Primary Shard,則會從SequenceNumber Service獲取一個sequenceID和Version。SequenceID在Shard級別每次遞增1,SequenceID在寫入Doc成功後,會用來初始化LocalCheckpoint。Version則是根據當前Doc的最大Version遞增1。
6. Add Doc To Lucene
這一步開始的時候會給特定_uid加鎖,然後判斷該_uid對應的Version是否等於之前Translate Update To Index步驟里獲取到的Version,如果不相等,則說明剛才讀取Doc後,該Doc發生了變化,出現了版本衝突,這時候會拋出一個VersionConflict的異常,該異常會在Primary Node最開始處捕獲,重新從「Translate Update To Index or Delete」開始執行。
如果Version相等,則繼續執行,如果已經存在同id的Doc,則會調用Lucene的UpdateDocument(uid, doc)介面,先根據uid刪除Doc,然後再Index新Doc。如果是首次寫入,則直接調用Lucene的AddDocument介面完成Doc的Index,AddDocument也是通過UpdateDocument實現。
這一步中有個問題是,如何保證Delete-Then-Add的原子性,怎麼避免中間狀態時被Refresh?答案是在開始Delete之前,會加一個Refresh Lock,禁止被Refresh,只有等Add完後釋放了Refresh Lock後才能被Refresh,這樣就保證了Delete-Then-Add的原子性。
Lucene的UpdateDocument介面中就只是處理多個Field,會遍歷每個Field逐個處理,處理順序是invert index,store field,doc values,point dimension,後續會有文章專門介紹Lucene中的寫入。
7. Write Translog
寫完Lucene的Segment後,會以keyvalue的形式寫TransLog,Key是_id,Value是Doc內容。當查詢的時候,如果請求是GetDocByID,則可以直接根據_id從TransLog中讀取到,滿足NoSQL場景下的實時性要去。
需要注意的是,這裡只是寫入到內存的TransLog,是否Sync到磁碟的邏輯還在後面。
這一步的最後,會標記當前SequenceID已經成功執行,接著會更新當前Shard的LocalCheckPoint。
8. Renew Bulk Request
這裡會重新構造Bulk Request,原因是前面已經將UpdateRequest翻譯成了Index或Delete請求,則後續所有Replica中只需要執行Index或Delete請求就可以了,不需要再執行Update邏輯,一是保證Replica中邏輯更簡單,性能更好,二是保證同一個請求在Primary和Replica中的執行結果一樣。
9. Flush Translog
這裡會根據TransLog的策略,選擇不同的執行方式,要麼是立即Flush到磁碟,要麼是等到以後再Flush。Flush的頻率越高,可靠性越高,對寫入性能影響越大。
10. Send Requests To Replicas
這裡會將剛才構造的新的Bulk Request並行發送給多個Replica,然後等待Replica的返回,這裡需要等待所有Replica返回後(可能有成功,也有可能失敗),Primary Node才會返回用戶。如果某個Replica失敗了,則Primary會給Master發送一個Remove Shard請求,要求Master將該Replica Shard從可用節點中移除。
這裡,同時會將SequenceID,PrimaryTerm,GlobalCheckPoint等傳遞給Replica。
發送給Replica的請求中,Action Name等於原始ActionName + [R],這裡的R表示Replica。通過這個[R]的不同,可以找到處理Replica請求的Handler。
11. Receive Response From Replicas
Replica中請求都處理完後,會更新Primary Node的LocalCheckPoint。
Replica Node
Replica 請求的入口是在ReplicaOperationTransportHandler的messageReceived,我們來看一下相關的邏輯流程。
1. Index or Delete
根據請求類型是Index還是Delete,選擇不同的執行邏輯。這裡沒有Update,是因為在Primary Node中已經將Update轉換成了Index或Delete請求了。
2. Parse Doc
3. Update Mapping
以上都和Primary Node中邏輯一致。
4. Get Sequence Id and Version
Primary Node中會生成Sequence ID和Version,然後放入ReplicaRequest中,這裡只需要從Request中獲取到就行。
5. Add Doc To Lucene
由於已經在Primary Node中將部分Update請求轉換成了Index或Delete請求,這裡只需要處理Index和Delete兩種請求,不再需要處理Update請求了。比Primary Node會更簡單一些。
6. Write Translog
7. Flush Translog
以上都和Primary Node中邏輯一致。
最後
上面詳細介紹了Elasticsearch的寫入流程及其各個流程的工作機制,我們在這裡再次總結下之前提出的分散式系統中的六大特性:
- 可靠性:由於Lucene的設計中不考慮可靠性,在Elasticsearch中通過Replica和TransLog兩套機制保證數據的可靠性。
- 一致性:Lucene中的Flush鎖只保證Update介面裡面Delete和Add中間不會Flush,但是Add完成後仍然有可能立即發生Flush,導致Segment可讀。這樣就沒法保證Primary和所有其他Replica可以同一時間Flush,就會出現查詢不穩定的情況,這裡只能實現最終一致性。
- 原子性:Add和Delete都是直接調用Lucene的介面,是原子的。當部分更新時,使用Version和鎖保證更新是原子的。
- 隔離性:仍然採用Version和局部鎖來保證更新的是特定版本的數據。
- 實時性:使用定期Refresh Segment到內存,並且Reopen Segment方式保證搜索可以在較短時間(比如1秒)內被搜索到。通過將未刷新到磁碟數據記入TransLog,保證對未提交數據可以通過ID實時訪問到。
- 性能:性能是一個系統性工程,所有環節都要考慮對性能的影響,在Elasticsearch中,在很多地方的設計都考慮到了性能,一是不需要所有Replica都返回後才能返回給用戶,只需要返回特定數目的就行;二是生成的Segment現在內存中提供服務,等一段時間後才刷新到磁碟,Segment在內存這段時間的可靠性由TransLog保證;三是TransLog可以配置為周期性的Flush,但這個會給可靠性帶來傷害;四是每個線程持有一個Segment,多線程時相互不影響,相互獨立,性能更好;五是系統的寫入流程對版本依賴較重,讀取頻率較高,因此採用了versionMap,減少熱點數據的多次磁碟IO開銷。Lucene中針對性能做了大量的優化。後面我們也會有文章專門介紹Lucene中的優化思路。
到此,Elasticsearch的寫入流程介紹完了,下一篇《Elasticsearch讀取流程》中再見。
最後,我們在招人,JAVA / Elasticsearch / Lucene研發,有興趣的可以私信聯繫我。
推薦閱讀:
※教你分析知乎用戶系列之陸
※SOU.com 真的會成為 360 搜索的域名嗎?
※下載素材?有這仨網站就夠了。
※互聯網搜索入門
※如何評價小米新版全局搜索?
TAG:Elasticsearch | 搜索 | 分散式存儲 |