Elasticsearch查詢解析

Elasticsearch查詢解析

來自專欄 Elasticsearch實驗室4 人贊了文章

1. 背景

Elasticsearch(ES)可用於全文檢索、日誌分析、指標分析、APM等眾多場景,而且搭建部署容易,後期彈性擴容、故障處理簡單。ES在一定程度上實現了一套系統支持多個場景的希望,大幅度降低使用多套專用系統的運維成本(當然ES不是萬能的,不能滿足事務等場景)。正是因為其通用性和易用性,ES自2010年發布首個版本以來得到爆髮式的發展,廣泛應用於各類互聯網公司的不同業務場景。

ES的查詢介面具有分散式的數據檢索、聚合分析能力,數據檢索能力用於支持全文檢索、日誌分析等場景,如Github平台上的代碼搜索、基於ES的各類日誌分析服務等;聚合分析能力用於支持指標分析、APM等場景,如監控場景、應用的日活/留存分析等。本文基於ES 5.6.4,主要分析ES的分散式執行框架及查詢主體流程,探究ES如何實現分散式查詢、數據檢索、聚合分析等能力。

2. 分散式查詢框架及類型

ES使用開源的Lucene作為存儲引擎,它賦予ES高性能的數據檢索能力,但Lucene僅僅是一個單機索引庫。ES基於Lucene進行分散式封裝,以支持集群管理、分散式查詢、聚合分析等功能。從使用的直觀感受看,ES按照下圖方式實現了分散式查詢:

  • 查詢可發送到任意節點,接收到某查詢的節點會作為該查詢的協調節點(Coordinating Node)。
  • 協調節點解析查詢,向對應數據分片分發查詢子任務。
  • 各數據分片檢索本地數據並返回協調節點,經匯聚處理後返回用戶。

而從實現角度看,協調節點的調度邏輯實際遠比上述流程複雜,不同查詢對應的協調節點的處理邏輯有一定差別。下面我們先簡單介紹ES中常見的3類查詢:

2.1 QUERY_THEN_FETCH

這是最常用的查詢類型,可以完成大多數的分散式查詢和聚合分析功能。在這類查詢中,協調節點實際需要向其他節點分發兩輪任務,也就說前面流程圖描述的任務分發階段(2&3)會有兩輪,具體如下:

  • Query Phase:進行分片粒度的數據檢索和聚合,注意此輪調度僅返迴文檔id集合,並不返回實際數據。
    • 協調節點:解析查詢後,向目標數據分片發送查詢命令。
    • 數據節點:在每個分片內,按照過濾、排序等條件進行分片粒度的文檔id檢索和數據聚合,返回結果。
  • Fetch Phase:生成最終的檢索、聚合結果。
    • 協調節點:歸併Query Phase的結果,得到最終的文檔id集合和聚合結果,並向目標數據分片發送數據抓取命令。
    • 數據節點:按需抓取實際需要的數據內容。

2.2 QUERY_AND_FETCH

對於查詢僅涉及單個分片的場景,ES會自動對查詢流程做優化,在數據節點進行Query Phase的最後,直接執行Fetch操作。此類查詢為QUERY_AND_FETCH。通過去除一輪任務調度優化查詢性能,優化過程由ES自動完成,用戶不感知。

2.3 DFS_QUERY_THEN_FETCH

這類查詢用於解決ES在多分片、少數據量的場景下計算相關度不準確的問題:以TF/IDF演算法為例,ES在計算相關度時僅考慮單個分片內的IDF,可能導致查詢結果中,類似的文檔因為在不同分片而相關度大為不同的問題。此時可以使用此類查詢,在QUERY_THEN_FETCH之前再增加一輪任務調度,用於計算分散式的IDF。但通常情況下,局部和全局IDF的差異會隨著索引里文檔數的增多漸漸消失,在真實世界的數據量下,這個問題幾乎沒有影響,沒有必要使用此類查詢增加一輪任務調度的開銷。

關於這類問題的具體描述,可以參考如下文檔:

  • 被破壞的相關度
  • How Shards Affect Relevance Scoring in Elasticsearch

3. 查詢執行流程

