延迟任务:定时触发的艺术

有些任务不需要立即执行,比如订单超时取消、定时提醒等。River 支持延迟调度:

// 30 分钟后执行订单超时检查
_, err = riverClient.Insert(ctx, CheckOrderTimeoutArgs{
    OrderID: "order_12345",
}, &river.InsertOpts{
    ScheduledAt: time.Now().Add(30 * time.Minute),
})

这个功能在电商场景特别实用。用户下单后,如果 30 分钟内不支付就自动取消订单并释放库存。传统做法可能需要定时任务轮询数据库,效率低还容易出问题。用 River 的延迟任务,优雅解决。

River 的延迟任务是怎么实现的?其实很简单:

  1. 任务入库时 scheduled_at 字段设置为未来时间
  2. Producer 查询时加条件 WHERE scheduled_at <= NOW()
  3. 时间未到的任务自动被过滤,不会被消费

这种设计的好处是精确可靠。不像定时任务可能因为进程重启漏掉执行,数据库里的延迟任务永远不会丢。

优先级队列:让重要的事先做

不是所有任务都同等重要。VIP 用户的订单处理应该优先于普通用户,紧急告警应该优先于日常统计。

// 高优先级:VIP 用户订单
_, err = riverClient.Insert(ctx, ProcessOrderArgs{
    OrderID: "vip_order_001",
}, &river.InsertOpts{
    Priority: 1, // 数字越小优先级越高
    Queue:    "orders",
})

// 低优先级:普通用户订单
_, err = riverClient.Insert(ctx, ProcessOrderArgs{
    OrderID: "normal_order_002",
}, &river.InsertOpts{
    Priority: 3,
    Queue:    "orders",
})

River 会优先消费高优先级的任务,确保核心业务不受影响。在数据库层面,这是通过 ORDER BY priority ASC 实现的,简单高效。

重试机制:让失败不那么可怕

网络抖动、第三方服务暂时不可用,这些都是家常便饭。River 内置了智能重试机制:

type SendEmailWorker struct {
    river.WorkerDefaults[SendEmailArgs]
}

// 配置重试策略
func (w *SendEmailWorker) MaxAttempts() int {
    return 5 // 最多重试 5 次
}

// 自定义重试间隔(指数退避)
func (w *SendEmailWorker) NextRetry(job *river.Job[SendEmailArgs]) time.Time {
    // 第 1 次重试:30 秒后
    // 第 2 次重试:2 分钟后
    // 第 3 次重试:8 分钟后
    // ...以此类推
    seconds := int(math.Pow(2, float64(job.Attempt))) * 30
    return time.Now().Add(time.Duration(seconds) * time.Second)
}

func (w *SendEmailWorker) Work(ctx context.Context, job *river.Job[SendEmailArgs]) error {
    err := sendEmail(job.Args.Email, job.Args.Subject, job.Args.Body, job.Args.Attachments)
    if err != nil {
        // 返回错误,River 会自动按照重试策略重试
        return fmt.Errorf("发送失败,将在 %s 后重试: %w", 
            w.NextRetry(job).Sub(time.Now()), err)
    }
    return nil
}

这种指数退避策略在处理第三方服务限流时特别有效。刚开始快速重试,如果持续失败就逐渐拉长间隔,避免雪崩效应。

重试的原理也很简单:

  1. 任务执行失败时,River 将 state 改回 available
  2. scheduled_at 设置为下次重试时间
  3. 递增 attempt 计数器
  4. 如果 attempt >= max_attempts,将 state 改为 discarded(放弃重试)

批量处理:性能优化的杀手锏

有些场景下,批量处理比单条处理效率高得多,比如批量插入数据库、批量发送邮件。

// 批量入队任务
jobs := make([]river.InsertManyParams, 0, 1000)
for i := 0; i < 1000; i++ {
    jobs = append(jobs, river.InsertManyParams{
        Args: SendEmailArgs{
            Email:   fmt.Sprintf("user%d@example.com", i),
            Subject: "批量通知",
            Body:    "系统升级通知",
        },
    })
}

// 一次性入队 1000 个任务,使用 PostgreSQL 的 COPY 协议
_, err = riverClient.InsertMany(ctx, jobs)

River 内部使用 PostgreSQL 的 COPY FROM 命令,相比逐条 INSERT,性能提升 10 倍以上。1000 个任务可能只需要几十毫秒。

