通用技术 一文搞懂 Redis

京东云开发者 · 2023年02月07日 · 最后由 重来看雨 回复于 2023年02月07日 · 5956 次阅读

作者: 京东物流 刘丽侠 姚再毅 康睿 刘斌 李振

一、Redis 的特性

1.1 Redis 为什么快?

  • 基于内存操作,操作不需要跟磁盘交互,单次执行很快
  • 命令执行是单线程,因为是基于内存操作,单次执行的时间快于线程切换时间,同时通信采用多路复用
  • Redis 本身就是一个 k-v 结构,类似于 hashMap,所以查询性能接近于 O(1)
  • 同时 redis 自己底层数据结构支持,比如跳表、SDS
  • lO 多路复用,单个线程中通过记录跟踪每一个 sock(I/O 流)的状态来管理多个 I/O 流

1.2 Redis 其他特性

  • 更丰富的数据类型,虽然都是 k、v 结构,value 可以存储很多的数据类型
  • 完善的内存管理机制、保证数据一致性:持久化机制、过期策略
  • 支持多种编程语言
  • 高可用,集群、保证高可用

1.3 Redis 高可用

  • 很完善的内存管理机制,过期、淘汰、持久化
  • 集群模式,主从、哨兵、cluster 集群

二、Redis 数据类型以及使用场景

Redis 的数据类型有 String、Hash、Set、List、Zset、bitMap(基于 String 类型)、 Hyperloglog(基于 String 类型)、Geo(地理位置)、Streams 流。

2.1 String

2.1.1 基本指令

// 批量设置
> mset key1 value1 key2 value2
// 批量获取
> mget key1 key2
// 获取长度
> strlen key  
//  字符串追加内容
> append key xxx
// 获取指定区间的字符
> getrange key 0 5
// 整数值递增 (递增指定的值)
> incr intkey (incrby intkey 10)
// 当key 存在时将覆盖
> SETEX key seconds value
// 将 key 的值设为 value ,当且仅当 key 不存在。
> SETNX key value

2.1.2 应用场景

  1. 缓存相关场景 缓存、 token(跟过期属性完美契合)
  1. 线程安全的计数场景(软限流、分布式 ID 等)

2.2 Hash

2.2.1 基本指令

// 将哈希表 key 中的域 field 的值设为 value 。
> HSET key field value
// 返回哈希表 key 中给定域 field 的值。
>  HGET key field
// 返回哈希表 key 中的所有域。
> HKEYS key
// 返回哈希表 key 中所有域的值。
>  HVALS key
// 为哈希表 key 中的域 field 的值加上增量 increment 。
> HINCRBY key field increment
// 查看哈希表 key 中,给定域 field 是否存在。
> HEXISTS key field

2.2.2 应用场景

  1. 存储对象类的数据 (官网说的)比如一个对象下有多个字段
  1. 统计类的数据,可以对单个统计数据进行单独操作

2.3 List

存储有序的字符串列表,元素可以重复。列表的最大长度为 232 - 1 个元素(4294967295,每个列表超过 40 亿个元素)。

2.3.1 基本指令

// 将一个或多个值 value 插入到列表 key 的表头
> LPUSH key value [value ...]
// 将一个或多个值 value 插入到列表 key 的表尾(最右边)。
> RPUSH key value [value ...]
// 移除并返回列表 key 的头元素。
> LPOP key
// 移除并返回列表 key 的尾元素。
> RPOP key
// BLPOP 是列表的阻塞式(blocking)弹出原语。
> BLPOP key [key ...] timeout
// BRPOP 是列表的阻塞式(blocking)弹出原语。
> BRPOP key [key ...] timeout
// 获取指点位置元素
> LINDEX key index

2.3.2 应用场景

  1. 用户消息时间线

因为 list 是有序的,所以我们可以先进先出,先进后出的列表都可以做

2.4 Set

2.4.1 基本指令

// 将一个或多个 member 元素加入到集合 key 当中,已经存在于集合的 member 元素将被忽略。
> SADD key member [member ...]
// 返回集合 key 中的所有成员。
> SMEMBERS key
// 返回集合 key 的基数(集合中元素的数量)。
> SCARD key
// 如果命令执行时,只提供了 key 参数,那么返回集合中的一个随机元素。
> SRANDMEMBER key [count]
// 移除并返回集合中的一个随机元素。
> SPOP key
// 移除集合 key 中的一个或多个 member 元素,不存在的 member 元素会被忽略。
> SREM key member [member ...]
// 判断 member 元素是否集合 key 的成员。
> SISMEMBER key member
// 获取前一个集合有而后面1个集合没有的
> sdiff huihuiset huihuiset1
// 获取交集
> sinter huihuiset huihuiset1
// 获取并集
> sunion huihuiset huihuiset1

2.4.2 应用场景

  1. 抽奖 spop 跟 srandmember 随机弹出或者获取元素
  1. 点赞、签到等,sadd 集合存储
  1. 交集并集 关注等场景

2.5 ZSet(SortedSet)

sorted set,有序的 set,每个元素有个 score。

score 相同时,按照 key 的 ASCII 码排序。

2.5.1 基本指令

//将一个或多个 member 元素及其 score 值加入到有序集 key 当中。
> ZADD key score member [[score member] [score member] ...]
// 返回有序集 key 中,指定区间内的成员。其中成员的位置按 score 值递增(从小到大)来排序
> ZRANGE key start stop [WITHSCORES]
// 返回有序集 key 中,指定区间内的成员。其中成员的位置按 score 值递减(从大到小)来排列。
> ZREVRANGE key start stop [WITHSCORES]
// 返回有序集 key 中,所有 score 值介于 min 和 max 之间(包括等于 min 或 max )的成员。有序集成员按 score 值递增(从小到大)次序排列。
>  ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
// 移除有序集 key 中的一个或多个成员,不存在的成员将被忽略。
> ZREM key member [member ...]
// 返回有序集 key 的基数。
> ZCARD key
// 为有序集 key 的成员 member 的 score 值加上增量 increment 。
> ZINCRBY key increment member
// 返回有序集 key 中, score 值在 min 和 max 之间(默认包括 score 值等于 min 或 max )的成员的数量。
> ZCOUNT key min max
// 返回有序集 key 中成员 member 的排名。其中有序集成员按 score 值递增(从小到大)顺序排列。
> ZRANK key member

2.5.2 应用场景

  1. 排行榜

三、Redis 的事务

3.1 事务基本操作

// 1. multi命令开启事务,exec命令提交事务
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set k1 v1
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> exec
1) OK
2) OK
// 2. 组队的过程中可以通过discard来放弃组队。
 127.0.0.1:6379> multi
OK
 127.0.0.1:6379(TX)> set k3 v3
QUEUED
 127.0.0.1:6379(TX)> set k4 v4
QUEUED
 127.0.0.1:6379(TX)> discard
OK

3.2 Redis 事务特性

  1. 单独的隔离操作

事务中的所有命令都会序列化、按顺序地执行。

事务在执行过程中,不会被其他客户端发送来的命令请求所打断。

  1. 没有隔离级别的概念

队列中的命令没有提交之前,都不会被实际地执行,因为事务提交之前任何指令都不会被实际执行,也就不存在 “事务内的查询要看到事务里的更新,在事务外查询不能看到” 这个让人万分头疼的问题。

  1. 不保证原子性

Redis 同一个事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚。

四、Redis 分布式锁

4.1 Lua 实现分布式锁

local lockSet = redis.call('exists', KEYS[1])
if lockSet == 0 
    then
        redis.call('set', KEYS[1], ARG[1])
        redis.call('expire', KEYS[1], ARG[2])
    end
return lockSet

五、Redis 总体数据结构

5.1 Redis dict 字典

5.1.1 RedisDb 数据接口(server.h 文件)

数据最外层的结构为 RedisDb

typedef struct redisDb {
    dict *dict; /* The keyspace for this DB */ //数据库的键
    dict *expires; /* Timeout of keys with a timeout set */ //设置了超时时间的键
    dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ //客户端等待的keys
    dict *ready_keys; /* Blocked keys that received a PUSH */
    dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
    int id; /* Database ID */ //所在数 据库ID
    long long avg_ttl; /* Average TTL, just for tats */ //平均TTL,仅用于统计
    unsigned long expires_cursor; /* Cursor of the active expire cycle. */
    list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;

5.1.2 dict 数据结构(dict.h 文件)

我们发现数据存储在 dict 中

typedef struct dict {
    dictType *type; //理解为面向对象思想,为支持不同的数据类型对应dictType抽象方法,不同的数据类型可以不同实现
    void *privdata; //也可不同的数据类型相关,不同类型特定函数的可选参数。
    dictht ht[2]; //2个hash表,用来数据存储 2个dictht
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */ // rehash标记 -1代表不再rehash
    unsigned long iterators; /* number of iterators currently running */
} dict;

5.1.3 dictht 结构(dict.h 文件)

typedef struct dictht {
    dictEntry **table; //dictEntry 数组
    unsigned long size; //数组大小 默认为4 #define DICT_HT_INITIAL_SIZE 4
    unsigned long sizemask; //size-1 用来取模得到数据的下标值
    unsigned long used; //改hash表中已有的节点数据
} dictht;

5.1.4 dictEntry 节点结构(dict.h 文件)

typedef struct dictEntry {
    void *key; //key
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v; //value
    struct dictEntry *next; //指向下一个节点
} dictEntry;

5.1.5 RedisObject(service.h 文件)

/* Objects encoding. Some kind of objects like Strings and
Hashes can be
* internally represented in multiple ways. The 'encoding'
field of the object
* is set to one of this fields for this object. */
#define OBJ_ENCODING_RAW 0 /* Raw representation */
#define OBJ_ENCODING_INT 1 /* Encoded as integer */
#define OBJ_ENCODING_HT 2 /* Encoded as hash table */
#define OBJ_ENCODING_ZIPMAP 3 /* Encoded as zipmap */
#define OBJ_ENCODING_LINKEDLIST 4 /* No longer used: old
list encoding. */
#define OBJ_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
#define OBJ_ENCODING_INTSET 6 /* Encoded as intset */
#define OBJ_ENCODING_SKIPLIST 7 /* Encoded as skiplist */
#define OBJ_ENCODING_EMBSTR 8 /* Embedded sds string
encoding */
#define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list
of ziplists */
#define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree
of listpacks */
#define LRU_BITS 24
#define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of
obj->lru */
#define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution
in ms */
#define OBJ_SHARED_REFCOUNT INT_MAX /* Global object
never destroyed. */
#define OBJ_STATIC_REFCOUNT (INT_MAX-1) /* Object
allocated in the stack. */
#define OBJ_FIRST_SPECIAL_REFCOUNT OBJ_STATIC_REFCOUNT
typedef struct redisObject {
unsigned type:4; //数据类型 string hash list
unsigned encoding:4; //底层的数据结构 跳表
unsigned lru:LRU_BITS; /* LRU time (relative to global
lru_clock) or
* LFU data (least significant
8 bits frequency
* and most significant 16 bits
access time). */
int refcount; //用于回收,引用计数法
void *ptr; //指向具体的数据结构的内存地址 比如 跳表、压缩列表
} robj;

5.2 Redis 数据结构图

2.jpg

5.3 Redis 扩容机制

因为我们的 dictEntry 数组默认大小是 4,如果不进行扩容,那么我们所有的数据都会以链表的形式添加至数组下标 随着数据量越来越大,之前只需要 hash 取模就能得到下标位置,现在得去循环我下标的链表,所以性能会越来越慢,当我们的数据量达到一定程度后,我们就得去触发扩容操作。

5.3.1 什么时候扩容

5.3.1.1 扩容机制

  1. 当没有 fork 子进程在进行 RDB 或 AOF 持久化时,ht[0] 的 used 大于 size 时,触发扩容
  1. 如果有子进程在进行 RDB 或者 AOF 时,ht[0] 的 used 大于等于 size 的 5 倍的时候,会触发扩容

5.3.1.2 源码验证

扩容,肯定是在添加数据的时候才会扩容,所以我们找一个添加数据的入口。

  1. 入口,添加或替换 dictReplace (dict.c 文件)
int dictReplace(dict *d, void *key, void *val) {
    dictEntry *entry, *existing, auxentry;
    /* Try to add the element. If the key
    * does not exists dictAdd will succeed. */
    entry = dictAddRaw(d,key,&existing);
    if (entry) {
        dictSetVal(d, entry, val);
        return 1;
    }
    /* Set the new value and free the old one. Note that
    it is important
    * to do that in this order, as the value may just be
    exactly the same
    * as the previous one. In this context, think to
    reference counting,
    * you want to increment (set), and then decrement
    (free), and not the
    * reverse. */
    auxentry = *existing;
    dictSetVal(d, existing, val);
    dictFreeVal(d, &auxentry);
    return 0;
}
  1. 进入 dictAddRaw 方法(dict.c 文件)
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing){
    long index;
    dictEntry *entry;
    dictht *ht;

    if (dictIsRehashing(d)) _dictRehashStep(d); //如果正在Rehash 进行渐进式hash 按步rehash

    /* Get the index of the new element, or -1 if
    * the element already exists. */
    if ((index = _dictKeyIndex(d, key, dictHashKey(d,key),existing)) == -1)
        return NULL;

    /* Allocate the memory and store the new entry.
    * Insert the element in top, with the assumption that
    in a database
    * system it is more likely that recently added
    entries are accessed
    * more frequently. */
    //如果在hash 放在ht[1] 否则放在ht[0]
    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
    entry = zmalloc(sizeof(*entry));
    //采用头插法插入hash桶
    entry->next = ht->table[index];
    ht->table[index] = entry;
    //已使用++
    ht->used++;

    /* Set the hash entry fields. */
    dictSetKey(d, entry, key);
    return entry;
}
  1. 得到数据下标的方法 _dictKeyIndex (dict.c 文件)
