FunTester 布式锁地图:Redis、Redlock、Advisory Lock、CAS

FunTester · 2026年04月18日 · 27 次阅读

每个后端工程师迟早都会遇到这样的场景:两个请求在同一毫秒内同时打到系统,结果客户被重复扣费、座位被重复预订,或者库存被扣成负数。在单体架构里,我们还可以依赖数据库事务或进程内互斥锁兜底;但在有多个服务实例的分布式系统里,这套方法就不够用了。

这时候,真正要解决的问题不是某一段代码该怎么写,而是如何在多个进程、多个节点之间协调对同一份共享资源的访问。也就是说,我们需要一套可靠的分布式锁机制。

本文会结合 Redis、PostgreSQL、DynamoDB 等常见组件,拆解几种在生产环境里常见的分布式并发控制方案,包括 SET NX EX、Redlock、建议锁、乐观锁 和 CAS。重点不只是讲怎么做,还会讲清楚每种方案适合什么场景、有哪些边界,以及如何在工程上做取舍。

分布式锁为什么存在

假设一个支付服务在负载均衡器后面跑着 3 个副本。用户点击一次支付按钮,但因为网络抖动触发了重试,结果其中 2 个副本几乎同时收到了请求。它们都读到订单状态是 pending,都执行了扣款逻辑,又都把订单更新为 paid。最终用户看到的,就是两笔扣费记录。

售票系统里也有类似问题。两个用户同时抢最后一个座位,两个请求几乎同时查到库存里还剩 1 张票,于是都通过了校验,最后系统卖出了一张根本不存在的票。

这些都不是实验室里的极端案例,而是线上系统里非常常见的并发冲突。根因只有一个:多个进程需要在没有共享内存的前提下,协调对同一份资源的访问。

分布式锁的本质,是把原本只在单进程内成立的互斥语义,扩展到跨进程、跨节点的执行环境里。 同一时刻只能有一个持有者进入关键区,其他请求要么等待,要么尽快失败。

基于 Redis 的锁:SET NX EX 模式

最常见、也最容易落地的分布式锁方案,是使用单个 Redis 实例,通过原子 SET 命令完成加锁:

import Redis from "ioredis";

const redis = new Redis(process.env.REDIS_URL);

interface LockResult {
  acquired: boolean;
  release: () => Promise<boolean>;
}

async function acquireLock(
  key: string,
  ttlMs: number,
  ownerId: string
): Promise<LockResult> {
  const lockKey = `lock:${key}`;

  // 只有首次写入成功的调用方才能拿到锁
  const result = await redis.set(lockKey, ownerId, "PX", ttlMs, "NX");

  if (result !== "OK") {
    return { acquired: false, release: async () => false };
  }

  // 释放锁时先校验 owner,避免误删别人的锁
  const release = async (): Promise<boolean> => {
    const script = `
      if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
      else
        return 0
      end
    `;
    const removed = await redis.eval(script, 1, lockKey, ownerId);
    return removed === 1;
  };

  return { acquired: true, release };
}

在支付处理流程里的典型用法如下:

import { randomUUID } from "crypto";

async function processPayment(orderId: string, amount: number) {
  const ownerId = randomUUID();
  const lock = await acquireLock(`payment:${orderId}`, 10_000, ownerId);

  if (!lock.acquired) {
    throw new Error("Payment already being processed");
  }

  try {
    const order = await db.orders.findById(orderId);
    if (order.status !== "pending") {
      return { status: "already_processed" };
    }

    const charge = await paymentGateway.charge(order.customerId, amount);
    await db.orders.update(orderId, {
      status: "paid",
      chargeId: charge.id,
    });

    return { status: "success", chargeId: charge.id };
  } finally {
    await lock.release();
  }
}

这个模式能成立,关键在 4 个点:

  • NX:只有 key 不存在时才写入,确保加锁动作本身是原子的。
  • PX:给锁设置毫秒级过期时间,避免持有者宕机后形成永久死锁。
  • Lua 释放脚本:只有 owner 匹配时才删除 key,防止慢请求在锁过期后误删后来者的锁。
  • 唯一 ownerId:每次加锁都必须生成新的持有者标识,不能复用流程级 ID。

Go 版本的实现思路完全一样:

package distlock

import (
    "context"
    "crypto/rand"
    "encoding/hex"
    "time"

    "github.com/redis/go-redis/v9"
)