COPY FROM 是 PostgreSQL 的高效批量导入命令,它直接绕过 SQL 解析器,以二进制格式写入数据,速度极快。在我的实测中,普通 INSERT 每秒约 5000 条,而 COPY FROM 能达到 5 万条以上。

监控与运维:让系统可观测

生产环境中,我们需要知道队列的健康状况。River 提供了丰富的统计接口:

// 获取队列统计信息
stats, err := riverClient.QueueStats(ctx, "default")
if err != nil {
    log.Fatal(err)
}

fmt.Printf("队列名称: %s\n", stats.Queue)
fmt.Printf("待处理任务: %d\n", stats.Pending)
fmt.Printf("运行中任务: %d\n", stats.Running)
fmt.Printf("已完成任务: %d\n", stats.Completed)
fmt.Printf("失败任务: %d\n", stats.Failed)

// 如果待处理任务积压过多,可以考虑扩容 worker
if stats.Pending > 10000 {
    log.Println("告警:队列积压严重,建议增加 MaxWorkers")
}

配合 Prometheus、Grafana 等监控系统,你可以实时掌握队列状态,及时发现和解决问题。

更进一步,你可以定期将这些指标推送到监控系统:

// 定期上报队列指标到 Prometheus
func reportMetrics(client *river.Client) {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        queues := []string{"default", "emails", "reports"}
        for _, queue := range queues {
            stats, err := client.QueueStats(context.Background(), queue)
            if err != nil {
                continue
            }

            // 上报到 Prometheus
            queuePendingGauge.WithLabelValues(queue).Set(float64(stats.Pending))
            queueRunningGauge.WithLabelValues(queue).Set(float64(stats.Running))
            queueCompletedCounter.WithLabelValues(queue).Add(float64(stats.Completed))
            queueFailedCounter.WithLabelValues(queue).Add(float64(stats.Failed))
        }
    }
}

实战:电商订单处理系统

让我们用一个完整的例子把前面的知识串起来。假设我们在做一个电商系统,订单支付成功后需要:

  1. 更新订单状态
  2. 扣减库存
  3. 发送支付成功短信
  4. 发送邮件通知
  5. 增加用户积分

传统做法可能是在支付回调接口里同步执行这些操作,但这样有两个问题:

用 River 来实现:

// 支付回调处理
func handlePaymentCallback(w http.ResponseWriter, r *http.Request) {
    // 解析支付平台的回调数据
    orderID := r.FormValue("order_id")

    // 开启数据库事务
    tx, err := db.Begin()
    if err != nil {
        http.Error(w, "系统错误", 500)
        return
    }
    defer tx.Rollback()

    // 1. 更新订单状态(在事务中)
    _, err = tx.Exec(`
        UPDATE orders 
        SET status = 'paid', paid_at = NOW() 
        WHERE id = $1 AND status = 'pending'
    `, orderID)
    if err != nil {
        http.Error(w, "订单更新失败", 500)
        return
    }

    // 2. 扣减库存(在事务中)
    _, err = tx.Exec(`
        UPDATE products p
        SET stock = stock - oi.quantity
        FROM order_items oi
        WHERE oi.order_id = $1 AND p.id = oi.product_id
    `, orderID)
    if err != nil {
        http.Error(w, "库存扣减失败", 500)
        return
    }

    // 3. 入队异步任务(在同一个事务中)
    // 发送短信(高优先级)
    _, err = riverClient.InsertTx(ctx, tx, SendSMSArgs{
        OrderID: orderID,
    }, &river.InsertOpts{
        Priority: 1,
    })

    // 发送邮件(普通优先级)
    _, err = riverClient.InsertTx(ctx, tx, SendEmailArgs{
        OrderID: orderID,
    }, &river.InsertOpts{
        Priority: 2,
    })

    // 增加积分(低优先级)
    _, err = riverClient.InsertTx(ctx, tx, AddPointsArgs{
        OrderID: orderID,
    }, &river.InsertOpts{
        Priority: 3,
    })

    // 4. 提交事务
    if err := tx.Commit(); err != nil {
        http.Error(w, "系统错误", 500)
        return
    }

    // 立即响应支付平台
    w.Write([]byte("SUCCESS"))
}