static long _dictKeyIndex(dict *d, const void *key,uint64_t hash, dictEntry **existing){
    unsigned long idx, table;
    dictEntry *he;
    if (existing) *existing = NULL;

    /* Expand the hash table if needed */
    //扩容如果需要
    if (_dictExpandIfNeeded(d) == DICT_ERR)
        return -1;
    //比较是否存在这个值
    for (table = 0; table <= 1; table++) {
        idx = hash & d->ht[table].sizemask;
        /* Search if this slot does not already contain
        the given key */
        he = d->ht[table].table[idx];
        while(he) {
            if (key==he->key || dictCompareKeys(d, key,he->key)) {
                if (existing) *existing = he;
                    return -1;
                }
            he = he->next;
        }
        if (!dictIsRehashing(d)) break;
    }
    return idx;
}
  1. _dictExpandIfNeeded 验证是否需要扩容(dict.c 文件)
/* Expand the hash table if needed */
static int _dictExpandIfNeeded(dict *d){
    /* Incremental rehashing already in progress. Return.*/
    if (dictIsRehashing(d)) return DICT_OK; //正在rehash,返回

    /* If the hash table is empty expand it to the initialsize. */
    //如果ht[0]的长度为0,设置默认长度4 dictExpand为扩容的关键方法
    if (d->ht[0].size == 0) 
            return dictExpand(d,DICT_HT_INITIAL_SIZE);

    //扩容条件 hash表中已使用的数量大于等于 hash表的大小
    //并且dict_can_resize为1 或者 已使用的大小大于hash表大小的 5倍,大于等于1倍的时候,下面2个满足一个条件即可
    if (d->ht[0].used >= d->ht[0].size &&
        (dict_can_resize || d->ht[0].used/d->ht[0].size >dict_force_resize_ratio)){
        //扩容成原来的2倍
        return dictExpand(d, d->ht[0].used*2);
    }
    return DICT_OK;
}

5.3.2 怎么扩容

5.3.2.1 扩容步骤

  1. 当满足我扩容条件,触发扩容时,判断是否在扩容,如果在扩容,或者 扩容的大小跟我现在的 ht[0].size 一样,这次扩容不做
  1. new 一个新的 dictht,大小为 ht[0].used * 2(但是必须向上 2 的幂,比 如 6 ,那么大小为 8),并且 ht[1]=新创建的 dictht
  1. 我们有个更大的 table 了,但是需要把数据迁移到 ht[1].table ,所以将 dict 的 rehashidx(数据迁移的偏移量)赋值为 0 ,代表可以进行数据迁 移了,也就是可以 rehash 了
  1. 等待数据迁移完成,数据不会马上迁移,而是采用渐进式 rehash,慢慢的把数据迁移到 ht[1]
  1. 当数据迁移完成,ht[0].table=ht[1] ,ht[1] .table = NULL、ht[1] .size = 0、ht[1] .sizemask = 0、 ht[1] .used = 0
  1. 把 dict 的 rehashidex=-1

5.3.2.2 源码验证

  1. dictExpand 方法在_dictExpandIfNeeded 方法中调用 (dict.c 文件),为扩容的关键方法
int dictExpand(dict *d, unsigned long size){
    /* the size is invalid if it is smaller than the
    number of
    * elements already inside the hash table */
    //正在扩容,不允许第二次扩容,或者ht[0]的数据量大于扩容后的数量,没有意义,这次不扩容
    if (dictIsRehashing(d) || d->ht[0].used > size)
        return DICT_ERR;

    dictht n; /* the new hash table */
    //得到最接近2的幂 跟hashMap思想一样
    unsigned long realsize = _dictNextPower(size);

    /* Rehashing to the same table size is not useful. */
    //如果跟ht[0]的大小一致 直接返回错误
    if (realsize == d->ht[0].size) return DICT_ERR;

    /* Allocate the new hash table and initialize all
    pointers to NULL */
    //为新的dictht赋值
    n.size = realsize;
    n.sizemask = realsize-1;
    n.table = zcalloc(realsize*sizeof(dictEntry*));
    n.used = 0;

    /* Is this the first initialization? If so it's not
    really a rehashing
    * we just set the first hash table so that it can
    accept keys. */
    //ht[0].table为空,代表是初始化
    if (d->ht[0].table == NULL) {
        d->ht[0] = n;
        return DICT_OK;
    }

    /* Prepare a second hash table for incremental rehashing */
    //将扩容后的dictht赋值给ht[1]
    d->ht[1] = n;
    //标记为0,代表可以开始rehash
    d->rehashidx = 0;
    return DICT_OK;
}

5.3.3 数据怎么迁移(渐进式 hash)

5.3.3.1 迁移方式

假如一次性把数据全部迁移会很耗时间,会让单条指令等待很久,会形成阻塞。

所以,redis 采用渐进式 Rehash,所谓渐进式,就是慢慢的,不会一次性把所有数据迁移。

那什么时候会进行渐进式数据迁移?

  1. 每次进行 key 的 crud 操作都会进行一个 hash 桶的数据迁移
  1. 定时任务,进行部分数据迁移

5.3.3.2 源码验证

CRUD 触发都会触发_dictRehashStep(渐进式 hash)
  1. 每次增删改的时候都会调用_dictRehashStep 方法
if (dictIsRehashing(d)) _dictRehashStep(d); //这个代码在增删改查的方法中都会调用
  1. _dictRehashStep 方法
static void _dictRehashStep(dict *d) {
    if (d->iterators == 0) dictRehash(d,1);
}
  1. dictRehash 方法
