Flink CEP實現源碼解析
來自專欄 Flink源碼閱讀分享
引言
本文主要分享flink cep原理的深入理解,很多人可能還不知道flink cep是什麼,flink cep其實實現自一篇論文,具體論文細節見我之前的一篇文章的分享[flink-cep-paper]. flink cep的全稱是Complex Event Processing,在我看來它主要能做的是在一個連續不斷的事件中提取出用戶所關心的事件序列,他和flink的filter運算元的區別在於filter只能去實現單個元素的過濾,而cep是能完成先後順序事件的過濾。下面讓我們來走進他的源碼實現原理吧。以下代碼基於社區1.4.2分支分析。
我們的文章以一系列問題來展開:
- 用戶定義的Pattern最後會以什麼形式工作
- 當CEP Operator獲取到上游一個運算元的時候會做什麼事情?
- 在ProcessingTime和Eventtime的語義下處理邏輯有什麼不同點?
- 匹配成功的元素如何存儲,狀態機轉化流程是怎麼樣的?
- 超時未匹配成功的元素會做什麼?
問題一
用戶在定義Pattern和condition之後,會通過NFAcompiler將Pattern翻譯成一個一個相關聯的State,表明了這一組規則的狀態機的走向流程。
State包括 ,start表示一個起始狀態例如
begin(A).followedBy(B)
這裡面A就是一個start狀態。Final狀態表示整個序列已經匹配完成可以向下游發送了,Stop狀態是用來處理否定類型的規則,一旦到達Stop狀態即意味著整個匹配過程失敗。各個狀態之間通過StateTransition來連接,連接方式有:
ignore: 忽略此次匹配的元素proceed: 相當於forward的意思,到達下一個狀態,不存儲元素,繼續做下一個狀態的condition判斷take: 存儲本次的元素
這是一段創建中間狀態的代碼
private State<T> createMiddleStates(final State<T> sinkState) { State<T> lastSink = sinkState; // 不斷往上遍歷pattern進行state的生成 while (currentPattern.getPrevious() != null) { if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) { //skip notFollow patterns, they are converted into edge conditions } else if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_NEXT) { final State<T> notNext = createState(currentPattern.getName(), State.StateType.Normal); final IterativeCondition<T> notCondition = getTakeCondition(currentPattern); // 否定類型的pattern需要創建一個stop state final State<T> stopState = createStopState(notCondition, currentPattern.getName()); if (lastSink.isFinal()) { //so that the proceed to final is not fired 結尾狀態不用proceed過去做下一次計算了,可以直接ignore到Final,然後輸出結果 notNext.addIgnore(lastSink, new NotCondition<>(notCondition)); } else { notNext.addProceed(lastSink, new NotCondition<>(notCondition)); } // 在滿足Not_NEXT的條件的時候就轉化成stop狀態即匹配失敗 notNext.addProceed(stopState, notCondition); lastSink = notNext; } else { // 非否定類型的狀態的處理邏輯都在這個方法中 lastSink = convertPattern(lastSink); } // we traverse the pattern graph backwards followingPattern = currentPattern; currentPattern = currentPattern.getPrevious(); final Time currentWindowTime = currentPattern.getWindowTime(); if (currentWindowTime != null && currentWindowTime.toMilliseconds() < windowTime) { // the window time is the global minimum of all window times of each state windowTime = currentWindowTime.toMilliseconds(); } } return lastSink; }
生成這樣的state列表之後,最終會創建一個NFA,一個NFA中包含了兩個重要組件:
一個是SharedBuffer用於存儲中間匹配命中的數據,這是一個基於論文實現的帶版本的內存共享,主要解決的事情是在同一個元素觸發多個分支的時候避免存儲多次。
另一個是ComputationState隊列表示的是一系列當前匹配到的計算狀態,每一個狀態在拿到下一個元素的時候都會根據condition判斷自己是能夠繼續往下匹配生成下一個computation state還是匹配失敗。
問題二、三
問題二和問題三一起解釋,在消費到上游一個元素之後會判斷時間語義,這裡主要是為了處理亂序問題,如果是processingtime的話就會直接經由nfa#process進行處理,因為processing time不需要考慮事件是否亂序,他給每個事件都打上了當前的時間戳。而event語義下,會先將該數據buffer到rocksdb中,並且註冊一個比當前時間戳大1的eventimer,用以觸發真正的計算,也就是說,eventtime其實是每毫秒獲取過去存儲的數據做一次匹配計算。
protected void saveRegisterWatermarkTimer() { long currentWatermark = timerService.currentWatermark(); // protect against overflow if (currentWatermark + 1 > currentWatermark) { timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, currentWatermark + 1); }}
問題四、五
nfa#process做了什麼?取出前面說到的nfa中所有的當前computationState去做計算,當然計算之前會先判斷時間和computation的starttime比較匹配是否超出時間,即within運算元所做的時間,如果設置了超時處理的方式,就會將超時未匹配完成,已匹配到的部分元素向下游發送,並做sharebuffer的清理工作
if (!computationState.isStartState() && windowTime > 0L && timestamp - computationState.getStartTimestamp() >= windowTime) { if (handleTimeout) { // extract the timed out event pattern Map<String, List<T>> timedOutPattern = extractCurrentMatches(computationState); timeoutResult.add(Tuple2.of(timedOutPattern, timestamp)); } eventSharedBuffer.release( NFAStateNameHandler.getOriginalNameFromInternal(computationState.getPreviousState().getName()), computationState.getEvent(), computationState.getTimestamp(), computationState.getCounter()); newComputationStates = Collections.emptyList(); nfaChanged = true; } else if (event != null) { // 在computeNextState的時候判斷成功的take條件會將元素put到eventSharedBuffer中 newComputationStates = computeNextStates(computationState, event, timestamp); if (newComputationStates.size() != 1) { nfaChanged = true; } else if (!newComputationStates.iterator().next().equals(computationState)) { nfaChanged = true; } } else { newComputationStates = Collections.singleton(computationState);}
在完成匹配之後達到final狀態將數據提取出來向下游發送完成匹配。
以上便是cep的大致原理,說白了其實這個就是基於flink runntime開發出來的一個衍生lib,flink runtime其實是一個分散式的阻塞隊列,通過這個概念可以在上面開發出很多有意思的產品,cep就是其中一個。 分析結束,歡迎拍磚~
推薦閱讀: