Kylin 同步原理及加入重試邏輯



在 Kylin 的 master-slave 多節點部署中,出現過以下節點間數據不一致的現象:

  1. 主節點更新了 schema,而從節點沒有同步(sync);
  2. 從節點只有部分同步成功,而不是全部。

很明顯,Kylin 中所有的數據(包括全部元數據)都是落地在 HBase 中的,因此導致節點間數據不一致的唯一可能,就是各個節點各自持有本地緩存。為了理解其原理、方便 debug,我從源代碼層面對 Kylin master-slave 的同步原理做了一些剖析。






1. 先來看看整個流程中比較關鍵的部分:Broadcaster 類的實現。

//Broadcaster的構造函數 private Broadcaster(final KylinConfig config) { this.config = config; //獲取kylin.properties中"kylin.server.cluster-servers"配置的值 //也就是集群中所有節點的配置了 final String[] nodes = config.getRestServers(); if (nodes == null || nodes.length < 1) { logger.warn("There is no available rest server; check the kylin.server.cluster-servers config"); broadcastEvents = null; // disable the broadcaster return; } logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes)); //開一個單線程,不間斷的循環從broadcastEvents隊列裡面獲取註冊的事件。 Executors.newSingleThreadExecutor(new DaemonThreadFactory()).execute(new Runnable() { @Override public void run() { final List<RestClient> restClients = Lists.newArrayList(); for (String node : config.getRestServers()) { //根據配置的節點信息註冊RestClient restClients.add(new RestClient(node)); } final ExecutorService wipingCachePool = Executors.newFixedThreadPool(restClients.size(), new DaemonThreadFactory()); while (true) { try { final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst();"Announcing new broadcast event: " + broadcastEvent); for (final RestClient restClient : restClients) { wipingCachePool.execute(new Runnable() { @Override public void run() { try { restClient.wipeCache(broadcastEvent.getEntity(), broadcastEvent.getEvent(), broadcastEvent.getCacheKey()); } catch (IOException e) { logger.warn("Thread failed during wipe cache at " + broadcastEvent, e); } } }); } } catch (Exception e) { logger.error("error running wiping", e); } } } }); }


2. 再來看看廣播的實際邏輯實現,基本都封裝在 RestClient 中。

//此處是根據配置的節點信息正則匹配:"user:pwd@host:port" public RestClient(String uri) { Matcher m = fullRestPattern.matcher(uri); if (!m.matches()) throw new IllegalArgumentException("URI: " + uri + " -- does not match pattern " + fullRestPattern); String user =; String pwd =; String host =; String portStr =; int port = Integer.parseInt(portStr == null ? "7070" : portStr); init(host, port, user, pwd); }

根據配置的節點信息實例化 RestClient,然後在 init 方法中拼接 REST 介面的基礎 URL(baseUrl),wipe cache 的 URL 即在此基礎上拼接而成。