int dictRehash(dict *d, int n) {
    int empty_visits = n*10; /* Max number of empty
    buckets to visit. */ //要访问的最大的空桶数 防止此次耗时过长

    if (!dictIsRehashing(d)) return 0; //如果没有在rehash返回0

    //循环,最多1次,并且ht[0]的数据没有迁移完 进入循环
    while(n-- && d->ht[0].used != 0) {
        dictEntry *de, *nextde;
        /* Note that rehashidx can't overflow as we are
        sure there are more
        * elements because ht[0].used != 0 */
        assert(d->ht[0].size > (unsigned long)d->rehashidx);
        //rehashidx rehash的索引,默认0 如果hash桶为空 进入自旋 并且判断是否到了最大的访问空桶数
        while(d->ht[0].table[d->rehashidx] == NULL) {
            d->rehashidx++;
            if (--empty_visits == 0) return 1;
        }
        de = d->ht[0].table[d->rehashidx]; //得到ht[0]的下标为rehashidx桶
        /* Move all the keys in this bucket from the old
        to the new hash HT */
        while(de) { //循环hash桶的链表 并且转移到ht[1]
            uint64_t h;
            nextde = de->next;
            /* Get the index in the new hash table */
            h = dictHashKey(d, de->key) & d-
            >ht[1].sizemask;
            de->next = d->ht[1].table[h]; //头插
            d->ht[1].table[h] = de;
            d->ht[0].used--;
            d->ht[1].used++;
            de = nextde;
        }
        //转移完后 将ht[0]相对的hash桶设置为null
        d->ht[0].table[d->rehashidx] = NULL;
        d->rehashidx++;
    }
    /* Check if we already rehashed the whole table... */
    //ht[0].used=0 代表rehash全部完成
    if (d->ht[0].used == 0) {
        //清空ht[0]table
        zfree(d->ht[0].table);
        //将ht[0] 重新指向已经完成rehash的ht[1]
        d->ht[0] = d->ht[1];
        //将ht[1]所有数据重新设置
        _dictReset(&d->ht[1]);
        //设置-1,代表rehash完成
        d->rehashidx = -1;
        return 0;
    }
    /* More to rehash... */
    return 1;
}

//将ht[1]的属性复位
static void _dictReset(dictht *ht)
{
    ht->table = NULL;
    ht->size = 0;
    ht->sizemask = 0;
    ht->used = 0;
}
定时任务触发

再讲定时任务之前,我们要知道 Redis 中有个时间事件驱动,在 server.c 文件下有个 serverCron 方法。

serverCron 每隔多久会执行一次,执行频率根据 redis.conf 中的 hz 配置,serverCorn 中有个方法 databasesCron()

  1. 定时方法 serverCron
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    //.......
    /* We need to do a few operations on clients
    asynchronously. */
    clientsCron();
    /* Handle background operations on Redis databases. */
    databasesCron(); //处理Redis数据库上的后台操作。
    //.......
}
  1. databasesCron 方法
void databasesCron(void) {
    /* Expire keys by random sampling. Not required for
    slaves
    * as master will synthesize DELs for us. */
    if (server.active_expire_enabled) {
        if (iAmMaster()) {
            activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
        } else {
            expireSlaveKeys();
        }
    }
    /* Defrag keys gradually. */
    activeDefragCycle();
    /* Perform hash tables rehashing if needed, but only
    if there are no
    * other processes saving the DB on disk. Otherwise
    rehashing is bad
    * as will cause a lot of copy-on-write of memory
    pages. */
    if (!hasActiveChildProcess()) {
        /* We use global counters so if we stop the
        computation at a given
        * DB we'll be able to start from the successive
        in the next
        * cron loop iteration. */
        static unsigned int resize_db = 0;
        static unsigned int rehash_db = 0;
        int dbs_per_call = CRON_DBS_PER_CALL;
        int j;
        /* Don't test more DBs than we have. */
        if (dbs_per_call > server.dbnum) 
            dbs_per_call = server.dbnum;
        /* Resize */
        for (j = 0; j < dbs_per_call; j++) {
            tryResizeHashTables(resize_db % server.dbnum);
            resize_db++;
        }
        /* Rehash */ //Rehash逻辑
        if (server.activerehashing) {
            for (j = 0; j < dbs_per_call; j++) {
                //进入incrementallyRehash
                int work_done = incrementallyRehash(rehash_db);
                if (work_done) {
                    /* If the function did some work, stop
                        here, we'll do
                        * more at the next cron loop. */
                    break;
                } else {
                    /* If this db didn't need rehash,
                        we'll try the next one. */
                    rehash_db++;
                    rehash_db %= server.dbnum;
                }
            }
        }
    }
} 
  1. 进入 incrementallyRehash 方法(server.c)
int incrementallyRehash(int dbid) {
    /* Keys dictionary */
    if (dictIsRehashing(server.db[dbid].dict)) {
        dictRehashMilliseconds(server.db[dbid].dict,1);
        return 1; 
        /* already used our millisecond for this
        loop... */
    }
    /* Expires */
    if (dictIsRehashing(server.db[dbid].expires)) {
        dictRehashMilliseconds(server.db[dbid].expires,1);
        return 1; /* already used our millisecond for this
        loop... */
    }
    return 0;
}
  1. 进入 dictRehashMilliseconds(dict.c)
int dictRehashMilliseconds(dict *d, int ms) {
    long long start = timeInMilliseconds();
    int rehashes = 0;
    //进入rehash方法 dictRehash 去完成渐进时HASH
    while(dictRehash(d,100)) {
        rehashes += 100;
        //判断是否超时
        if (timeInMilliseconds()-start > ms) break;
    }
    return rehashes;
}

5.4 String 类型数据结构

Redis 中 String 的底层没有用 c 的 char 来实现,而是采用了 SDS(simple Dynamic String)的数据结构来实现。并且提供了 5 种不同的类型

5.4.1 SDS 数据结构定义(sds.h 文件)

typedef char *sds;
/* Note: sdshdr5 is never used, we just access the flags
    byte directly.
    * However is here to document the layout of type 5 SDS
strings. */
struct __attribute__ ((__packed__)) sdshdr5 {
    unsigned char flags; /* 3 lsb of type, and 5 msb of
        string length */
    char buf[];
}

struct __attribute__ ((__packed__)) sdshdr8 {
    uint8_t len;  /* used */ //已使用的长度
    uint8_t alloc; /* excluding the header and null terminator */ //分配的总容量 不包含头和空终止符
    unsigned char flags; /* 3 lsb of type, 5 unused bits */ //低三位类型 高5bit未使用
    char buf[]; //数据buf 存储字符串数组
};

struct __attribute__ ((__packed__)) sdshdr16 {
    uint16_t len; /* used */
    uint16_t alloc; /* excluding the header and null
    terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits
    */
    char buf[];
};

struct __attribute__ ((__packed__)) sdshdr32 {
    uint32_t len; /* used */
    uint32_t alloc; /* excluding the header and null
    terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits
    */
    char buf[];
};

struct __attribute__ ((__packed__)) sdshdr64 {
    uint64_t len; /* used */
    uint64_t alloc; /* excluding the header and null
    terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits
    */
    char buf[];
};

5.4.2 SDS 数据结构的优势

  1. char 字符串不记录自身长度,所以获取一个字符串长度的复杂度是 O(n),但是 SDS 记录分配的长度 alloc,已使用的长度 len,获取长度为 O(1)
  1. 减少修改字符串带来的内存重分配次数
  1. 空间预分配,SDS 长度如果小于 1MB,预分配跟长度一样的,大于 1M,每次跟 len 的大小多 1M
  1. 惰性空间释放,截取的时候不马上释放空间,供下次使用!同时提供相应的释放 SDS 未使用空间的 API
  1. 二进制安全

