当前位置: 代码迷 >> 综合 >> redis 源码系列(6):实现简单又能打 --- skiplist
  详细解决方案

redis 源码系列(6):实现简单又能打 --- skiplist

热度:33   发布时间:2024-01-24 15:09:23.0

跳跃表(skiplist)是一种高级数据结构,其插入、检索、删除的时间复杂度均为 O(logN)。因为其实实现简单,很多项目将其作为红黑树的一个替代品。

今天我们来看一下 redis 中 skiplist 的实现,涉及到的文件在 src/t_zset.c 和 src/redis.h。

redis 中的跳表基本上与常见跳表有以下几点区别:

  1. 节点的 key 其实相当于 score + obj,所以在比较节点的时候,会在 score 相同的情况下,继续比较 obj
  2. 每个节点都有一个后退指针,所以 level 0 是一个双向链表

结构体定义

// 跳表节点
typedef struct zskiplistNode {// 成员对象robj *obj;// 分值double score;// 后退指针, 指向 level 0 的前驱节点struct zskiplistNode *backward;// 层struct zskiplistLevel {// 前进指针struct zskiplistNode *forward;// 跨度, 代表节点在 level i 层通过 forward 能跳过的节点数目unsigned int span;} level[];} zskiplistNode;typedef struct zskiplist {// 表头节点和表尾节点struct zskiplistNode *header, *tail;// 表中节点的数量unsigned long length;// 表中层数最大的节点的层数int level;} zskiplist;

因为 skiplist node 的层数是动态的,所以我们又见到了我们的老朋友柔性数组。

构造和析构

// 构造一个 level 层的 skiplistNode
zskiplistNode *zslCreateNode(int level, double score, robj *obj) {// 分配空间zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));// 设置属性zn->score = score;zn->obj = obj;return zn;
}// 构造跳表
zskiplist *zslCreate(void) {int j;zskiplist *zsl;// 分配空间zsl = zmalloc(sizeof(*zsl));// 设置高度和起始层数zsl->level = 1;zsl->length = 0;// 初始化表头节点zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {zsl->header->level[j].forward = NULL;zsl->header->level[j].span = 0;}zsl->header->backward = NULL;// 设置表尾, zsl-tail == NULL 则代表跳表为空zsl->tail = NULL;return zsl;
}// 释放 node
void zslFreeNode(zskiplistNode *node) {// 注意 obj 是引用计数decrRefCount(node->obj);zfree(node);
}// 释放跳表
void zslFree(zskiplist *zsl) {zskiplistNode *node = zsl->header->level[0].forward, *next;// 释放表头zfree(zsl->header);// 释放表中所有节点// T = O(N)while(node) {next = node->level[0].forward;zslFreeNode(node);node = next;}// 释放跳跃表结构zfree(zsl);
}

插入

// 辅助函数,返回一个随机值作为节点的层数
int zslRandomLevel(void) {int level = 1;while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))level += 1;return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}/*插入节点,不允许 score 和 obj 都相同的节点出现,但是代码并未做强制检查为了防止有同学不清楚跳表的工作原理,这里简单讲一下。传统的链表, 即使是有序的,查找一个元素的时间复杂度是 O(N)。这是因为链表节点的内存布局不是连续的,我们要想从第一个节点到第 N 节点,一次只能跳过一个节点(1-2-3-...N)。跳表通过在节点内加入一个 level动态数组,链表可以一次跳过不止一个节点, 如下所示level2 |Node1|---------------------->|NodeX| level1 |Node1|-------------> |Node3|----->|NodeN-1| level0 |Node1| -> |Node2| -> |Node3|->... |NodeN-1| -> |NodeN|*/
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;unsigned int rank[ZSKIPLIST_MAXLEVEL];int i, level;redisAssert(!isnan(score));// 查找新节点在每一层的插入位置,循环结束后 update[i] 会是 level i// 所有节点中,最后一个小于新节点的节点x = zsl->header;for (i = zsl->level-1; i >= 0; i--) {// rank[i] 是累加过程,rank[i+1] 是到达该节点需要跨越的节点数// rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];// 沿着前进指针遍历跳跃表, 直到// 1. x 是该层最后一个节点 or// 2. x 在该层的后继节点的分数大于 score or// 3. x 在该层的后继节点的分数等于 score,但是后继节点的 obj 大于等于objwhile (x->level[i].forward &&(x->level[i].forward->score < score ||// 比对分值(x->level[i].forward->score == score &&// 比对成员compareStringObjects(x->level[i].forward->obj,obj) < 0))) {// 记录沿途跨越了多少个节点rank[i] += x->level[i].span;// 移动至下一指针x = x->level[i].forward;}// 新节点应该插入到 x 后面update[i] = x;}/* we assume the key is not already inside, since we allow duplicated* scores, and the re-insertion of score and redis object should never* happen since the caller of zslInsert() should test in the hash table* if the element is already inside or not. */// 获取一个随机值作为新节点的层数level = zslRandomLevel();// 如果新节点的层数比表中其他节点的层数都要大, 那么需要更新 skiplist// 的高度,并且将高出来的 level 的前驱节点设置为 skiplist->headif (level > zsl->level) {// 初始化未使用层for (i = zsl->level; i < level; i++) {rank[i] = 0;update[i] = zsl->header;// 指向 nil 所以跨越所有元素update[i]->level[i].span = zsl->length;}// 更新表中节点最大层数zsl->level = level;}// 创建新节点x = zslCreateNode(level,score,obj);// 将节点在每层,分别插入到其前驱节点之后, update[i] 即为节点在// level i 的前驱节点for (i = 0; i < level; i++) {// 设置新节点的 forward 指针x->level[i].forward = update[i]->level[i].forward;// 将沿途记录的各个节点的 forward 指针指向新节点update[i]->level[i].forward = x;// update[i]->level[i].span 本来是直接调到 level i 下一个节点的跨越节点数// 因为现在在中间插入了一个节点,所以有:// x->level[i].span + (rank[0]-rank[i]) + 1== update[i]->level[i].spanx->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);update[i]->level[i].span = (rank[0] - rank[i]) + 1;}// 当 level 小于 zsh->level 时,需要更新 > level 的前驱节点的 spanfor (i = level; i < zsl->level; i++) {update[i]->level[i].span++;}// 设置新节点的后退指针x->backward = (update[0] == zsl->header) ? NULL : update[0];if (x->level[0].forward)x->level[0].forward->backward = x;elsezsl->tail = x;// 跳跃表的节点计数增一zsl->length++;return x;
}