注意这个实现的精妙之处:

  1. 数据一致性:订单状态、库存扣减、任务入队都在同一个事务里,要么全成功,要么全失败
  2. 快速响应:核心逻辑(更新订单、扣库存)在事务里同步完成,耗时操作(发短信、发邮件)异步执行
  3. 优先级保证:短信最重要立即发送,邮件其次,积分最后,资源紧张时优先保证核心功能

这就是 River 的威力:用最简单的方式,解决了分布式系统中最复杂的问题。

让我们再深入一点,看看 Worker 的实现:

// 短信发送 Worker
type SendSMSWorker struct {
    river.WorkerDefaults[SendSMSArgs]
    smsClient *SMSClient // 第三方短信服务客户端
}

func (w *SendSMSWorker) Work(ctx context.Context, job *river.Job[SendSMSArgs]) error {
    // 1. 查询订单信息
    order, err := getOrder(job.Args.OrderID)
    if err != nil {
        return fmt.Errorf("查询订单失败: %w", err)
    }

    // 2. 组装短信内容
    content := fmt.Sprintf(
        "【商城】您的订单 %s 支付成功,金额 %.2f 元,我们将尽快为您发货。",
        order.ID, order.Amount,
    )

    // 3. 发送短信(带超时控制)
    ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
    defer cancel()

    err = w.smsClient.Send(ctx, order.UserPhone, content)
    if err != nil {
        // 记录详细错误信息,方便排查
        return fmt.Errorf("发送短信失败 [订单:%s, 手机:%s]: %w", 
            order.ID, order.UserPhone, err)
    }

    // 4. 记录发送日志(可选)
    logSMSSent(order.ID, order.UserPhone, content)

    return nil
}

// 配置重试策略
func (w *SendSMSWorker) MaxAttempts() int {
    return 3 // 短信最多重试 3 次
}

func (w *SendSMSWorker) NextRetry(job *river.Job[SendSMSArgs]) time.Time {
    // 短信失败快速重试,间隔 1、2、4 分钟
    minutes := int(math.Pow(2, float64(job.Attempt-1)))
    return time.Now().Add(time.Duration(minutes) * time.Minute)
}

横向对比

选择技术方案时,横向对比是必不可少的。让我们看看 River 与主流队列方案的差异:

特性 River Redis + Sidekiq RabbitMQ Kafka
事务性保证 ✅ 原生支持,数据库事务 ❌ 需要额外方案(两阶段提交) ❌ 需要额外方案 ❌ 需要额外方案
并发模型 Goroutine(轻量级) 多进程(重量级) 多进程/多线程 多进程
部署复杂度 低(只需 PostgreSQL) 中(需 Redis + 持久化配置) 高(需 RabbitMQ 集群) 高(需 Kafka + ZooKeeper)
性能 1 万/秒 + 1 万/秒 + 10 万/秒 + 百万/秒 +
消息持久化 ✅ 数据库持久化 ⚠️ 需配置 AOF/RDB ✅ 磁盘持久化 ✅ 磁盘持久化
消息顺序性 ⚠️ 同队列同优先级有序 ⚠️ 单队列有序 ✅ 支持严格顺序 ✅ 分区内有序
运维成本 低(复用现有 PG) 中(Redis 监控 + 备份) 高(集群管理复杂) 高(需专业团队)
适用场景 中小规模,重一致性 中大规模,轻量任务 大规模,复杂路由 海量数据流式处理
学习曲线 平缓(Go + SQL) 平缓(Ruby/Go + Redis) 陡峭(AMQP 协议) 陡峭(分布式流平台)

最佳实践与注意事项

使用 River 时,首先要确保所有任务逻辑具备幂等性,即同一任务被多次执行也不会产生副作用。此外,应对长时间运行或依赖外部服务的任务设置合理的超时时间,防止因个别任务卡顿拖垮整个队列。生产中还需持续监控队列积压和失败率,及时扩容和优化,避免消费不及时造成阻塞。

其次,数据库连接池配置需与 River worker 数量、应用自身需求综合考虑,保证足够的连接数以支撑并发处理,避免资源争抢导致性能瓶颈。同时建议任务参数保持简洁,例如仅传递文件路径或对象 ID,不要直接塞入过大的二进制内容,以免影响存储和运行效率。

最后,建议按任务类型进行队列隔离,如关键事务、报表、通知分别分配独立队列与 worker,各自独立伸缩,互不干扰,这样可以提升系统鲁棒性和运维可控性,确保关键任务优先得到保障。


FunTester 原创精华


↙↙↙阅读原文可查看相关链接,并与作者交流