redis學習系列(五)--JedisPool與spring集成的實現及一致性哈希分析和基於Redis的分散式鎖
05-25
redis學習系列(五)--JedisPool與spring集成的實現及一致性哈希分析和基於Redis的分散式鎖
推薦閱讀:
來自專欄 編碼前線
Redis與spring的整合
相關依賴jar包spring把專門的數據操作獨立封裝在spring-data系列中,spring-data-redis是對Redis的封裝<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>1.4.2.RELEASE</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.2</version>
</dependency>
Spring 配置文件applicationContext.xml<!--命令空間中加入下面這行-->
xmlns:p="http://www.springframework.org/schema/p"
<!-- redis連接池配置文件 -->
<context:property-placeholder location="classpath:redis.properties"
/>
<bean id="poolConfig"
class="redis.clients.jedis.JedisPoolConfig">
<property name="maxIdle"
value="${redis.maxIdle}"
/>
<property name="maxTotal"
value="${redis.maxTotal}"
/>
<property name="MaxWaitMillis"
value="${redis.MaxWaitMillis}"
/>
<property name="testOnBorrow"
value="${redis.testOnBorrow}"
/>
</bean>
<bean id="connectionFactory"
class="org.springframework.data. redis.connection.jedis.JedisConnectionFactory"
p:host-name="${redis.host}"
p:port="${redis.port}"
p:password="${redis.pass}"
p:pool-config-ref="poolConfig"/>
<bean id="redisTemplate"
class="org.springframework.data. redis.core.RedisTemplate">
<property name="connectionFactory"
ref="connectionFactory"
/>
</bean>
注意新版的maxTotal,MaxWaitMillis這兩個欄位與舊版的不同。redis連接池配置文件redis.propertiesredis.host=192.168.2.129
redis.port=6379
redis.pass=redis129
redis.maxIdle=300
redis.maxTotal=600
redis.MaxWaitMillis=1000
redis.testOnBorrow=true
好了,配置完成,下面寫上代碼回到頂部測試代碼User@Entity
@Table(name = "t_user")
public
class
User {
//主鍵
private
String id;
//用戶名
private
String userName;
//...省略get,set...
}
BaseRedisDao@Repository
public
abstract
class
BaseRedisDao<K,V> {
@Autowired(required=true)
protected
RedisTemplate<K, V> redisTemplate;
}
IUserDao public
interface
IUserDao {
public
boolean
save(User user);
public
boolean
update(User user);
public
boolean
delete(String userIds);
public
User find(String userId);
}
UserDao @Repository
public
class
UserDao extends
BaseRedisDao<String, User> implements
IUserDao {
@Override
public
boolean
save(final
User user) {
boolean
res = redisTemplate.execute(new
RedisCallback<Boolean>() {
public
Boolean doInRedis(RedisConnection connection) throws
DataAccessException {
RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
byte[] key = serializer.serialize(user.getId());
byte[] value = serializer.serialize(user.getUserName());
//set not exits
return
connection.setNX(key, value);
}
});
return
res;
}
@Override
public
boolean
update(final
User user) {
boolean
result = redisTemplate.execute(new
RedisCallback<Boolean>() {
public
Boolean doInRedis(RedisConnection connection) throws
DataAccessException {
RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
byte[] key = serializer.serialize(user.getId());
byte[] name = serializer.serialize(user.getUserName());
//set
connection.set(key, name);
return
true;
}
});
return
result;
}
@Override
public
User find(final
String userId) {
User result = redisTemplate.execute(new
RedisCallback<User>() {
public
User doInRedis(RedisConnection connection) throws
DataAccessException {
RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
byte[] key = serializer.serialize(userId);
//get
byte[] value = connection.get(key);
if
(value == null) {
return
null;
}
String name = serializer.deserialize(value);
User resUser = new
User();
resUser.setId(userId);
resUser.setUserName(name);
return
resUser;
}
});
return
result;
}
@Override
public
boolean
delete(final
String userId) {
boolean
result = redisTemplate.execute(new
RedisCallback<Boolean>() {
public
Boolean doInRedis(RedisConnection connection) throws
DataAccessException {
RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
byte[] key = serializer.serialize(userId);
//delete
connection.del(key);
return
true;
}
});
return
result;
}
}
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath*:applicationContext.xml"})
public
class
RedisTest extends
AbstractJUnit4SpringContextTests {
@Autowired
private
IUserDao userDao;
@Test
public
void
testSaveUser() {
User user = new
User();
user.setId("402891815170e8de015170f6520b0000");
user.setUserName("zhangsan");
boolean
res = userDao.save(user);
Assert.assertTrue(res);
}
@Test
public
void
testGetUser() {
User user = new
User();
user = userDao.find("402891815170e8de015170f6520b0000");
System.out.println(user.getId() + "-"
+ user.getUserName() );
}
@Test
public
void
testUpdateUser() {
User user = new
User();
user.setId("402891815170e8de015170f6520b0000");
user.setUserName("lisi");
boolean
res = userDao.update(user);
Assert.assertTrue(res);
}
@Test
public
void
testDeleteUser() {
boolean
res = userDao.delete("402891815170e8de015170f6520b0000");
Assert.assertTrue(res);
}
}
String類型的增刪該查已完成,Hash,List,Set數據類型的操作就不舉例了,和使用命令的方式差不多。如下connection.hSetNX(key, field, value);
connection.hDel(key, fields);
connection.hGet(key, field);
connection.lPop(key);
connection.lPush(key, value);
connection.rPop(key);
connection.rPush(key, values);
connection.sAdd(key, values);
connection.sMembers(key);
connection.sDiff(keys);
connection.sPop(key);
回到頂部整合可能遇到的問題1.NoSuchMethodErrorjava.lang.NoSuchMethodError: org.springframework.core.serializer.support.DeserializingConverter.<init>(Ljava/lang/ClassLoader;)V
Caused by: java.lang.NoSuchMethodError: redis.clients.jedis.JedisShardInfo.setTimeout(I)V
類似找不到類,找不到方法的問題,當確定依賴的jar已經引入之後,此類問題多事spring-data-redis以及jedis版本問題,多換個版本試試,本文上面提到的版本可以使用。1.No qualifying bean1No qualifying bean of type [org.springframework.data.redis.core.RedisTemplate] found for
dependency
找不到bean,考慮applicationContext.xml中配置redisTemplate bean時實現類是否寫錯。例如,BaseRedisDao注入的是RedisTemplate類型的對象,applicationContext.xml中配置的實現類卻是RedisTemplate的子類StringRedisTemplate,那肯定報錯。整合好後,下面我們著重學習基於redis的分散式鎖的實現。基於redis實現的分散式鎖我們知道,在多線程環境中,鎖是實現共享資源互斥訪問的重要機制,以保證任何時刻只有一個線程在訪問共享資源。鎖的基本原理是:用一個狀態值表示鎖,對鎖的佔用和釋放通過狀態值來標識,因此基於redis實現的分散式鎖主要依賴redis的SETNX命令和DEL命令,SETNX相當於上鎖,DEL相當於釋放鎖,當然,在下面的具體實現中會更複雜些。之所以稱為分散式鎖,是因為客戶端可以在redis集群環境中向集群中任一個可用Master節點請求上鎖(即SETNX命令存儲key到redis緩存中是隨機的)。 現在相信你已經對在基於redis實現的分散式鎖的基本概念有了解,需要注意的是,這個和前面文章提到的使用WATCH 命令對key值進行鎖操作沒有直接的關係。java中synchronized和Lock對象都能對共享資源進行加鎖,下面我們將學慣用java實現的redis分散式鎖。java中的鎖技術在分析java實現的redis分散式鎖之前,我們先來回顧下java中的鎖技術,為了直觀的展示,我們採用「多個線程共享輸出設備」來舉例。不加鎖共享輸出設備public
class
LockTest {
//不加鎖
static
class
Outputer {
public
void
output(String name) {
for(int
i=0; i<name.length(); i++) {
System.out.print(name.charAt(i));
}
System.out.println();
}
}
public
static
void
main(String[] args) {
final
Outputer output = new
Outputer();
//線程1列印zhangsan
new
Thread(new
Runnable(){
@Override
public
void
run() {
while(true) {
try{
Thread.sleep(1000);
}catch(InterruptedException e) {
e.printStackTrace();
}
output.output("zhangsan");
}
}
}).start();
//線程2列印lingsi
new
Thread(new
Runnable(){
@Override
public
void
run() {
while(true) {
try{
Thread.sleep(1000);
}catch(InterruptedException e) {
e.printStackTrace();
}
output.output("lingsi");
}
}
}).start();
//線程3列印wangwu
new
Thread(new
Runnable(){
@Override
public
void
run() {
while(true) {
try{
Thread.sleep(1000);
}catch(InterruptedException e) {
e.printStackTrace();
}
output.output("huangwu");
}
}
}).start();
}
}
上面例子中,三個線程同時共享輸出設備output,線程1需要列印zhangsan,線程2需要列印lingsi,線程3需要列印wangwu。在不加鎖的情況,這三個線程會不會因為得不到輸出設備output打架呢,我們來看看運行結果: huangwu
zhangslingsi
an
huangwu
zlingsi
hangsan
huangwu
lzhangsan
ingsi
huangwu
lingsi
從運行結果可以看出,三個線程打架了,線程1沒列印完zhangsan,線程2就來搶輸出設備......可見,這不是我們想要的,我們想要的是線程之間能有序的工作,各個線程之間互斥的使用輸出設備output。使用java5中的Lock對輸出設備加鎖現在我們對Outputer進行改進,給它加上鎖,加鎖之後每次只有一個線程能訪問它。 //使用java5中的鎖
static
class
Outputer{
Lock lock = new
ReentrantLock();
public
void
output(String name) {
//傳統java加鎖
//synchronized (Outputer.class){
lock.lock();
try
{
for(int
i=0; i<name.length(); i++) {
System.out.print(name.charAt(i));
}
System.out.println();
}finally{
//任何情況下都有釋放鎖
lock.unlock();
}
//}
}
}
看看加鎖後的輸出結果:zhangsan
lingsi
huangwu
zhangsan
lingsi
huangwu
zhangsan
lingsi
huangwu
zhangsan
lingsi
huangwu
zhangsan
lingsi
huangwu
......
從運行結果中可以看出,三個線程之間不打架了,線程之間的列印變得有序。有個這個基礎,下面我們來學習基於Redis實現的分散式鎖就更容易了。Redis分散式鎖實現分析從上面java鎖的使用中可以看出,鎖對象主要有lock與unlock方法,在lock與unlock方法之間的代碼(臨界區)能保證線程互斥訪問。基於redis實現的Java分散式鎖主要依賴redis的SETNX命令和DEL命令,SETNX相當於上鎖(lock),DEL相當於釋放鎖(unlock)。我們只要實現Lock介面重寫lock()和unlock()即可。但是這還不夠,安全可靠的分散式鎖應該滿足滿足下面三個條件:l 互斥,不管任何時候,只有一個客戶端能持有同一個鎖。l 不會死鎖,最終一定會得到鎖,即使持有鎖的客戶端對應的master節點宕掉。l 容錯,只要大多數Redis節點正常工作,客戶端應該都能獲取和釋放鎖。那麼什麼情況下回不滿足上面三個條件呢。多個線程(客戶端)同時競爭鎖可能會導致多個客戶端同時擁有鎖。比如,(1)線程1在master節點拿到了鎖(存入key)(2)master節點在把線程1創建的key寫入slave之前宕機了,此時集群中的節點已經沒有鎖(key)了,包括master節點的slaver節點(3)slaver節點升級為master節點(4)線程2向新的master節點發起鎖(存入key)請求,很明顯,能請求成功。可見,線程1和線程2同時獲得了鎖。如果在更高並發的情況,可能會有更多線程(客戶端)獲取鎖,這種情況就會導致上文所說的線程「打架」問題,線程之間的執行雜亂無章。 那什麼情況下又會發生死鎖的情況呢。如果擁有鎖的線程(客戶端)長時間的執行或者因為某種原因造成阻塞,就會導致鎖無法釋放(unlock沒有調用),其它線程就不能獲取鎖而而產生無限期死鎖的情況。其它線程在執行lock失敗後即使粗暴的執行unlock刪除key之後也不能正常釋放鎖,因為鎖就只能由獲得鎖的線程釋放,鎖不能正常釋放其它線程仍然獲取不到鎖。解決死鎖的最好方式是設置鎖的有效時間(redis的expire命令),不管是什麼原因導致的死鎖,有效時間過後,鎖將會被自動釋放。 為了保障容錯功能,即只要有Redis節點正常工作,客戶端應該都能獲取和釋放鎖,我們必須用相同的key不斷循環向Master節點請求鎖,當請求時間超過設定的超時時間則放棄請求鎖,這個可以防止一個客戶端在某個宕掉的master節點上阻塞過長時間,如果一個master節點不可用了,應該儘快嘗試下一個master節點。釋放鎖比較簡單,因為只需要在所有節點都釋放鎖就行,不管之前有沒有在該節點獲取鎖成功。Redlock演算法根據上面的分析,官方提出了一種用Redis實現分散式鎖的演算法,這個演算法稱為RedLock。RedLock演算法的主要流程如下: RedLock演算法主要流程 Java實現 結合上面的流程圖,加上下面的代碼解釋,相信你一定能理解redis分散式鎖的實現原理public
class
RedisLock implements
Lock{
protected
StringRedisTemplate redisStringTemplate;
// 存儲到redis中的鎖標誌
private
static
final
String LOCKED = "LOCKED";
// 請求鎖的超時時間(ms)
private
static
final
long
TIME_OUT = 30000;
// 鎖的有效時間(s)
public
static
final
int
EXPIRE = 60;
// 鎖標誌對應的key;
private
String key;
// state flag
private
volatile
boolean
isLocked = false;
public
RedisLock(String key) {
this.key = key;
@SuppressWarnings("resource")
ApplicationContext ctx = new
ClassPathXmlApplicationContext("classpath*:applicationContext.xml");
redisStringTemplate = (StringRedisTemplate)ctx.getBean("redisStringTemplate");
}
@Override
public
void
lock() {
//系統當前時間,毫秒
long
nowTime = System.nanoTime();
//請求鎖超時時間,毫秒
long
timeout = TIME_OUT*1000000;
final
Random r = new
Random();
try
{
//不斷循環向Master節點請求鎖,當請求時間(System.nanoTime() - nano)超過設定的超時時間則放棄請求鎖
//這個可以防止一個客戶端在某個宕掉的master節點上阻塞過長時間
//如果一個master節點不可用了,應該儘快嘗試下一個master節點
while
((System.nanoTime() - nowTime) < timeout) {
//將鎖作為key存儲到redis緩存中,存儲成功則獲得鎖
if
(redisStringTemplate.getConnectionFactory().getConnection().setNX(key.getBytes(),
LOCKED.getBytes())) {
//設置鎖的有效期,也是鎖的自動釋放時間,也是一個客戶端在其他客戶端能搶佔鎖之前可以執行任務的時間
//可以防止因異常情況無法釋放鎖而造成死鎖情況的發生
redisStringTemplate.expire(key, EXPIRE, TimeUnit.SECONDS);
isLocked = true;
//上鎖成功結束請求
break;
}
//獲取鎖失敗時,應該在隨機延時後進行重試,避免不同客戶端同時重試導致誰都無法拿到鎖的情況出現
//睡眠3毫秒後繼續請求鎖
Thread.sleep(3, r.nextInt(500));
}
} catch
(Exception e) {
e.printStackTrace();
}
}
@Override
public
void
unlock() {
//釋放鎖
//不管請求鎖是否成功,只要已經上鎖,客戶端都會進行釋放鎖的操作
if
(isLocked) {
redisStringTemplate.delete(key);
}
}
@Override
public
void
lockInterruptibly() throws
InterruptedException {
// TODO Auto-generated method stub
}
@Override
public
boolean
tryLock() {
// TODO Auto-generated method stub
return
false;
}
@Override
public
boolean
tryLock(long
time, TimeUnit unit) throws
InterruptedException {
// TODO Auto-generated method stub
return
false;
}
@Override
public
Condition newCondition() {
// TODO Auto-generated method stub
return
null;
}
}
好了,RedisLock已經實現,我們對Outputer使用RedisLock進行修改 /使用RedisLock
static
class
Outputer {
//創建一個名為redisLock的RedisLock類型的鎖
RedisLock redisLock = new
RedisLock("redisLock");
public
void
output(String name) {
//上鎖
redisLock.lock();
try
{
for(int
i=0; i<name.length(); i++) {
System.out.print(name.charAt(i));
}
System.out.println();
}finally{
//任何情況下都要釋放鎖
redisLock.unlock();
}
}
}
看看使用RedisLock加鎖後的的運行結果 lingsi
zhangsan
huangwu
lingsi
zhangsan
huangwu
lingsi
zhangsan
huangwu
lingsi
zhangsan
huangwu
lingsi
zhangsan
huangwu
......
可見,使用RedisLock加鎖後線程之間不再「打架」,三個線程互斥的訪問output。問題現在我無法論證RedLock演算法在分散式、高並發環境下的可靠性,但從本例三個線程的運行結果看,RedLock演算法確實保證了三個線程互斥的訪問output(redis.maxIdle=300 redis.maxTotal=600,運行到Timeout waiting for idle object都沒有出現線程「打架」的問題)。我認為RedLock演算法仍有些問題沒說清楚,比如,如何防止宕機時多個線程同時獲得鎖;RedLock演算法在釋放鎖的處理上,不管線程是否獲取鎖成功,只要上了鎖,就會到每個master節點上釋放鎖,這就會導致一個線程上的鎖可能會被其他線程釋放掉,這就和每個鎖只能被獲得鎖的線程釋放相互矛盾。這些有待後續進一步交流學習研究。推薦閱讀:
※數據處理及分析-讀取MySQL資料庫
※MySQL鎖之源碼探索
※寫給菜鳥和老鳥,mysql5.6、5.7如何飛速安裝
※pyspark的 Mysql寫入
※mysql已經有cache了,為啥還要在它前面加一層memcached?