删除节点

void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {int i;// 更新所有和被删除节点 x 有关的节点的指针,解除它们之间的关系for (i = 0; i < zsl->level; i++) {if (update[i]->level[i].forward == x) {// 在 level i update[i] 是要删除节点的前驱节点update[i]->level[i].span += x->level[i].span - 1;update[i]->level[i].forward = x->level[i].forward;} else {// x 不在 level i 的链表中update[i]->level[i].span -= 1;}}// 更新被删除节点 x 的前进和后退指针if (x->level[0].forward) {x->level[0].forward->backward = x->backward;} else {// 如果 x 是尾节点,需要更新跳表中的 tailzsl->tail = x->backward;}// 如果 x 是最高层链表中唯一节点,skiplist 的高度降低1while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)zsl->level--;// 跳跃表节点计数器减一zsl->length--;
}int zslDelete(zskiplist *zsl, double score, robj *obj) {zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;int i;// 遍历跳跃表,查找目标节点,并记录所有沿途节点x = zsl->header;for (i = zsl->level-1; i >= 0; i--) {while (x->level[i].forward &&(x->level[i].forward->score < score ||(x->level[i].forward->score == score &&compareStringObjects(x->level[i].forward->obj,obj) < 0)))// 沿着前进指针移动x = x->level[i].forward;// 记录沿途节点update[i] = x;}// 检查找到的元素 x ,只有在它的分值和对象都相同时,才将它删除。x = x->level[0].forward;if (x && score == x->score && equalStringObjects(x->obj,obj)) {zslDeleteNode(zsl, x, update);zslFreeNode(x);return 1;} else {return 0; /* not found */}
}

查找

要说 skiplist 比红黑树优越的一点,就是它查找第 N 个元素的时间复杂度是 O(logN),这一点对于 redis 提供的 zset 命令很重要。
因为 skiplist 有一个 head 元素,所以其元素 index 是从 1 开始的(0 对应的是 head)。它的 index 又叫做 rank,
这个我们在前面的插入例程已经有所接触了。

// 获取元素 rank,如果元素不存在,返回 0 
unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {zskiplistNode *x;unsigned long rank = 0;int i;// 遍历整个跳跃表x = zsl->header;for (i = zsl->level-1; i >= 0; i--) {// 遍历节点并对比元素while (x->level[i].forward &&(x->level[i].forward->score < score ||(x->level[i].forward->score == score &&compareStringObjects(x->level[i].forward->obj,o) <= 0))) {// 累积跨越的节点数量rank += x->level[i].span;// 沿着前进指针遍历跳跃表x = x->level[i].forward;}// 必须确保不仅分值相等,而且成员对象也要相等if (x->obj && equalStringObjects(x->obj,o)) {return rank;}}// 没找到return 0;
}// 根据 rank 查找元素,时间复杂度 O(logN)
zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {zskiplistNode *x;unsigned long traversed = 0;int i;x = zsl->header;for (i = zsl->level-1; i >= 0; i--) {// 遍历跳跃表并累积越过的节点数量while (x->level[i].forward && (traversed + x->level[i].span) <= rank){traversed += x->level[i].span;x = x->level[i].forward;}// 已经跨越了 rank 个节点,返回节点if (traversed == rank) {return x;}}// 没找到目标节点, 只存在 rank 太大的情况return NULL;
}

总结

  1. redis 使用 skiplist + dict 来实现 zset
  2. redis 选用 skiplist 的一个原因是 skiplist 可以在 O(logN) 内查找到指定 rank 的元素