5.5 Hash 类型数据结构

Redis 的字典相当于 Java 语言中的 HashMap,他是无序字典。内部结构上同样是数组 + 链表二维结构。第一维 hash 的数组位置碰撞时,就会将碰撞的元素使用链表串起来。

5.5.1 Hash 数据结构图

3.png

5.5.2 Hash 的压缩列表

压缩列表会根据存入的数据的不同类型以及不同大小,分配不同大小的空间。所以是为了节省内存而采用的。

因为是一块完整的内存空间,当里面的元素发生变更时,会产生连锁更新,严重影响我们的访问性能。所以,只适用于数据量比较小的场景。

所以,Redis 会有相关配置,Hashes 只有小数据量的时候才会用到 ziplist,当 hash 对象同时满足以下两个条件的时候,使用 ziplist 编码:

  1. 哈希对象保存的键值对数量<512 个
  1. 所有的键值对的键和值的字符串长度都<64byte(一个英文字母一个字节)
hash-max-ziplist-value 64 // ziplist中最大能存放的值长度
hash-max-ziplist-entries 512 // ziplist中最多能存放的entry节点数量

5.6 List 类型数据结构

5.6.1 quickList 快速列表

兼顾了 ziplist 的节省内存,并且一定程度上解决了连锁更新的问题,我们的 quicklistNode 节点里面是个 ziplist,每个节点是分开的。那么就算发生了连锁更新,也只会发生在一个 quicklistNode 节点。

quicklist.h 文件

typedef struct{
    struct quicklistNode *prev; //前指针
    struct quicklistNode *next; //后指针
    unsigned char *zl; //数据指针 指向ziplist结果
    unsigned int sz; //ziplist大小 /* ziplist
    size in bytes */
    unsigned int count : 16; /* count of items in ziplist */ //ziplist的元素
    unsigned int encoding : 2; /* RAW==1 or LZF==2 */ //
    是否压缩, 1没有压缩 2 lzf压缩
    unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */ //预留容器字段
    unsigned int recompress : 1; /* was this node previous compressed? */
    unsigned int attempted_compress : 1; /* node can't compress; too small */
    unsigned int extra : 10; /* more bits to steal for  future usage */ //预留字段
} quicklistNode;

5.6.2 list 数据结构图

4.png

5.7 Set 类型数据结构

Redis 用 intset 或 hashtable 存储 set。满足下面条件,就用 inset 存储。

  1. 如果不是整数类型,就用 dictht hash 表(数组 + 链表)
  1. 如果元素个数超过 512 个,也会用 hashtable 存储。跟一个配置有关:
set-max-intset-entries 512

不满足上述条件的,都使用 hash 表存储,value 存 null。

5.8 ZSet 数据结构

5.8.1 ZipList

默认使用 ziplist 编码。

在 ziplist 的内部,按照 score 排序递增来存储。插入的时候要移动之后的数据。

如果元素数量大于等于 128 个,或者任一 member 长度大于等于 64 字节使用 skiplist+dict 存储。

zset-max-ziplist-entries 128
zset-max-ziplist-value 64

如果不满足条件,采用跳表。

5.8.2 跳表(skiplist)

结构定义(servier.h)

/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
 sds ele; //sds数据
 double score; //score
 struct zskiplistNode *backward; //后退指针
 //层级数组
 struct zskiplistLevel {
     struct zskiplistNode *forward; //前进指针
     unsigned long span; //跨度
 } level[];
} zskiplistNode;

//跳表列表
typedef struct zskiplist {
 struct zskiplistNode *header, *tail; //头尾节点
 unsigned long length; //节点数量
 int level; //最大的节点层级
} zskiplist;

zslInsert 添加节点方法(t_zset.c)

zskiplistNode *zslInsert(zskiplist *zsl, double score, sds
ele) {
 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
 unsigned int rank[ZSKIPLIST_MAXLEVEL];
 int i, level;
 serverAssert(!isnan(score));
 x = zsl->header;
 for (i = zsl->level-1; i >= 0; i--) {
     /* store rank that is crossed to reach the insert
     position */
     rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
     while (x->level[i].forward &&
         (x->level[i].forward->score < score ||
         咕泡出品 必属精品精品属精品 咕泡出品 必属精品 咕泡出品 必属精品 咕泡咕泡
         (x->level[i].forward->score == score
         &&
         sdscmp(x->level[i].forward->ele,ele) <
         0)))
         {
         rank[i] += x->level[i].span;
         x = x->level[i].forward;
     }
     update[i] = x;
 }
 /* we assume the element is not already inside, since
 we allow duplicated
    * scores, reinserting the same element should never
    happen since the
    * caller of zslInsert() should test in the hash table
    if the element is
    * already inside or not. */
    level = zslRandomLevel();
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,ele);
    //不同层级建立链表联系
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;
        /* update span covered by update[i] as x is
        inserted here */
        x->level[i].span = update[i]->level[i].span -
        (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) +
        1;
    }
    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }
    x->backward = (update[0] == zsl->header) ? NULL :
    update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
        zsl->length++;
    return x;
}

ZSKIPLIST_MAXLEVEL 默认 32 定义在 server.h

#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^64
elements */

六、Redis 过期策略

6.1 惰性过期

所谓惰性过期,就是在每次访问操作 key 的时候,判断这个 key 是不是过期了,过期了就删除。

6.1.1 源码验证

  1. expireIfNeeded 方法(db.c 文件)
int expireIfNeeded(redisDb *db, robj *key) {
    if (!keyIsExpired(db,key)) return 0;
    /* If we are running in the context of a slave,
    instead of
    * evicting the expired key from the database, we
    return ASAP:
    * the slave key expiration is controlled by the
    master that will
    * send us synthesized DEL operations for expired
    keys.
    *
    * Still we try to return the right information to the
    caller,
    * that is, 0 if we think the key should be still
    valid, 1 if
    * we think the key is expired at this time. */
    // 如果配置有masterhost,说明是从节点,那么不操作删除
    if (server.masterhost != NULL) return 1;
    /* Delete the key */
    server.stat_expiredkeys++;
    propagateExpire(db,key,server.lazyfree_lazy_expire);
    notifyKeyspaceEvent(NOTIFY_EXPIRED,
    "expired",key,db->id);
    //是否是异步删除 防止单个Key的数据量很大 阻塞主线程 是4.0之后添加的新功能,默认关闭
    int retval = server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) : dbSyncDelete(db,key);
    if (retval) signalModifiedKey(NULL,db,key);
    return retval;
}

每次调用到相关指令时,才会执行 expireIfNeeded 判断是否过期,平时不会去判断是否过期。