本節我們深入到代碼層面,以QUERY_THEN_FETCH類型查詢為例,捋著代碼主線,從實際執行角度分析ES的查詢流程。查詢流程的代碼邏輯可以整體劃分為兩個部分:

  • 查詢入口:ES接收到用戶請求後,根據請求分發框架,進入對應介面的處理邏輯。這部分處理對任何ES請求都是類似的。
  • 查詢調度:根據查詢請求條件,進行查詢的Query Phase、Fetch Phase等執行流程,返回查詢結果。

在分析具體的查詢處理邏輯之前,我們先介紹查詢入口部分,看看用戶請求在ES中是如何被分發的。

3.1 查詢入口

ES提供用戶Transport和Rest兩種介面:用戶可以通過ES官方提供的Transport Client訪問ES集群,這種介面使用的協議與ES集群內部節點間的通訊協議一致;也可以使用簡單易用的Rest介面,直接發送Http請求訪問ES集群,由ES完成Rest請求到Transport請求的轉換。考慮Rest介面的易用性,以及Rest層極低的額外開銷,建議用戶直接使用Rest介面。

以Rest介面為例,查詢入口部分的基本流程如下:

  • Rest分發

Rest分發由RestController模塊完成。在ES節點啟動時,會載入所有內置請求的Rest Action,並把對應請求的Http路徑和Rest Action作為二元組註冊到RestController中。這樣對於任意的Rest請求,RestController模塊只需根據Http路徑,即可輕鬆找到對應的Rest Action進行請求分發。RestSearchAction的註冊樣例如下:

public RestSearchAction(Settings settings, RestController controller) { super(settings); controller.registerHandler(GET, "/_search", this); controller.registerHandler(POST, "/_search", this); controller.registerHandler(GET, "/{index}/_search", this); controller.registerHandler(POST, "/{index}/_search", this); controller.registerHandler(GET, "/{index}/{type}/_search", this); controller.registerHandler(POST, "/{index}/{type}/_search", this);}

  • RestSearchAction【Rest層】

Rest層用於解析Http請求參數,轉化為ES內部使用的Transport請求,然後轉發給Transport層。其核心邏輯在prepareRequest(...)函數中:

@Overridepublic RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { SearchRequest searchRequest = new SearchRequest(); request.withContentOrSourceParamParserOrNull(parser -> parseSearchRequest(searchRequest, request, parser)); return channel -> client.search(searchRequest, new RestStatusToXContentListener<>(channel));}

  • Transport分發

Transport分發由NodeClient完成。在ES節點啟動進行ActionModule.setupActions(...)時,會把對應請求的Transport路徑和Transport Action作為二元組註冊到NodeClient中。NodeClient向外暴露的各種介面(如bulk/search),實際均通過Action對請求進行分發。

actions.register(BulkAction.INSTANCE.class, TransportShardBulkAction.class);actions.register(SearchAction.INSTANCE, TransportSearchAction.class);actions.register(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class);

  • TransportSearchAction【Transport層】

Transport層的doExecute(...)函數是請求處理的核心入口,實現了多數請求處理的主要邏輯。在查詢請求中,TransportSearchAction首先負責解析獲取查詢涉及的具體Index:

indices = indexNameExpressionResolver.concreteIndices(clusterState, searchRequest.indicesOptions(), timeProvider.getAbsoluteStartMillis(), localIndices.indices());

然後結合routing信息、perference信息獲取後續用於任務分發的分片信息:

GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap, searchRequest.preference());

最後生成查詢請求的調度類並啟動調度執行:

searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup, aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators, timeProvider, clusterStateVersion, task);

上述即為查詢入口的處理流程,它對任何Rest請求都適用。實際上,除了自帶的Rest請求外,ES提供強大的擴展能力,用戶可以通過自定義插件實現自己的請求及處理邏輯。此外,ES還支持自定義過濾器Filter,在實際進行Transport層處理前進行統一的預處理工作。

介紹完查詢入口後,下面我們具體介紹查詢執行過程中的調度部分。

3.2 查詢調度

調用SearchQueryThenFetchAsyncAction.start(...)之後,查詢即進入了以協調節點為中心的查詢調度過程,即兩個核心階段Query Phase、Fetch Phase的執行,具體如下面時序圖所示。此外,查詢調度還包含兩個輕量級階段Expand Phase、Reponse Phase,後面我們按照實際執行順序,依次介紹他們。

3.2.1 Query Phase

  • 協調節點

SearchQueryThenFetchAsyncAction實際是Query Phase的入口,Phase名稱由其構造函數體現:

