kylin 同步原理及加入重試邏輯

背景

最近倆個月,團隊整個數據基礎架構慢慢轉移到kylin上面來。而kylin也不負眾望,對於一些複雜的聚合查詢響應速度遠超於hive。隨著數據量的上來,kylin的單體部署逐漸無法支撐大量的並行讀寫任務。於是,自然而然的考慮到kylin的讀寫分離。一寫多讀,正好也符合kylin官方文檔上的cluster架構。然而在實際的使用中也出現了一些問題:

  1. 主節點更新了schema而從節點未sync
  2. 從節點中部分sync成功,而不是全部

而很明顯的是kylin中所有的數據,包括所有元數據都是落地在HBase中的,那唯一導致節點間數據不一致的可能就只有各個節點都有本地緩存的情況了。為了理解原理方便debug,我對kylin master-slave的同步原理做了一些源代碼層面的剖析。

原理剖析

正確配置方式:

//kylin.properties下面的配置,根據源碼,配置的格式為:user:pwd@host:portkylin.server.cluster-servers=user:password@host:port,user:password@host:port,user:password@host:port

主從同步流程解析:

源碼解析:

1. 先來看看整個流程中比較關鍵的部分BroadCaster類的實現

//Broadcaster的構造函數 private Broadcaster(final KylinConfig config) { this.config = config; //獲取kylin.properties中"kylin.server.cluster-servers"配置的值 //也就是集群中所有節點的配置了 final String[] nodes = config.getRestServers(); if (nodes == null || nodes.length < 1) { logger.warn("There is no available rest server; check the kylin.server.cluster-servers config"); broadcastEvents = null; // disable the broadcaster return; } logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes)); //開一個單線程,不間斷的循環從broadcastEvents隊列裡面獲取註冊的事件。 Executors.newSingleThreadExecutor(new DaemonThreadFactory()).execute(new Runnable() { @Override public void run() { final List<RestClient> restClients = Lists.newArrayList(); for (String node : config.getRestServers()) { //根據配置的節點信息註冊RestClient restClients.add(new RestClient(node)); } final ExecutorService wipingCachePool = Executors.newFixedThreadPool(restClients.size(), new DaemonThreadFactory()); while (true) { try { final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst(); logger.info("Announcing new broadcast event: " + broadcastEvent); for (final RestClient restClient : restClients) { wipingCachePool.execute(new Runnable() { @Override public void run() { try { restClient.wipeCache(broadcastEvent.getEntity(), broadcastEvent.getEvent(), broadcastEvent.getCacheKey()); } catch (IOException e) { logger.warn("Thread failed during wipe cache at " + broadcastEvent, e); } } }); } } catch (Exception e) { logger.error("error running wiping", e); } } } }); }

過Broadcaster的構造函數其實就能清楚整個同步過程的大概邏輯了。無非就是啟動一個線程去輪詢阻塞隊列裡面的元素(簡單的生產消費的模式),有的話就消費下來廣播到其他從節點從而達到清理緩存的目的。

2. 再來看看廣播的實際邏輯實現,基本封裝在RestClient中

//此處是根據配置的節點信息正則匹配:"user:pwd@host:port" public RestClient(String uri) { Matcher m = fullRestPattern.matcher(uri); if (!m.matches()) throw new IllegalArgumentException("URI: " + uri + " -- does not match pattern " + fullRestPattern); String user = m.group(1); String pwd = m.group(2); String host = m.group(3); String portStr = m.group(4); int port = Integer.parseInt(portStr == null ? "7070" : portStr); init(host, port, user, pwd); }

根據配置的節點信息實例化RestClient,然後在init方法中,拼接wipe cache的url