该策略可以最大化的节省 CPU 资源。

但是却对内存非常不友好。因为如果没有再次访问,就可能一直堆积再内存中。占用内存

所以就需要另一种策略来配合使用,就是定期过期

6.2 定期过期

那么多久去清除一次,我们在讲 Rehash 的时候,有个方法是 serverCron,执行频率根据 redis.conf 中的 hz 配置。

这方法除了做 Rehash 以外,还会做很多其他的事情,比如:

  1. 清理数据库中的过期键值对
  1. 更新服务器的各类统计信息,比如时间、内存占用、数据库占用情况等
  1. 关闭和清理连接失效的客户端
  1. 尝试进行持久化操作

6.2.1 实现流程

  1. 定时 serverCron 方法去执行清理,执行频率根据 redis.cong 中的 hz 配置的值(默认是 10,也就是 1s 执行 10 次,100ms 执行一次)
  1. 执行清理的时候,不是去扫描所有的 key,而是去扫描所有设置了过期时间的 key redisDB.expires
  1. 如果每次去把所有的 key 都拿出来,那么假如过期的 key 很多,就会很慢,所以也不是一次性拿出所有的 key
  1. 根据 hash 桶的维度去扫描 key,扫到 20(可配置) 个 key 为止。假如第一个桶是 15 个 key 没有满足 20,继续扫描第二个桶,第二个桶 20 个 key,由于是以 hash 桶的维度扫描的,所以第二个扫到了就会全扫,总共扫描 35 个 key
  1. 找到扫描的 key 里面过期的 key,并进行删除
  1. 如果取了 400 个空桶(最多拿 400 个桶),或者扫描的删除比例跟扫描的总数超过 10%,继续执行 4、5 步
  1. 也不能无限的循环,循环 16 次后回去检测时间,超过指定时间会跳出

6.2.2 源码验证

  1. 入口 serverCrom
// serverCron方法调用databasesCron方法(server.c)
/* Handle background operations on Redis databases. */
databasesCron();

void databasesCron(void) {
    /* Expire keys by random sampling. Not required for
    slaves
    * as master will synthesize DELs for us. */
    if (server.active_expire_enabled) {
        if (iAmMaster()) {
            activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
        //过期删除方法
        } else {
            expireSlaveKeys();
        }
    }
}

七、Redis 淘汰策略

由于 Redis 内存是有大小的,并且我可能里面的数据都没有过期,当快满的时候,我又没有过期的数据进行淘汰,那么这个时候内存也会满。内存满了,Redis 也会不能放入新的数据。所以,我们不得已需要一些策略来解决这个问题来保证可用性。

7.1 淘汰策略

7.1.1 noeviction

New values aren’t saved when memory limit is reached. When a database uses replication, this applies to the primary database.

默认,不淘汰 能读不能写

7.1.2 allkeys-lru

Keeps most recently used keys; removes least recently used (LRU) keys

基于伪 LRU 算法,在所有的 key 中去淘汰

7.1.3 allkeys-lfu

Keeps frequently used keys; removes least frequently used (LFU) keys.

基于伪 LFU 算法,在所有的 key 中去淘汰

7.1.4 volatile-lru

Removes least recently used keys with the expire field set to true.

基于伪 LRU 算法,在设置了过期时间的 key 中去淘汰

7.1.5 volatile-lfu

Removes least frequently used keys with the expire field set to true.

基于伪 LFU 算法 在设置了过期时间的 key 中去淘汰

7.1.6 allkeys-random

Randomly removes keys to make space for the new data added.

基于随机算法,在所有的 key 中去淘汰

7.1.7 volatile-random

Randomly removes keys with expire field set to true.

基于随机算法 在设置了过期时间的 key 中去淘汰

7.1.8 volatile-ttl

Removes least frequently used keys with expire field set to true and the shortest remaining time-to-live (TTL) value.

根据过期时间来,淘汰即将过期的

7.2 淘汰流程

  1. 首先,我们会有个淘汰池,默认大小是 16,并且里面的数据是末尾淘汰制
  1. 每次指令操作的时候,自旋会判断当前内存是否满足指令所需要的内存
  1. 如果当前不满足,会从淘汰池的尾部拿一个最适合淘汰的数据
  1. 会取样(配置 maxmemory-samples),从 Redis 中获取随机获取到取样的数据,解决一次性读取所有的数据慢的问题
  1. 在取样的数据中,根据淘汰算法找到最适合淘汰的数据
  1. 将最合适的那个数据跟淘汰池中的数据进行比较,是否比淘汰池的数据更适合淘汰,如果更适合,放入淘汰池
  1. 按照适合的程度进行排序,最适合淘汰的放入尾部
  1. 将需要淘汰的数据从 Redis 删除,并且从淘汰池移除

7.3 LRU 算法

Lru,Least Recently Used 翻译过来是最久未使用,根据时间轴来走,仍很久没用的数据。只要最近有用过,我就默认是有效的。

那么它的一个衡量标准是啥?时间!根据使用时间,从近到远,越远的越容易淘汰。

7.3.1 实现原理

需求:得到对象隔多久没访问,隔的时间越久,越容易被淘汰

  1. 首先,LRU 是根据这个对象的访问操作时间来进行淘汰的,那么我们需要知道这个对象最后操作的访问时间
  1. 知道了对象的最后操作访问时间后,我们只需要跟当前系统时间来进行对比,就能算出对象多久没访问了

7.3.2 源码验证

redis 中,对象都会被 redisObject 对象包装,其中有个字段叫 lru。

redisObject 对象(server.h 文件)

typedef struct redisObject {
 unsigned type:4;
 unsigned encoding:4;
 unsigned lru:LRU_BITS; /* LRU time (relative to global
 lru_clock) or
 * LFU data (least significant 8 bits frequency
 * and most significant 16 bits access time). */
 int refcount;
 void *ptr;
} robj;

看 LRU 的字段的说明,那么我们大概能猜出来,redis 去实现 lru 淘汰算法肯定跟我们这个对象的 lru 相关

并且这个字段的大小为 24bit,那么这个字段记录的是对象操作访问时候的秒单位时间的后 24bit

相当于:

long timeMillis=System.currentTimeMillis();
System.out.println(timeMillis/1000); //获取当前秒
System.out.println(timeMillis/1000 & ((1<<24)-1)); //获取秒的后24位

我们知道了这个对象的最后操作访问的时间。如果我们要得到这个对象多久没访问了,我们是不是就很简单,用我当前的时间 - 这个对象的访问时间就可以了!但是这个访问时间是秒单位时间的后 24bit 所以,也是用当前的时间的秒单位的后 24bit 去减。

estimateObjectIdleTime 方法 (evict.c)