super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener, shardsIts, timeProvider, clusterStateVersion, task, searchPhaseController.newSearchPhaseResults(request, shardsIts.size()), request.getMaxConcurrentShardRequests());

進入Query Phase後,會立即根據並發度參數進行Query任務的分發,具體由祖父類InitialSearchPhase的run(...)函數進行:

for (int index = 0; index < maxConcurrentShardRequests; index++) { final SearchShardIterator shardRoutings = shardsIts.get(index); assert shardRoutings.skip() == false; performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());}

然後通過SearchTransportService的sendExecuteQuery(...)函數,向具體分片發送Query子任務進行非同步執行:

transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task, new ActionListenerResponseHandler<>(listener, supplier));

在介紹每個分片的執行邏輯前,我們先提前了解分片執行結果的處理:每個分片在執行完畢Query子任務後,通過節點間通信,回調祖父類InitialSearchPhase的onShardSuccess(...)函數,把查詢結果記錄在協調節點保存的數組結構results中,並增加計數:

successfulOps.incrementAndGet();results.consumeResult(result);

當返回結果的分片數等於預期的總分片數時,協調節點會進入當前Phase的結束處理,啟動下一個階段Fetch Phase的執行。注意,這裡有個有意思的地方,ES中只需要一個分片執行成功,即會進行後續Phase處理得到部分結果,當然它會在結果中提示用戶實際有多少分片執行成功。

if (xTotalOps == expectedTotalOps) { onPhaseDone(); # 參考下面onPhaseDone代碼}……public final void onPhaseDone() { executeNextPhase(this, getNextPhase(results, this)); # 參考下面getNextPhase代碼}……protected SearchPhase getNextPhase(final SearchPhaseResults<SearchPhaseResult> results, final SearchPhaseContext context) { return new FetchSearchPhase(results, searchPhaseController, context);}

  • 數據節點

協調節點通過SearchTransportService的sendExecuteQuery(...)函數向目標數據節點發送QUERY_ACTION_NAME類型的查詢子任務,通過請求路徑QUERY_ACTION_NAME可以在SearchTransportService中找到對應的處理函數SearchService.executeQueryPhase(...):

transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() { @Override public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception { SearchPhaseResult result = searchService.executeQueryPhase(request, (SearchTask)task); channel.sendResponse(result); } });

數據節點會嘗試走canCache分支的Query Phase處理,這樣可以利用Cache優化查詢,否則走普通執行流程:

private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final SearchContext context) throws Exception { final boolean canCache = indicesService.canCache(request, context); context.getQueryShardContext().freezeContext(); if (canCache) { indicesService.loadIntoContext(request, context, queryPhase); } else { queryPhase.execute(context); }}

QueryPhase.execute(...)為數據節點進行Query Phase子任務的核心邏輯,它首先從searchContext中獲取查詢參數和查詢對象query,然後生產處理查詢結果的collector,最終調用Lucene的IndexSearcher.search(...)函數進行查詢,具體參考下面關鍵代碼。

這裡先簡單介紹下query、collector,幫助理解:

a. query :查詢對象用於指定查詢條件,比如"host:host001 AND timestamp>1514736000",在分片內進行數據檢索。

b. collector :用於消費檢索結果,進行Shard級別的limit N(Top N)、聚合計算等操作。它的實現也較為容易理解,如優先順序隊列、多層嵌套的hash分桶等。注意這裡僅獲取排序 或 聚合涉及的欄位,source、store等內容需要在Fetch Phase中獲取。

# 獲取參數和查詢對象queryResult.from(searchContext.from());queryResult.size(searchContext.size());Query query = searchContext.query();……# 生產處理查詢結果的collector# limit N對應的collectorcollector = TopScoreDocCollector.create(numDocs, after);……final List<Collector> subCollectors = new ArrayList<>();subCollectors.add(collector);# 聚合分析對應的collectorsubCollectors.addAll(searchContext.queryCollectors().values());collector = MultiCollector.wrap(subCollectors);……searcher.search(query, collector);

另外,如果查詢僅涉及一個分片,數據節點會在Query Phase結尾處,直接執行Fetch Phase,即QUERY_AND_FETCH類型查詢:

if (request.numberOfShards() == 1) { return executeFetchPhase(context, operationListener, afterQueryTime);}

3.2.2 Fetch Phase

  • 協調節點