type Lock struct {
    client  *redis.Client
    key     string
    ownerID string
}

func Acquire(ctx context.Context, client *redis.Client, key string, ttl time.Duration) (*Lock, error) {
    ownerID := generateOwnerID()
    lockKey := "lock:" + key

    ok, err := client.SetNX(ctx, lockKey, ownerID, ttl).Result()
    if err != nil {
        return nil, err
    }
    if !ok {
        return nil, ErrLockNotAcquired
    }

    return &Lock{client: client, key: lockKey, ownerID: ownerID}, nil
}

var releaseScript = redis.NewScript(`
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
`)

func (l *Lock) Release(ctx context.Context) error {
    result, err := releaseScript.Run(ctx, l.client, []string{l.key}, l.ownerID).Int64()
    if err != nil {
        return err
    }
    if result == 0 {
        return ErrLockAlreadyReleased
    }
    return nil
}

func generateOwnerID() string {
    b := make([]byte, 16)
    rand.Read(b)
    return hex.EncodeToString(b)
}

这个方案的短板也很明确:如果你只依赖单个 Redis 节点,那它本身就是单点。一旦 Redis 宕机,锁就全部丢失;如果发生主从切换,尚未复制出去的锁状态也可能丢失,结果就是两个进程在故障窗口里同时认为自己拿到了同一把锁。

如果你的业务本来就有数据库唯一约束、幂等键等二次保护,这种极低概率的双重获取通常还可以接受;但如果场景是金融交易、资源分配或其他高损失操作,就不能只停留在这里。

Redlock 算法

当单节点 Redis 的可靠性不够时,很多团队会考虑 Redlock。它把锁分散到多个独立的 Redis 实例上,只有在拿到多数节点的成功响应后,才认为加锁成功:

import Redis from "ioredis";
import { randomUUID } from "crypto";

class Redlock {
  private nodes: Redis[];
  private quorum: number;

  constructor(urls: string[]) {
    this.nodes = urls.map((url) => new Redis(url));
    this.quorum = Math.floor(this.nodes.length / 2) + 1;
  }

  async acquire(
    resource: string,
    ttlMs: number
  ): Promise<{ acquired: boolean; release: () => Promise<void> }> {
    const ownerId = randomUUID();
    const lockKey = `lock:${resource}`;
    const startTime = Date.now();

    // 并行向所有节点发起加锁请求
    const results = await Promise.allSettled(
      this.nodes.map((node) =>
        node.set(lockKey, ownerId, "PX", ttlMs, "NX")
      )
    );

    const acquired = results.filter(
      (r) => r.status === "fulfilled" && r.value === "OK"
    ).length;

    const elapsed = Date.now() - startTime;
    const remainingTtl = ttlMs - elapsed;

    // 多数派成功,且剩余 TTL 足够支撑业务执行
    if (acquired >= this.quorum && remainingTtl > ttlMs * 0.1) {
      return {
        acquired: true,
        release: () => this.releaseAll(lockKey, ownerId),
      };
    }

    // 获取失败时,要把已经抢到的锁一并释放
    await this.releaseAll(lockKey, ownerId);
    return { acquired: false, release: async () => {} };
  }

  private async releaseAll(key: string, ownerId: string): Promise<void> {
    const script = `
      if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
      else
        return 0
      end
    `;
    await Promise.allSettled(
      this.nodes.map((node) => node.eval(script, 1, key, ownerId))
    );
  }
}

Redlock 的价值主要体现在 3 点:

  1. 多数派机制:少量节点故障时,系统仍然能继续工作。
  2. 有效窗口计算:锁的可用时间不是原始 TTL,而是扣掉获取耗时后的剩余时间。
  3. 实现复杂度更高:它对时钟、网络延迟和节点独立性更敏感,写错细节就会埋雷。

需要特别提醒的是,Redlock 不是银弹。它没有天然的围栏令牌能力,如果业务执行时间拖得过长,TTL 到期后仍然可能出现慢请求写回的问题。工程上更稳妥的做法,通常是直接使用成熟库,比如 Node 生态里的 redlock,或者 Go 生态里的 redsync,而不是自己从零手写。

心跳续期

固定 TTL 有个经典两难:设短了,业务还没跑完锁就过期;设长了,持有者一旦崩溃,其他请求就要白白等很久。常见解法是加一个后台续期线程,也就是所谓的心跳机制:

type RenewableLock struct {
    *Lock
    cancel context.CancelFunc
    done   chan struct{}
}

func AcquireWithRenewal(
    ctx context.Context,
    client *redis.Client,
    key string,
    ttl time.Duration,
) (*RenewableLock, error) {
    lock, err := Acquire(ctx, client, key, ttl)
    if err != nil {
        return nil, err
    }

    renewCtx, cancel := context.WithCancel(ctx)
    done := make(chan struct{})

    go func() {
        defer close(done)
        // 每 1/3 TTL 续一次,给失败重试留出缓冲
        ticker := time.NewTicker(ttl / 3)
        defer ticker.Stop()

        for {
            select {
            case <-renewCtx.Done():
                return
            case <-ticker.C:
                err := renew(renewCtx, client, lock.key, lock.ownerID, ttl)
                if err != nil {
                    // 续期失败通常意味着锁已经丢失,不应继续执行业务
                    return
                }
            }
        }
    }()

    return &RenewableLock{Lock: lock, cancel: cancel, done: done}, nil
}

var renewScript = redis.NewScript(`
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("pexpire", KEYS[1], ARGV[2])
    else
        return 0
    end
`)

func renew(ctx context.Context, client *redis.Client, key, ownerID string, ttl time.Duration) error {
    result, err := renewScript.Run(ctx, client, []string{key}, ownerID, ttl.Milliseconds()).Int64()
    if err != nil {
        return err
    }
    if result == 0 {
        return ErrLockAlreadyReleased
    }
    return nil
}

func (rl *RenewableLock) Release(ctx context.Context) error {
    rl.cancel() // 先停掉续期协程
    <-rl.done   // 再等后台协程退出
    return rl.Lock.Release(ctx)
}

这里把续期间隔设置成 TTL / 3,本质上是在业务执行期间给自己留出两次补救机会。如果进程挂起、线程卡死或者发生网络分区,心跳自然停止,锁也会在过期后自动释放。

很多成熟框架内部都是这个思路。比如 Redisson 的看门狗机制,本质上就是在锁仍然归自己所有时持续刷新过期时间。

PostgreSQL 建议锁

如果你的系统本来就强依赖 PostgreSQL,而且锁保护的范围和数据库操作天然一致,那建议锁往往是一个非常务实的选择。它不需要额外引入外部基础设施,也能和数据库事务很好地配合:

import { Pool } from "pg";

const pool = new Pool({ connectionString: process.env.DATABASE_URL });

// 会话级建议锁:显式释放前一直有效,连接断开也会释放
async function withAdvisoryLock<T>(
  lockId: number,
  fn: () => Promise<T>
): Promise<T> {
  const client = await pool.connect();

  try {
    // pg_try_advisory_lock 非阻塞,拿不到会直接返回 false
    const { rows } = await client.query(
      "SELECT pg_try_advisory_lock($1) AS acquired",
      [lockId]
    );

    if (!rows[0].acquired) {
      throw new Error(`Could not acquire advisory lock ${lockId}`);
    }

    try {
      return await fn();
    } finally {
      await client.query("SELECT pg_advisory_unlock($1)", [lockId]);
    }
  } finally {
    client.release();
  }
}

// 业务 ID 映射为稳定的锁 ID
function lockIdFromOrderId(orderId: string): number {
  let hash = 0;
  for (const char of orderId) {
    hash = (hash * 31 + char.charCodeAt(0)) | 0;
  }
  return Math.abs(hash);
}

// 退款流程通过建议锁串行化
await withAdvisoryLock(lockIdFromOrderId("order-789"), async () => {
  const order = await db.orders.findById("order-789");
  if (order.status === "refunded") return;
  await paymentGateway.refund(order.chargeId);
  await db.orders.update("order-789", { status: "refunded" });
});

事务级建议锁也很常用,它会在事务提交或回滚时自动释放:

-- 事务结束时自动释放,适合和同事务内的数据更新一起使用
SELECT pg_advisory_xact_lock(hashtext('booking:seat-42'));

UPDATE seats
SET booked_by = 'user-123'
WHERE id = 'seat-42' AND booked_by IS NULL;

建议锁的优点很直接:

  • 无需额外组件:已经用了 PostgreSQL,就能直接上手。
  • 事务整合自然:事务级锁和数据修改天然同生命周期。
  • 数据库负责死锁检测:不用自己额外维护复杂的锁等待关系。

它的边界也很清楚:

  • 只能绑定在单个 PostgreSQL 主实例上,不能跨库协调。
  • 锁 ID 是整数,字符串资源需要设计稳定哈希。
  • 如果用会话级锁,还要特别注意和连接池的交互,避免连接归还后锁还没释放。

带版本列的乐观锁

很多时候,我们根本不需要严格互斥,只需要在写入时发现冲突即可。乐观锁就是这个思路:默认假设冲突很少发生,真正更新时再检查版本号是否变化。

// Schema: orders 表里有一个 version 整数字段,默认值为 1

async function updateOrderStatus(
  orderId: string,
  newStatus: string,
  expectedVersion: number
): Promise<boolean> {
  const result = await db.query(
    `UPDATE orders
     SET status = $1, version = version + 1, updated_at = now()
     WHERE id = $2 AND version = $3`,
    [newStatus, orderId, expectedVersion]
  );

  // 没有更新到任何行,说明有人先一步改过了
  return result.rowCount === 1;
}

async function processOrderWithRetry(
  orderId: string,
  maxRetries = 3
) {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    const order = await db.orders.findById(orderId);

    if (order.status !== "pending") {
      return { status: "already_processed" };
    }

    const updated = await updateOrderStatus(
      orderId,
      "processing",
      order.version
    );

    if (updated) {
      // 成功抢到更新权后,再继续执行业务逻辑
      await fulfillOrder(order);
      return { status: "success" };
    }

    // 冲突后回读并退避重试,避免热点资源被持续打爆
    await sleep(Math.pow(2, attempt) * 10);
  }

  throw new Error("Max retries exceeded due to concurrent modifications");
}

Go 版本用 database/sql 也是同一个套路:

func updateOrderStatus(ctx context.Context, db *sql.DB, orderID, newStatus string, expectedVersion int) (bool, error) {
    result, err := db.ExecContext(ctx,
        `UPDATE orders SET status = $1, version = version + 1, updated_at = now()
         WHERE id = $2 AND version = $3`,
        newStatus, orderID, expectedVersion,
    )
    if err != nil {
        return false, err
    }

    rows, err := result.RowsAffected()
    return rows == 1, err
}

这种方案特别适合下面几类场景:

  • 冲突概率本来就不高,大多数请求操作的是不同资源。
  • 业务操作支持安全重试。
  • 你更在意吞吐量,不希望每次都先走一次显式加锁。

但它并不适合高竞争场景。因为一旦热点资源被大量并发修改,失败请求会不断回读、重试,反过来把数据库压力继续放大。

CAS 模式

CAS(Compare-And-Swap)可以理解成把乐观锁的思想扩展到数据库之外。只要底层存储支持条件写入,就能实现类似效果。下面是一个基于 DynamoDB 条件表达式的例子:

import { DynamoDBClient, UpdateItemCommand } from "@aws-sdk/client-dynamodb";

const ddb = new DynamoDBClient({});

async function claimTask(taskId: string, workerId: string): Promise<boolean> {
  try {
    await ddb.send(
      new UpdateItemCommand({
        TableName: "tasks",
        Key: { id: { S: taskId } },
        UpdateExpression:
          "SET #status = :claimed, worker_id = :worker, claimed_at = :now",
        ConditionExpression: "#status = :unclaimed",
        ExpressionAttributeNames: { "#status": "status" },
        ExpressionAttributeValues: {
          ":claimed": { S: "claimed" },
          ":unclaimed": { S: "unclaimed" },
          ":worker": { S: workerId },
          ":now": { S: new Date().toISOString() },
        },
      })
    );
    return true;
  } catch (err: any) {
    if (err.name === "ConditionalCheckFailedException") {
      return false; // 任务已经被其他 worker 抢走
    }
    throw err;
  }
}

CAS 也是分布式状态机、领导者选举和很多共识协议的基础能力。像 etcd、ZooKeeper、Consul 这类系统,本质上都提供了条件更新或版本校验这一类原语。

锁竞争与性能

锁的代价从来不只是加锁那一瞬间。只要你引入了锁,就等于主动把一部分访问串行化了;一旦竞争上来,锁就会从保护机制变成吞吐瓶颈。工程上可以优先做 5 件事:

1. 缩小锁粒度。 能锁 payment:order-123,就不要锁整个 payments 资源池。

// Bad: 全局锁,所有支付请求都会互相阻塞
await acquireLock("payment-processing", 10_000, ownerId);

// Good: 细粒度锁,只串行化同一订单
await acquireLock(`payment:${orderId}`, 10_000, ownerId);

2. 缩短持锁时间。 把校验、准备、发消息这些非关键步骤尽量放到锁外面,只在真正变更共享状态的那一小段时间里持有锁。

// 锁外先完成参数校验和上下文准备
const validatedPayment = await validatePaymentDetails(request);
const idempotencyKey = deriveIdempotencyKey(request);

// 只在状态变更期间持有锁
const lock = await acquireLock(`payment:${orderId}`, 5_000, ownerId);
try {
  const order = await db.orders.findById(orderId);
  if (order.status !== "pending") return;
  await db.orders.update(orderId, { status: "paid" });
} finally {
  await lock.release();
}

// 收据发送等后置操作放到锁外执行
await sendReceipt(validatedPayment);

3. 优先考虑无锁设计。 幂等键、唯一约束和 CAS 在很多业务里比显式锁更稳,也更容易扩展。

4. 监控锁指标。 至少要持续看获取耗时、持锁时长、争用率和超时次数。锁等待时间突然飙升,通常说明设计出现了热点,而不是单纯机器变慢。

5. 设置严格超时。 对多数 Web 请求来说,1~2 秒的锁等待上限已经很常见。等太久通常没意义,尽快失败反而更容易保护系统。

如何选择合适的方案

方案 最适合的场景 基础设施 一致性特征
Redis SET NX EX 通用型、高吞吐量场景 Redis 单节点级耐久性
Redlock 需要跨节点提升安全性的 Redis 场景 5 个 Redis 节点 多数派保证
PostgreSQL 建议锁 以数据库为中心的工作流 PostgreSQL 事务一致性强
乐观锁 低冲突、少量写入场景 任意关系型数据库 行级版本校验
CAS Serverless、多区域、托管存储场景 托管服务 依赖底层服务能力

如果你需要一个简单的决策框架,可以按下面这个顺序判断:

  1. 如果冲突本来就少,而且业务支持重试,优先考虑乐观锁。 它实现最轻,扩展性通常也最好。
  2. 如果关键区天然和数据库事务绑定,而且系统已经在重度使用 PostgreSQL,优先考虑建议锁。 事务级建议锁尤其干净。
  3. 如果你需要的是通用分布式互斥,且能接受少量故障窗口里的重复执行风险,可以先上 Redis SET NX EX
  4. 如果双重执行的代价极高,比如金融扣费、资源分配,再考虑 Redlock 或 etcd / Consul 这一类更强协调方案。
  5. 真正稳健的生产系统,往往不是单层防线。 分布式锁负责减少并发冲突,数据库约束负责兜住极端情况,这样才更像工程方案。

多层防线的典型写法如下:

async function processPaymentSafely(orderId: string, amount: number) {
  // Layer 1: 分布式锁,先挡住同一订单的并发处理
  const lock = await acquireLock(`payment:${orderId}`, 10_000, randomUUID());
  if (!lock.acquired) throw new Error("Payment in progress");

  try {
    // Layer 2: 数据库状态检查,避免重复处理
    const order = await db.orders.findById(orderId);
    if (order.status !== "pending") return { status: "already_processed" };

    const charge = await paymentGateway.charge(order.customerId, amount);

    // Layer 3: 唯一约束兜底,防止重复写入账单记录
    await db.orders.update(orderId, {
      status: "paid",
      chargeId: charge.id, // schema 中应对 chargeId 建唯一约束
    });

    return { status: "success" };
  } finally {
    await lock.release();
  }
}

总结

分布式并发控制里,最危险的误区不是不会加锁,而是把锁当成唯一答案。很多时候,我们真正想要的并不是某个技术名词本身,而是并发场景下的正确性。

如果你的场景比较常规,Redis 的 SET NX EX 配合 owner 校验,已经能覆盖大多数需求;如果系统本来就以数据库为中心,PostgreSQL 建议锁往往更自然;如果冲突很少,乐观锁 和 CAS 甚至可以完全替代显式锁。

真正靠谱的方案,通常不是追求最重的锁,而是在锁、幂等、唯一约束和条件更新之间做组合。 这样即使某一层失效,系统也不至于直接滑向错误结果。


FunTester 原创精华
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册