unsigned long long estimateObjectIdleTime(robj *o) {
 //获取秒单位时间的最后24位
 unsigned long long lruclock = LRU_CLOCK();
 //因为只有24位,所有最大的值为2的24次方-1
 //超过最大值从0开始,所以需要判断lruclock(当前系统时间)跟缓存对象的lru字段的大小
 if (lruclock >= o->lru) {
     //如果lruclock>=robj.lru,返回lruclock-o->lru,再转换单位
     return (lruclock - o->lru) * LRU_CLOCK_RESOLUTION;
 } else {
     //否则采用lruclock + (LRU_CLOCK_MAX - o->lru),得到对
     象的值越小,返回的值越大,越大越容易被淘汰
     return (lruclock + (LRU_CLOCK_MAX - o->lru)) *
     LRU_CLOCK_RESOLUTION;
 }
}

7.3.3 LRU 总结

用 lruclock 与 redisObject.lru 进行比较,因为 lruclock 只获取了当前秒单位时间的后 24 位,所以肯定有个轮询。

所以,我们会用 lruclock 跟 redisObject.lru 进行比较,如果 lruclock>redisObject.lru,那么我们用 lruclock- redisObject.lru,否则 lruclock+(24bit 的最大值-redisObject.lru),得到 lru 越小,那么返回的数据越大,相差越大的越优先会被淘汰!

https://www.processon.com/view/link/5fbe0541e401fd2d6ed8fc38

7.4 LFU 算法

LFU,英文 Least Frequently Used,翻译成中文就是最不常用的优先淘汰。

不常用,它的衡量标准就是次数,次数越少的越容易被淘汰。

这个实现起来应该也很简单,对象被操作访问的时候,去记录次数,每次操作访问一次,就 +1;淘汰的时候,直接去比较这个次数,次数越少的越容易淘汰!

7.4.1 LFU 的时效性问题

何为时效性问题?就是只会去考虑数量,但是不会去考虑时间。

ps:去年的某个新闻很火,点击量 3000W,今年有个新闻刚出来点击量 100 次,本来我们是想保留今年的新的新闻的,但是如果根据 LFU 来做的话,我们发现淘汰的是今年的新闻,明显是不合理的

导致的问题:新的数据进不去,旧的数据出不来。

那么如何解决呢,且往下看

7.4.2 源码分析

我们还是来看 redisObject(server.h)

typedef struct redisObject {
    unsigned type:4;
    unsigned encoding:4;
    unsigned lru:LRU_BITS; /* LRU time (relative to global
    lru_clock) or
    * LFU data (least significant 8 bits frequency
    * and most significant 16 bits access time). */
    int refcount;
    void *ptr;
} robj;

我们看它的 lru,它如果在 LFU 算法的时候!它前面 16 位代表的是时间,后 8 位代表的是一个数值,frequency 是频率,应该就是代表这个对象的访问次数,我们先给它叫做 counter。

前 16bits 时间有啥用?

跟时间相关,我猜想应该是跟解决时效性相关的,那么怎么解决的?从生活中找例子

大家应该充过一些会员,比如我这个年纪的,小时候喜欢充腾讯的黄钻、绿钻、蓝钻等等。但是有一个点,假如哪天我没充钱了的话,或者没有续 VIP 的时候,我这个钻石等级会随着时间的流失而降低。比如我本来是黄钻 V6,但是一年不充钱的话,可能就变成了 V4。

那么回到 Redis,大胆的去猜测,那么这个时间是不是去记录这个对象有多久没访问了,如果多久没访问,我就去减少对应的次数。

LFUDecrAndReturn 方法(evict.c)

unsigned long LFUDecrAndReturn(robj *o) {
    //lru字段右移8位,得到前面16位的时间
    unsigned long ldt = o->lru >> 8;
    //lru字段与255进行&运算(255代表8位的最大值),
    //得到8位counter值
    unsigned long counter = o->lru & 255;
    //如果配置了lfu_decay_time,用LFUTimeElapsed(ldt) 除以配置的值
    //LFUTimeElapsed(ldt)源码见下
    //总的没访问的分钟时间/配置值,得到每分钟没访问衰减多少
    unsigned long num_periods = server.lfu_decay_time ?LFUTimeElapsed(ldt) / server.lfu_decay_time : 0;
    if (num_periods)
    //不能减少为负数,非负数用couter值减去衰减值
    counter = (num_periods > counter) ? 0 : counter - num_periods;
    return counter;
}

衰减因子由配置

lfu-decay-time 1 //多少分钟没操作访问就去衰减一次

后 8bits 的次数,最大值是 255,够不够?

肯定不够,一个对象的访问操作次数肯定不止 255 次。

但是我们可以让数据达到 255 的很难。那么怎么做的?这个比较简单,我们先看源码,然后再总结

LFULogIncr 方法(evict.c 文件)

uint8_t LFULogIncr(uint8_t counter) {
    //如果已经到最大值255,返回255 ,8位的最大值
    if (counter == 255) return 255;
    //得到随机数(0-1)
    double r = (double)rand()/RAND_MAX;
    //LFU_INIT_VAL表示基数值(在server.h配置)
    double baseval = counter - LFU_INIT_VAL;
    //如果达不到基数值,表示快不行了,baseval =0
    if (baseval < 0) baseval = 0;
    //如果快不行了,肯定给他加counter
    //不然,按照几率是否加counter,同时跟baseval与lfu_log_factor相关
    //都是在分子,所以2个值越大,加counter几率越小
    double p = 1.0/(baseval*server.lfu_log_factor+1);
    if (r < p) counter++;
    return counter;
}

八、Redis 持久化

8.1 RBD

8.1.1 何时触发 RBD

自动触发

  1. 配置触发
save 900 1 900s检查一次,至少有1个key被修改就触发
save 300 10 300s检查一次,至少有10个key被修改就触发
save 60 10000 60s检查一次,至少有10000个key被修改
  1. shutdown 正常关闭

任何组件在正常关闭的时候,都会去完成应该完成的事。比如 Mysql 中的 Redolog 持久化,正常关闭的时候也会去持久化。

  1. flushall 指令触发

数据清空指令会触发 RDB 操作,并且是触发一个空的 RDB 文件,所以, 如果在没有开启其他的持久化的时候,flushall 是可以删库跑路的,在生产环境慎用。

8.1.2 RDB 的优劣

优势

  1. 是个非常紧凑型的文件,非常适合备份与灾难恢复
  1. 最大限度的提升了性能,会 fork 一个子进程,父进程永远不会产于磁盘 IO 或者类似操作
  1. 更快的重启

不足

  1. 数据安全性不是很高,因为是根据配置的时间来备份,假如每 5 分钟备份 一次,也会有 5 分钟数据的丢失
  1. 经常 fork 子进程,所以比较耗 CPU,对 CPU 不是很友好

8.2 AOF

由于 RDB 的数据可靠性非常低,所以 Redis 又提供了另外一种持久化方案: Append Only File 简称:AOF