private void init(String host, int port, String userName, String password) { this.host = host; this.port = port; this.userName = userName; this.password = password; //拼接rest介面 this.baseUrl = "http://" + host + ":" + port + "/kylin/api"; client = new DefaultHttpClient(); if (userName != null && password != null) { CredentialsProvider provider = new BasicCredentialsProvider(); UsernamePasswordCredentials credentials = new UsernamePasswordCredentials(userName, password); provider.setCredentials(AuthScope.ANY, credentials); client.setCredentialsProvider(provider); } }

發現kylin所有的交互介面基本上底層都是調用的自己的rest介面,它自己所謂的jdbc的查詢方式其實也只是在rest介面上封裝了一層,底層還是http請求。可謂是掛羊頭賣狗肉了。看看RestClient中怎麼去通知其他節點wipe cache的

public void wipeCache(String entity, String event, String cacheKey) throws IOException { String url = baseUrl + "/cache/" + entity + "/" + cacheKey + "/" + event; HttpPut request = new HttpPut(url); try { HttpResponse response = client.execute(request); String msg = EntityUtils.toString(response.getEntity()); if (response.getStatusLine().getStatusCode() != 200) throw new IOException("Invalid response " + response.getStatusLine().getStatusCode() + " with cache wipe url " + url + "
" + msg); } catch (Exception ex) { throw new IOException(ex); } finally { request.releaseConnection(); } }

已經很明了了,就是調的rest介面:/kylin/api/cache/{entity}/{cacaheKey}/{event}

當slave節點接收到wipeCache的指令時的處理邏輯如下:

public void notifyMetadataChange(String entity, Event event, String cacheKey) throws IOException { Broadcaster broadcaster = Broadcaster.getInstance(getConfig()); //這裡會判斷當前節點是否註冊為listener了,如果註冊了,此邏輯會被ignored broadcaster.registerListener(cacheSyncListener, "cube"); broadcaster.notifyListener(entity, event, cacheKey); } //註冊listener的邏輯 public void registerListener(Listener listener, String... entities) { synchronized (CACHE) { // ignore re-registration List<Listener> all = listenerMap.get(SYNC_ALL); if (all != null && all.contains(listener)) { return; } for (String entity : entities) { if (!StringUtils.isBlank(entity)) addListener(entity, listener); } //註冊幾種事件類型 addListener(SYNC_ALL, listener); addListener(SYNC_PRJ_SCHEMA, listener); addListener(SYNC_PRJ_DATA, listener); } }

notifyListener主要就是對所有事件處理邏輯的劃分,根據事件類型選擇處理邏輯,一般sheme的更新走的是默認邏輯

public void notifyListener(String entity, Event event, String cacheKey) throws IOException { synchronized (CACHE) { List<Listener> list = listenerMap.get(entity); if (list == null) return; logger.debug("Broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey + ", listeners=" + list); // prevents concurrent modification exception list = Lists.newArrayList(list); switch (entity) { case SYNC_ALL: for (Listener l : list) { l.onClearAll(this); } clearCache(); // clear broadcaster too in the end break; case SYNC_PRJ_SCHEMA: ProjectManager.getInstance(config).clearL2Cache(); for (Listener l : list) { l.onProjectSchemaChange(this, cacheKey); } break; case SYNC_PRJ_DATA: ProjectManager.getInstance(config).clearL2Cache(); // cubes first becoming ready leads to schema change too for (Listener l : list) { l.onProjectDataChange(this, cacheKey); } break; //大部分的走向 default: for (Listener l : list) { l.onEntityChange(this, entity, event, cacheKey); } break; } logger.debug("Done broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey); } }

看到default分支會執行onEntityChange這個方法,看一下這個方法乾的是什麼

private Broadcaster.Listener cacheSyncListener = new Broadcaster.Listener() { @Override public void onClearAll(Broadcaster broadcaster) throws IOException { removeAllOLAPDataSources(); cleanAllDataCache(); } @Override public void onProjectSchemaChange(Broadcaster broadcaster, String project) throws IOException { removeOLAPDataSource(project); cleanDataCache(project); } @Override public void onProjectDataChange(Broadcaster broadcaster, String project) throws IOException { removeOLAPDataSource(project); // data availability (cube enabled/disabled) affects exposed schema to SQL cleanDataCache(project); } @Override public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException { if ("cube".equals(entity) && event == Event.UPDATE) { final String cubeName = cacheKey; new Thread() { // do not block the event broadcast thread public void run() { try { Thread.sleep(1000); cubeService.updateOnNewSegmentReady(cubeName); } catch (Throwable ex) { logger.error("Error in updateOnNewSegmentReady()", ex); } } }.start(); } } };

看到對於cache的同步是單獨實現了一個listener的,Event為update的時候,會單獨啟動一個線程去執行刷新緩存操作

加入簡單的重試邏輯

由於目前對於同步失敗的猜想是目標服務短暫不可用(響應超時或者處於失敗重啟階段),於是我只是單純的將失敗的任務重新塞入broadcastEvents隊列尾部供再一次調用。當然這種操作過於草率和暴力,卻也是驗證猜想最簡單快速的方式。

for (final RestClient restClient : restClients) { wipingCachePool.execute(new Runnable() { @Override public void run() { try { restClient.wipeCache(broadcastEvent.getEntity(), broadcastEvent.getEvent(), broadcastEvent.getCacheKey()); } catch (IOException e) { logger .warn("Thread failed during wipe cache at {}, error msg: {}", broadcastEvent, e.getMessage()); try { //這裡重新塞入隊列尾部,等待重新執行 broadcastEvents.putLast(broadcastEvent); logger.info("put failed broadcastEvent to queue. broacastEvent: {}", broadcastEvent); } catch (InterruptedException ex) { logger.warn("error reentry failed broadcastEvent to queue, broacastEvent:{}, error: {} ", broadcastEvent, ex); } } } }); } }

```

編譯部署之後,日誌中出現了如下錯誤:

```Thread failed during wipe cache at java.lang.IllegalStateException: Invalid use of BasicClientConnManager: connection still allocated.

比較意外,不過也終於發現了問題的所在。Kylin在啟動的時候會按照配置的nodes實例化一次RestClient,之後就直接從緩存中拿了,而kylin用的DefaultHttpClient每次只允許一次請求,請求完必須釋放鏈接,否則無法復用HttpClient。所以需要修改wipeCache方法的邏輯如下:

public void wipeCache(String entity, String event, String cacheKey) throws IOException { String url = baseUrl + "/cache/" + entity + "/" + cacheKey + "/" + event; HttpPut request = new HttpPut(url); HttpResponse response =null; try { response = client.execute(request); String msg = EntityUtils.toString(response.getEntity()); if (response.getStatusLine().getStatusCode() != 200) throw new IOException("Invalid response " + response.getStatusLine().getStatusCode() + " with cache wipe url " + url + "
" + msg); } catch (Exception ex) { throw new IOException(ex); } finally { //確保釋放連接 if(response!=null) { EntityUtils.consume(response.getEntity()); } request.releaseConnection(); } }

推薦閱讀:

大數據下看打虎
MaxCompoute禁止Full Scan功能開放
扯個關於大數據的淡
一文讀懂物聯網、雲計算與大數據的關係
鹿豹座平台(1.1-1.7)大數據新聞每周精選

TAG:大數據 | 大數據分析 | 數據分析工具 |