private void init(String host, int port, String userName, String password) { = host; this.port = port; this.userName = userName; this.password = password; //拼接rest介面 this.baseUrl = "http://" + host + ":" + port + "/kylin/api"; client = new DefaultHttpClient(); if (userName != null && password != null) { CredentialsProvider provider = new BasicCredentialsProvider(); UsernamePasswordCredentials credentials = new UsernamePasswordCredentials(userName, password); provider.setCredentials(AuthScope.ANY, credentials); client.setCredentialsProvider(provider); } }

可以發現,Kylin 所有的交互介面底層基本上都是調用自己的 REST 介面;它所謂的 JDBC 查詢方式,其實也只是在 REST 介面上封裝了一層,底層仍然是 HTTP 請求,可謂是掛羊頭賣狗肉了。下面看看 RestClient 中是如何通知其他節點 wipe cache 的:

public void wipeCache(String entity, String event, String cacheKey) throws IOException { String url = baseUrl + "/cache/" + entity + "/" + cacheKey + "/" + event; HttpPut request = new HttpPut(url); try { HttpResponse response = client.execute(request); String msg = EntityUtils.toString(response.getEntity()); if (response.getStatusLine().getStatusCode() != 200) throw new IOException("Invalid response " + response.getStatusLine().getStatusCode() + " with cache wipe url " + url + "
" + msg); } catch (Exception ex) { throw new IOException(ex); } finally { request.releaseConnection(); } }



public void notifyMetadataChange(String entity, Event event, String cacheKey) throws IOException { Broadcaster broadcaster = Broadcaster.getInstance(getConfig()); //這裡會判斷當前節點是否註冊為listener了,如果註冊了,此邏輯會被ignored broadcaster.registerListener(cacheSyncListener, "cube"); broadcaster.notifyListener(entity, event, cacheKey); } //註冊listener的邏輯 public void registerListener(Listener listener, String... entities) { synchronized (CACHE) { // ignore re-registration List<Listener> all = listenerMap.get(SYNC_ALL); if (all != null && all.contains(listener)) { return; } for (String entity : entities) { if (!StringUtils.isBlank(entity)) addListener(entity, listener); } //註冊幾種事件類型 addListener(SYNC_ALL, listener); addListener(SYNC_PRJ_SCHEMA, listener); addListener(SYNC_PRJ_DATA, listener); } }


/**
 * Dispatches a metadata change to the listeners registered for {@code entity}.
 *
 * <p>Runs while holding the {@code CACHE} monitor and returns immediately when no listener list
 * exists for the entity. The listener list is copied before iterating. Dispatch depends on the
 * entity:
 * <ul>
 *   <li>SYNC_ALL calls {@code onClearAll} on every listener, then {@code clearCache()}.
 *   <li>SYNC_PRJ_SCHEMA clears the ProjectManager L2 cache, then calls
 *       {@code onProjectSchemaChange}.
 *   <li>SYNC_PRJ_DATA clears the ProjectManager L2 cache, then calls {@code onProjectDataChange}.
 *   <li>Any other entity is passed to {@code onEntityChange}.
 * </ul>
 *
 * @throws IOException propagated from the listener callbacks or the cache-clearing calls
 */
public void notifyListener(String entity, Event event, String cacheKey) throws IOException {
    synchronized (CACHE) {
        List<Listener> list = listenerMap.get(entity);
        if (list == null)
            return;

        logger.debug("Broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey + ", listeners=" + list);

        // prevents concurrent modification exception
        list = Lists.newArrayList(list);
        switch (entity) {
        case SYNC_ALL:
            for (Listener l : list) {
                l.onClearAll(this);
            }
            clearCache(); // clear broadcaster too in the end
            break;
        case SYNC_PRJ_SCHEMA:
            ProjectManager.getInstance(config).clearL2Cache();
            for (Listener l : list) {
                l.onProjectSchemaChange(this, cacheKey);
            }
            break;
        case SYNC_PRJ_DATA:
            ProjectManager.getInstance(config).clearL2Cache(); // cubes first becoming ready leads to schema change too
            for (Listener l : list) {
                l.onProjectDataChange(this, cacheKey);
            }
            break;
        // Most events (ordinary entity changes) take this path.
        default:
            for (Listener l : list) {
                l.onEntityChange(this, entity, event, cacheKey);
            }
            break;
        }

        logger.debug("Done broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey);
    }
}


private Broadcaster.Listener cacheSyncListener = new Broadcaster.Listener() { @Override public void onClearAll(Broadcaster broadcaster) throws IOException { removeAllOLAPDataSources(); cleanAllDataCache(); } @Override public void onProjectSchemaChange(Broadcaster broadcaster, String project) throws IOException { removeOLAPDataSource(project); cleanDataCache(project); } @Override public void onProjectDataChange(Broadcaster broadcaster, String project) throws IOException { removeOLAPDataSource(project); // data availability (cube enabled/disabled) affects exposed schema to SQL cleanDataCache(project); } @Override public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException { if ("cube".equals(entity) && event == Event.UPDATE) { final String cubeName = cacheKey; new Thread() { // do not block the event broadcast thread public void run() { try { Thread.sleep(1000); cubeService.updateOnNewSegmentReady(cubeName); } catch (Throwable ex) { logger.error("Error in updateOnNewSegmentReady()", ex); } } }.start(); } } };




// Modified dispatch loop inside the Broadcaster thread: a failed wipe is put back on the queue.
for (final RestClient restClient : restClients) {
    wipingCachePool.execute(new Runnable() {
        @Override
        public void run() {
            try {
                restClient.wipeCache(broadcastEvent.getEntity(), broadcastEvent.getEvent(), broadcastEvent.getCacheKey());
            } catch (IOException e) {
                logger.warn("Thread failed during wipe cache at {}, error msg: {}", broadcastEvent, e.getMessage());
                // NOTE(review): the whole event is re-queued, so the broadcaster thread will send
                // it to EVERY node again, not just the one that failed. There is also no retry
                // limit: a node that stays down keeps the event circulating indefinitely.
                // Consider a per-node retry with a bounded attempt count.
                try {
                    // Append to the tail of the queue and wait for the broadcaster to retry it.
                    broadcastEvents.putLast(broadcastEvent);
                    logger.info("put failed broadcastEvent to queue. broacastEvent: {}", broadcastEvent);
                } catch (InterruptedException ex) {
                    // Restore the interrupt status so the pool thread can still observe it.
                    Thread.currentThread().interrupt();
                    logger.warn("error reentry failed broadcastEvent to queue, broacastEvent:{}, error: {} ", broadcastEvent, ex);
                }
            }
        }
    });
}
} // closes the enclosing try block of the broadcaster's while loop (not shown in this excerpt)



加入重試邏輯後,日誌中出現了如下錯誤:

```
Thread failed during wipe cache at java.lang.IllegalStateException: Invalid use of BasicClientConnManager: connection still allocated.
```

因此修改 RestClient 的 wipeCache 方法,確保在 finally 中消費掉響應實體並釋放連接:


public void wipeCache(String entity, String event, String cacheKey) throws IOException { String url = baseUrl + "/cache/" + entity + "/" + cacheKey + "/" + event; HttpPut request = new HttpPut(url); HttpResponse response =null; try { response = client.execute(request); String msg = EntityUtils.toString(response.getEntity()); if (response.getStatusLine().getStatusCode() != 200) throw new IOException("Invalid response " + response.getStatusLine().getStatusCode() + " with cache wipe url " + url + "
" + msg); } catch (Exception ex) { throw new IOException(ex); } finally { //確保釋放連接 if(response!=null) { EntityUtils.consume(response.getEntity()); } request.releaseConnection(); } }