AOF 默认是关闭的,你可以在配置文件中进行开启:

appendonly no //默认关闭,可以进行开启
# The name of the append only file (default:"appendonly.aof")
appendfilename "appendonly.aof" //AOF文件名

追加文件,即每次更改的命令都会附加到我的 AOF 文件中。

8.2.1 同步机制

AOF 会记录每个写操作,那么问题来了。我难道每次操作命令都要和磁盘交互?

当然不行,所以 redis 支持几种策略,由用户来决定要不要每次都和磁盘交互

# appendfsync always 表示每次写入都执行fsync(刷新)函数 性能会非常非常慢 但是非常安全
appendfsync everysec 每秒执行一次fsync函数 可能丢失1s的数据
# appendfsync no 由操作系统保证数据同步到磁盘,速度最快 你的数据只需要交给操作系统就行

默认 1s 一次,最多有 1s 丢失

8.2.2 重写机制

由于 AOF 是追加的形式,所以文件会越来越大,越大的话,数据加载越慢。 所以我们需要对 AOF 文件进行重写。

何为重写

比如 我们的 incr 指令,假如我们 incr 了 100 次,现在数据是 100,但是我们的 aof 文件中会有 100 条 incr 指令,但是我们发现这个 100 条指令用处不大, 假如我们能把最新的内存里的数据保存下来的话。所以,重写就是做了这么一件事情,把当前内存的数据重写下来,然后把之前的追加的文件删除。

重写流程

  • 在 Redis7 之前
  1. Redis fork 一个子进程,在一个临时文件中写入新的 AOF(当前内存的数据生成的新的 AOF)
  1. 那么在写入新的 AOF 的时候,主进程还会有指令进入,那么主进程会在内存缓存区中累计新的指令(但是同时也会写在旧的 AOF 文件中,就算 重写失败,也不会导致 AOF 损坏或者数据丢失)
  1. 如果子进程重写完成,父进程会收到完成信号,并且把内存缓存中的指令追加到新的 AOF 文件中
  1. 替换旧的 AOF 文件 ,并且将新的指令附加到重写好的 AOF 文件中
  • 在 Redis7 之后,AOF 文件不再是一个,所以会有临时清单的概念
  1. 子进程开始在一个临时文件中写入新的基础 AOF
  1. 父级打开一个新的增量 AOF 文件以继续写入更新。如果重写失败,旧的基础和增量文件(如果有的话)加上这个新打开的增量文件就代表了完整的更新数据集,所以我们是安全的
  1. 当子进程完成基础文件的重写后,父进程收到一个信号,并使用新打开 的增量文件和子进程生成的基础文件来构建临时清单,并将其持久化
  1. 现在 Redis 对清单文件进行原子交换,以便此 AOF 重写的结果生效。 Redis 还会清理旧的基础文件和任何未使用的增量文件

但是重写是把当前内存的数据,写入一个新的 AOF 文件,如果当前数据比较大,然后以指令的形式写入的话,会很慢很慢

所以在 4.0 之后,在重写的时候是采用 RDB 的方式去生成新的 AOF 文件,然 后后续追加的指令,还是以指令追加的形式追加的文件末尾

aof-use-rdb-preamble yes //是否开启RDB与AOF混合模式

什么时候重写

配置文件 redis.conf

# 重写触发机制
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb 就算达到了第一个百分比的大小,也必须大于 64M

在 aof 文件小于64mb的时候不进行重写,当到达64mb的时候,就重写一
次。重写后的 aof 文件可能是40mb。上面配置了auto-aof-rewritepercentag为100,即 aof 文件到了80mb的时候,进行重写。

8.2.3 AOF 的优势与不足

优势

  1. 安全性高,就算是默认的持久化同步机制,也最多只会丢失 1s 数据
  1. AOF 由于某些原因,比如磁盘满了等导致追加失败,也能通过 redischeck-aof 工具来修复
  1. 格式都是追加的日志,所以可读性更高

不足

  1. 数据集一般比 RDB 大
  1. 持久化跟数据加载比 RDB 更慢
  1. 在 7.0 之前,重写的时候,因为重写的时候,新的指令会缓存在内存区,所以会导致大量的内存使用
  1. 并且重写期间,会跟磁盘进行 2 次 IO,一个是写入老的 AOF 文件,一个是写入新的 AOF 文件

九、Redis 常见问题总结

9.1 Redis 数据丢失场景

9.1.1 持久化丢失

采用 RDB 或者不持久化, 会有数据丢失,因为是手动或者配置以快照的形式来进行备份。

解决:启用 AOF,以命令追加的形式进行备份,但是默认也会有 1s 丢失, 这是在性能与数据安全性中寻求的一个最适合的方案,如果为了保证数据 一致性,可以将配置更改为 always,但是性能很慢,一般不用。

9.1.2 主从切换

因为 Redis 的数据是主异步同步给从的,提升了性能,但是由于是异步同步到从。所以存在数据丢失的可能

  1. master 写入数据 k1 ,由于是异步同步到 slave,当 master 没有同步给 slave 的时候,master 挂了
  1. slave 会成为新的 master,并且没有同步 k1
  1. master 重启,会成为新 master 的 slave,同步数据会清空自己的数据,从新的 master 加载
  1. k1 丢失

9.2 Redis 缓存雪崩、穿透、击穿问题分析

9.2.1 缓存雪崩

缓存雪崩就是 Redis 的大量热点数据同时过期(失效),因为设置了相同的过期时间,刚好这个时候 Redis 请求的并发量又很大,就会导致所有的请求落到数据库。

  1. 保证 Redis 的高可用,防止由于 Redis 不可用导致全部打到 DB
  1. 加互斥锁或者使用队列,针对同一个 key 只允许一个线程到数据库查询
  1. 缓存定时预先更新,避免同时失效
  1. 通过加随机数,使 key 在不同的时间过期

9.2.2 缓存穿透

缓存穿透是指缓存和数据库中都没有的数据,但是用户一直请求不存在的数据!这时的用户很可能就是攻击者,恶意搞你们公司的,攻击会导致数据库压力过大。

解决方案:布隆过滤器

布隆过滤器的思想:既然是因为你 Redis 跟 DB 都没有,然后用户恶意一直访问不存在的 key,然后全部打到 Redis 跟 DB。那么我们能不能单独把 DB 的数据单独存到另外一个地方,但是这个地方只存一个简单的标记,标记这个 key 存不存在,如果存在,就去访问 Redis 跟 DB,否则直接返回。并且这个内存最好很小。

9.2.3 缓存击穿

单个 key 过期的时候有大量并发,使用互斥锁,回写 redis,并且采用双重检查锁来提升性能!减少对 DB 的访问。

共收到 1 条回复 时间 点赞

redis 还可以当 MQ 用

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册