知識布局-jediscluster-pipeline
前言
由於水平有限,如果存在知識點上面的錯誤,請大神明確指出,我會認真聽取,並及時修正自己的錯誤。
目錄
1.為什麼要寫這篇文章
2.源碼分析
3.源碼展示
4.引用
1.為什麼要寫這篇文章
由於使用redis cluster模式,在操作redis時,有時會有大量的命令,如果每次都進行提交,會非常慢(比如一分鐘之內操作幾百萬次甚至千萬次)。通過一次請求提交多個命令,這個在某些場景下,能夠提升很多性能。
但是,查看jediscluster的代碼,卻發現其不支持pipeline。這就很鬱悶了。通過在網上查詢,我找到了這篇文章:
redis集群客戶端JedisCluster優化 - 管道(pipeline)模式支持
於是,抱著試一試的心態,我拿到了這個源碼。通過使用,我發現存在線程安全性問題。當然博主也在代碼中注釋了,not thread safe。我就想著能不能將其改成線程安全的,畢竟性能確實不錯。
2.源碼分析
下面是截取引用的博主中的segment
下面是我自己的優化
3.源碼展示
package pipeline;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import redis.clients.jedis.*;import redis.clients.jedis.exceptions.JedisMovedDataException;import redis.clients.jedis.exceptions.JedisRedirectionException;import redis.clients.util.JedisClusterCRC16;import redis.clients.util.SafeEncoder;import java.io.Closeable;import java.lang.reflect.Field;import java.util.*;/** * 在集群模式下提供批量操作的功能。 <br/> * 由於集群模式存在節點的動態添加刪除,且client不能實時感知(只有在執行命令時才可能知道集群發生變更), * 因此,該實現不保證一定成功,建議在批量操作之前調用 refreshCluster() 方法重新獲取集群信息。<br /> * 應用需要保證不論成功還是失敗都會調用close() 方法,否則可能會造成泄露。<br/> * 如果失敗需要應用自己去重試,因此每個批次執行的命令數量需要控制。防止失敗後重試的數量過多。<br /> * 基於以上說明,建議在集群環境較穩定(增減節點不會過於頻繁)的情況下使用,且允許失敗或有對應的重試策略。<br /> * * @author youaremoon * @since Ver 1.1 */public class JedisClusterPipeline extends PipelineBase implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(JedisClusterPipeline.class); private static final String SPLIT_WORD = ":"; // 部分欄位沒有對應的獲取方法,只能採用反射來做 // 你也可以去繼承JedisCluster和JedisSlotBasedConnectionHandler來提供訪問介面 private static final Field FIELD_CONNECTION_HANDLER; private static final Field FIELD_CACHE; static { FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class, "connectionHandler"); FIELD_CACHE = getField(JedisClusterConnectionHandler.class, "cache"); } private JedisSlotBasedConnectionHandler connectionHandler; private JedisClusterInfoCache clusterInfoCache; private Queue<Client> clients = new LinkedList<Client>(); // 根據順序存儲每個命令對應的Client private Map<JedisPool, Map<Long, Jedis>> jedisMap = new HashMap<JedisPool, Map<Long, Jedis>>(); // 用於緩存連接 private boolean hasDataInBuf = false; // 是否有數據在緩存區 public JedisClusterPipeline(JedisCluster jedisCluster) { setJedisCluster(jedisCluster); } /** * 刷新集群信息,當集群信息發生變更時調用 * * @param * @return */ public void refreshCluster() { connectionHandler.renewSlotCache(); } /** * 同步讀取所有數據. 與syncAndReturnAll()相比,sync()只是沒有對數據做反序列化 */ public void sync() { innerSync(null); } @Override public void close() { clean(); clients.clear(); for (Map.Entry<JedisPool, Map<Long, Jedis>> poolEntry : jedisMap.entrySet()) { for (Map.Entry<Long, Jedis> jedisEntry : poolEntry.getValue().entrySet()) { if (hasDataInBuf) { flushCachedData(jedisEntry.getValue()); } jedisEntry.getValue().close(); } } jedisMap.clear(); hasDataInBuf = false; } /** * 同步讀取所有數據 並按命令順序返回一個列表 * * @return 按照命令的順序返回所有的數據 */ public List<Object> syncAndReturnAll() { List<Object> responseList = new ArrayList<Object>(); innerSync(responseList); return responseList; } private void setJedisCluster(JedisCluster jedis) { connectionHandler = getValue(jedis, FIELD_CONNECTION_HANDLER); clusterInfoCache = getValue(connectionHandler, FIELD_CACHE); } private void innerSync(List<Object> formatted) { HashSet<Client> clientSet = new HashSet<Client>(); try { for (Client client : clients) { // 在sync()調用時其實是不需要解析結果數據的,但是如果不調用get方法,發生了JedisMovedDataException這樣的錯誤應用是不知道的,因此需要調用get()來觸發錯誤。 // 其實如果Response的data屬性可以直接獲取,可以省掉解析數據的時間,然而它並沒有提供對應方法,要獲取data屬性就得用反射,不想再反射了,所以就這樣了 Object data = generateResponse(client.getOne()).get(); if (null != formatted) { formatted.add(data); } // size相同說明所有的client都已經添加,就不用再調用add方法了 if (clientSet.size() != jedisMap.size()) { clientSet.add(client); } } } catch (JedisRedirectionException jre) { if (jre instanceof JedisMovedDataException) { // if MOVED redirection occurred, rebuilds clusters slot cache, // recommended by Redis cluster specification refreshCluster(); } throw jre; } finally { if (clientSet.size() != jedisMap.size()) { // 所有還沒有執行過的client要保證執行(flush),防止放回連接池後後面的命令被污染 for (Map.Entry<JedisPool, Map<Long, Jedis>> poolEntry : jedisMap.entrySet()) { for (Map.Entry<Long, Jedis> jedisEntry : poolEntry.getValue().entrySet()) { if (clientSet.contains(jedisEntry.getValue().getClient())) { continue; } flushCachedData(jedisEntry.getValue()); } } } hasDataInBuf = false; close(); } } private void flushCachedData(Jedis jedis) { try { jedis.getClient().getAll(); } catch (RuntimeException ex) { } } @Override protected Client getClient(String key) { byte[] bKey = SafeEncoder.encode(key); return getClient(bKey); } @Override protected Client getClient(byte[] key) { Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key)); Client client = jedis.getClient(); clients.add(client); return client; } private Jedis getJedis(int slot) { // 根據線程id從緩存中獲取Jedis Jedis jedis = null; Map<Long, Jedis> tmpMap = null; //獲取線程id long id = Thread.currentThread().getId(); //獲取jedispool JedisPool pool = clusterInfoCache.getSlotPool(slot); if (jedisMap.containsKey(pool)) { tmpMap = jedisMap.get(pool); if (tmpMap.containsKey(id)) { jedis = tmpMap.get(id); } else { jedis = pool.getResource(); tmpMap.put(id, jedis); } } else { tmpMap = new HashMap<Long, Jedis>(); jedis = pool.getResource(); tmpMap.put(id, jedis); jedisMap.put(pool,tmpMap); } hasDataInBuf = true; return jedis; } private static Field getField(Class<?> cls, String fieldName) { try { Field field = cls.getDeclaredField(fieldName); field.setAccessible(true); return field; } catch (NoSuchFieldException | SecurityException e) { throw new RuntimeException("cannot find or access field " + fieldName + " from " + cls.getName(), e); } } @SuppressWarnings({"unchecked"}) private static <T> T getValue(Object obj, Field field) { try { return (T) field.get(obj); } catch (IllegalArgumentException | IllegalAccessException e) { LOGGER.error("get value fail", e); throw new RuntimeException(e); } }}
4.引用
redis集群客戶端JedisCluster優化 - 管道(pipeline)模式支持
推薦閱讀: