標籤:

Redis源碼剖析--跳躍表zskiplist

跳躍表是一種有序的數據結構,它通過在每個節點中維持多個指向其他節點的指針,從而達到快速訪問的目的。跳躍表在插入、刪除和查找操作上的平均複雜度為O(logN),最壞為O(N),可以和紅黑樹相媲美,但是在實現起來,比紅黑樹簡單很多。

說起跳躍表,在前段時間面試中可幫了我的大忙。騰訊一面的時候面試官要求設計一個數據結構,裡面的元素要求按一定順序存放,能以最低的複雜度獲取每個元素的名次,且增、刪等操作的複雜度儘可能低,博主最終就是用跳躍表來解決了這個問題,平均複雜度能達到O(logN)。

跳躍表數據結構

跳躍表的結構體定義在server.h文件中。其中包括跳躍表節點zskiplistNode和跳躍表zskiplist兩個結構體。

typedef struct zskiplistNode {n robj *obj; // 成員對象n double score; // 分值n struct zskiplistNode *backward; // 後向指針n // 層n struct zskiplistLevel {n struct zskiplistNode *forward; // 前向指針n unsigned int span; // 跨度n } level[];n} zskiplistNode;nntypedef struct zskiplist {n // 跳躍表的表頭節點和表尾節點n struct zskiplistNode *header, *tail;n // 表中節點的數量n unsigned long length;n // 表中層數最大的節點層數n int level;n} zskiplist;n

對於跳躍表節點來說:

  • obj 存放著該節點對於的成員對象,一般指向一個sds結構
  • score 表示該節點你的分值,跳躍表按照分值大小進行順序排列
  • backward 指向跳躍表的前一個節點
  • level[] 這個屬性至關重要,是跳躍表的核心所在,初始化一個跳躍表節點的時候會為其隨機生成一個層大小,每個節點的每一層以鏈表的形式連接起來。

看完上面的解釋之後,可能讀者對跳躍表還沒有一個清晰的認識,下面我畫了一張圖來形象的描述一下跳躍表結構。

跳躍表基本操作

Redis中關於跳躍表的相關操作函數定義在t_zset.c文件中,下面分別介紹幾個基本操作函數的實現源碼。

創建跳躍表

Redis在創建一個跳躍表的時候完成以下操作:

  • 創建一個zskiplist結構
  • 設定其level為1,長度length為0
  • 初始化一個表頭結點,其層數為32層,每一層均指向NULL

typedef struct zskiplistNode {n robj *obj; // 成員對象n double score; // 分值n struct zskiplistNode *backward; // 後向指針n // 層n struct zskiplistLevel {n struct zskiplistNode *forward; // 前向指針n unsigned int span; // 跨度n } level[];n} zskiplistNode;nntypedef struct zskiplist {n // 跳躍表的表頭節點和表尾節點n struct zskiplistNode *header, *tail;n // 表中節點的數量n unsigned long length;n // 表中層數最大的節點層數n int level;n} zskiplist;n

插入節點

往跳躍表中插入一個節點,必然會改變跳錶的長度,可能會改變其長度。而且對於插入位置處的前後節點的backward和forward指針均要改變。

插入節點的關鍵在找到在何處插入該節點,跳躍表是按照score分值進行排序的,其查找步驟大致是:從當前最高的level開始,向前查找,如果當前節點的score小於插入節點的score,繼續向前;反之,則降低一層繼續查找,直到第一層為止。此時,插入點就位於找到的節點之後。

zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {n // updata[]數組記錄每一層位於插入節點的前一個節點n zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;n // rank[]記錄每一層位於插入節點的前一個節點的排名n unsigned int rank[ZSKIPLIST_MAXLEVEL];n int i, level;nn serverAssert(!isnan(score));n x = zsl->header; // 表頭節點n // 從最高層開始查找n for (i = zsl->level-1; i >= 0; i--) {n // 存儲rank值是為了交叉快速地到達插入位置n rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];n // 前向指針不為空,前置指針的分值小於score或當前向指針的分值等// 於空但成員對象不等於o的情況下,繼續向前查找n while (x->level[i].forward &&n (x->level[i].forward->score < score ||n (x->level[i].forward->score == score &&n compareStringObjects(x->level[i].forward->obj,obj) < 0))) {n rank[i] += x->level[i].span;n x = x->level[i].forward;n }n // 存儲當前層上位於插入節點的前一個節點n update[i] = x;n }n // 此處假設插入節點的成員對象不存在於當前跳躍表內,即不存在重複的節點n // 隨機生成一個level值n level = zslRandomLevel();n if (level > zsl->level) {n // 如果level大於當前存儲的最大level值n // 設定rank數組中大於原level層以上的值為0n // 同時設定update數組大於原level層以上的數據n for (i = zsl->level; i < level; i++) {n rank[i] = 0;n update[i] = zsl->header;n update[i]->level[i].span = zsl->length;n }n // 更新level值n zsl->level = level;n }n // 創建插入節點n x = zslCreateNode(level,score,obj);n for (i = 0; i < level; i++) {n // 針對跳躍表的每一層,改變其forward指針的指向n x->level[i].forward = update[i]->level[i].forward;n update[i]->level[i].forward = x;nn // 更新插入節點的span值n x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);n // 更新插入點的前一個節點的span值n update[i]->level[i].span = (rank[0] - rank[i]) + 1;n }nn // 更新高層的span值n for (i = level; i < zsl->level; i++) {n update[i]->level[i].span++;n }n // 設定插入節點的backward指針n x->backward = (update[0] == zsl->header) ? NULL : update[0];n if (x->level[0].forward)n x->level[0].forward->backward = x;n elsen zsl->tail = x;n // 跳躍表長度+1n zsl->length++;n return x;n}n

跳躍表刪除

Redis提供了三種跳躍表節點刪除操作。分別如下:

  • 根據給定分值和成員來刪除節點,由zslDelete函數實現
  • 根據給定分值來刪除節點,由zslDeleteByScore函數實現
  • 根據給定排名來刪除節點,由zslDeleteByRank函數實現

上述三種操作的刪除節點部分都由zslDeleteNode函數完成。zslDeleteNode函數用於刪除某個節點,需要給定當前節點和每一層下當前節點的前一個節點。

void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {n int i;n for (i = 0; i < zsl->level; i++) {n if (update[i]->level[i].forward == x) {n // 如果x存在於該層,則需要修改前一個節點的前向指針n update[i]->level[i].span += x->level[i].span - 1;n update[i]->level[i].forward = x->level[i].forward;n } else {n // 反之,則只需要將span-1n update[i]->level[i].span -= 1;n }n }n // 修改backward指針,需要考慮x是否為尾節點n if (x->level[0].forward) {n x->level[0].forward->backward = x->backward;n } else {n zsl->tail = x->backward;n }n // 如果被刪除的節點為當前層數最多的節點,n while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)n zsl->level--;n zsl->length--;n}n

以zslDelete為例,根據節點的分值和成員來刪除該節點,其他兩個操作無非是在查找節點上有區別。

int zslDelete(zskiplist *zsl, double score, robj *obj) {n zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;n int i;nn x = zsl->header;n // 找到要刪除的節點,以及每一層上該節點的前一個節點n for (i = zsl->level-1; i >= 0; i--) {n while (x->level[i].forward &&n (x->level[i].forward->score < score ||n (x->level[i].forward->score == score &&n compareStringObjects(x->level[i].forward->obj,obj) < 0)))n x = x->level[i].forward;n update[i] = x;n }n // 跳躍表中可能存在分值相同的節點n // 所以此處需要判斷成員是否相等n x = x->level[0].forward;n if (x && score == x->score && equalStringObjects(x->obj,obj)) {n // 調用底層刪除節點函數n zslDeleteNode(zsl, x, update);n zslFreeNode(x);n return 1;n }n // 沒有刪除成功n return 0; n}n

獲取給定分值和成員的節點的排名

開篇提到博主在騰訊一面中被問的問題,需要獲取每個玩家的排名,跳躍表獲取排名的平均複雜度為O(logN),最壞為O(n)。其實現如下:

unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {n zskiplistNode *x;n unsigned long rank = 0;n int i;nn x = zsl->header;n // 從最高層開始查詢n for (i = zsl->level-1; i >= 0; i--) {n while (x->level[i].forward &&n (x->level[i].forward->score < score ||n (x->level[i].forward->score == score &&n compareStringObjects(x->level[i].forward->obj,o) <= 0))) {n // 前向指針不為空,前置指針的分值小於score或當前向指針的// 分值等於空但成員對象不等於o的情況下,繼續向前查找n rank += x->level[i].span;n x = x->level[i].forward;n }nn // 此時x可能是header,所以此處需要判斷一下n if (x->obj && equalStringObjects(x->obj,o)) {n return rank;n }n }n return 0;n}n

這裡粗略的畫了一張圖來說明查找過程,紅線代表查找的路線。

區間操作

Redis提供了一些區間操作,用於獲取某段區間上的節點或者刪除某段區間上的所有節點等操作,這些操作大大提高了Redis的易用性。

// 獲取某個區間上第一個符合範圍的節點。nzskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) {n zskiplistNode *x;n int i;nn // 判斷給定的分值範圍是否在跳躍表的範圍內n if (!zslIsInRange(zsl,range)) return NULL;nn x = zsl->header;nn for (i = zsl->level-1; i >= 0; i--) {n // 如果當前節點的分值小於給定範圍的下限則一直向前查找n while (x->level[i].forward &&n !zslValueGteMin(x->level[i].forward->score,range))n x = x->level[i].forward;n }nn // x的下一個節點才是我們要找的節點n x = x->level[0].forward;n serverAssert(x != NULL);nn // 檢查該節點不超過給定範圍範圍n if (!zslValueLteMax(x->score,range)) return NULL;n return x;n}n// 獲取某個區間上最後一個符合範圍的節點。nzskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range) {n zskiplistNode *x;n int i;nn // 判斷給定的分值範圍是否在跳躍表的範圍內n if (!zslIsInRange(zsl,range)) return NULL;nn x = zsl->header;n for (i = zsl->level-1; i >= 0; i--) {n // 如果在給定範圍內則一直向前查找n while (x->level[i].forward &&n zslValueLteMax(x->level[i].forward->score,range))n x = x->level[i].forward;n }nn // x即為要找的節點n serverAssert(x != NULL);nn // 判斷該分值是否在給定範圍內n if (!zslValueGteMin(x->score,range)) return NULL;n return x;n}n// 刪除給定分值範圍內的所有元素nunsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec *range, dict *dict) {n zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;n unsigned long removed = 0;n int i;nn x = zsl->header;n // 找到小於或等於給定範圍最小分值的節點n // 並將每層上的節點保存到update數組n for (i = zsl->level-1; i >= 0; i--) {n while (x->level[i].forward && (range->minex ?n x->level[i].forward->score <= range->min :n x->level[i].forward->score < range->min))n x = x->level[i].forward;n update[i] = x;n }nn // x的下一個節點則是給定區間內分值最小的節點n x = x->level[0].forward;nn // 刪除該區間下的所有節點n while (x &&n (range->maxex ? x->score < range->max : x->score <= range->max))n {n // 保存下一個節點n zskiplistNode *next = x->level[0].forward;n // 刪除該節點n zslDeleteNode(zsl,x,update);n // 刪除該節點的成員n dictDelete(dict,x->obj);n // 釋放該節點n zslFreeNode(x);n removed++;n x = next;n }n // 返回刪除節點的個數n return removed;n}n// 刪除給定排名區間內的所有節點nunsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {n zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;n unsigned long traversed = 0, removed = 0;n int i;nn x = zsl->header;n // 找到給定排名區間內名次最小的節點n // 並保存每一層下該節點的前一個節點n for (i = zsl->level-1; i >= 0; i--) {n while (x->level[i].forward && (traversed + x->level[i].span) < start) {n traversed += x->level[i].span;n x = x->level[i].forward;n }n update[i] = x;n }n // traversed保存當前刪除節點的排名值n traversed++;n x = x->level[0].forward;n while (x && traversed <= end) {n // 記錄下一個節點n zskiplistNode *next = x->level[0].forward;n // 刪除該節點n zslDeleteNode(zsl,x,update);n // 刪除該節點的成員n dictDelete(dict,x->obj);n // 釋放該節點n zslFreeNode(x);n // 個數+1n removed++;n // 排名值加1n traversed++;n x = next;n }n // 返回刪除的節點個數n return removed;n}n

跳躍表小結

跳躍表是有序集合的底層實現之一。在同一個跳躍表中,多個節點可以包含相同的分值,但每個節點的成員對象必須是唯一的。跳躍表的節點是按照分值進行排序的,當分值相同時,節點按照成員對象的大小進行排序。


推薦閱讀:

Redis內部數據結構詳解(4)——ziplist
Redis存儲文章點擊量,string類型和hash類型用哪種比較好?
R語言操作redis資料庫詳解
redis+mysql有幾種用法?
Redis服務支持5000萬的QPS,有什麼好的思路?

TAG:Redis |