Fetch Phase首先會歸併Query Phase得到的文檔id集合,並排序得到最終的limit N,同時歸併多個分片的聚合數據得到最終的聚合結果。這一步通過reduce操作完成:

final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = resultConsumer.reduce();

然後對需要抓取具體數據的文檔id按照分片粒度進行劃分,並向對應分片發送抓取請求:

final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, reducedQueryPhase.scoreDocs);……for (int i = 0; i < docIdsToLoad.length; i++) { …… executeFetch(i, searchShardTarget, counter, fetchSearchRequest, queryResult.queryResult(), connection); ……}

後續執行邏輯和Query Phase類似,每個分片在執行完畢Query子任務後,通過節點間通信,回調innerOnResponse(...)函數通知協調節點,結果會使用shard id作為下標放入數組結構fetchResults中:

successfulOps.incrementAndGet();results.consumeResult(result);

當最後一個分片執行完成後,協調節點會進入當前Phase結束處理:合併fetch階段的結果集,並啟動下一個階段執行。

final Runnable finishPhase = () -> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ? queryResults : fetchResults); # 參考下面moveToNextPhase代碼……private void moveToNextPhase(SearchPhaseController searchPhaseController, String scrollId, SearchPhaseController.ReducedQueryPhase reducedQueryPhase, AtomicArray<? extends SearchPhaseResult> fetchResultsArr) { final InternalSearchResponse internalResponse = searchPhaseController.merge(context.getRequest().scroll() != null, reducedQueryPhase, fetchResultsArr.asList(), fetchResultsArr::get); context.executeNextPhase(this, nextPhaseFactory.apply(internalResponse, scrollId));}

Fetch Phase的構造函數也向我們展示了後續需要執行的兩個簡單階段,後面我們會簡要介紹:

FetchSearchPhase(InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> resultConsumer, SearchPhaseController searchPhaseController, SearchPhaseContext context) { this(resultConsumer, searchPhaseController, context, (response, scrollId) -> new ExpandSearchPhase(context, response, // collapse only happens if the request has inner hits (finalResponse) -> sendResponsePhase(finalResponse, scrollId, context)));}

  • 數據節點

協調節點通過SearchTransportService的sendExecuteFetch(...)函數向目標數據節點發送Transport路徑為FETCH_ID_ACTION_NAME的查詢子任務,通過FETCH_ID_ACTION_NAME可以在SearchTransportService中找到對應的處理函數SearchService.executeFetchPhase(...):

transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SEARCH, new TaskAwareTransportRequestHandler<ShardFetchSearchRequest>() { @Override public void messageReceived(ShardFetchSearchRequest request, TransportChannel channel, Task task) throws Exception { FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task); channel.sendResponse(result); } });

然後進入實際的Fetch處理邏輯FetchPhase.execute(...),在這裡fetchSubPhases是一系列簡單的抓取任務,會按照docid抓取對應文檔的source、store fields、highlight、docvalue fields等信息:

for (FetchSubPhase fetchSubPhase : fetchSubPhases) { fetchSubPhase.hitsExecute(context, hits);}

3.2.3 Expand Phase

在Fetch Phase協調節點處理的結束階段,我們看到下一個執行階段為Expand Phase,用於完成ES 5.3版本以後支持的Field Collapsing查詢。通過該類查詢可以輕鬆實現按Field值進行分類,每個分類獲取排名前N的文檔。如在餐廳的菜單系統中按菜系(川菜、湘菜等)分類,獲取每個菜系排名前3的美食。用戶也可以按Field進行Aggregation實現類似功能,但Field Collapsing會更易用、高效。

Field Collapsing屬於一類特殊的查詢場景,這裡不詳細介紹。

3.2.4 Response Phase

Expand Phase的下一執行階段為Response Phase,用於將查詢結果返回用戶:

private static SearchPhase sendResponsePhase(InternalSearchResponse response, String scrollId, SearchPhaseContext context) { return new SearchPhase("response") { @Override public void run() throws IOException { context.onResponse(context.buildSearchResponse(response, scrollId)); } };}

4. 小結

本文主要分析了ES的分散式執行框架及查詢主體流程,對ES其它他流程及Lucene相關內容未做詳細介紹,後續騰訊雲Elasticsearch團隊會通過具體文章詳細介紹,歡迎大家一起交流討論。


推薦閱讀:

TAG:搜索引擎 | 科技 | Elasticsearch |