延迟任务:定时触发的艺术
有些任务不需要立即执行,比如订单超时取消、定时提醒等。River 支持延迟调度:
// 30 分钟后执行订单超时检查
_, err = riverClient.Insert(ctx, CheckOrderTimeoutArgs{
OrderID: "order_12345",
}, &river.InsertOpts{
ScheduledAt: time.Now().Add(30 * time.Minute),
})
这个功能在电商场景特别实用。用户下单后,如果 30 分钟内不支付就自动取消订单并释放库存。传统做法可能需要定时任务轮询数据库,效率低还容易出问题。用 River 的延迟任务,优雅解决。
River 的延迟任务是怎么实现的?其实很简单:
- 任务入库时
scheduled_at字段设置为未来时间 - Producer 查询时加条件
WHERE scheduled_at <= NOW() - 时间未到的任务自动被过滤,不会被消费
这种设计的好处是精确可靠。不像定时任务可能因为进程重启漏掉执行,数据库里的延迟任务永远不会丢。
优先级队列:让重要的事先做
不是所有任务都同等重要。VIP 用户的订单处理应该优先于普通用户,紧急告警应该优先于日常统计。
// 高优先级:VIP 用户订单
_, err = riverClient.Insert(ctx, ProcessOrderArgs{
OrderID: "vip_order_001",
}, &river.InsertOpts{
Priority: 1, // 数字越小优先级越高
Queue: "orders",
})
// 低优先级:普通用户订单
_, err = riverClient.Insert(ctx, ProcessOrderArgs{
OrderID: "normal_order_002",
}, &river.InsertOpts{
Priority: 3,
Queue: "orders",
})
River 会优先消费高优先级的任务,确保核心业务不受影响。在数据库层面,这是通过 ORDER BY priority ASC 实现的,简单高效。
重试机制:让失败不那么可怕
网络抖动、第三方服务暂时不可用,这些都是家常便饭。River 内置了智能重试机制:
type SendEmailWorker struct {
river.WorkerDefaults[SendEmailArgs]
}
// 配置重试策略
func (w *SendEmailWorker) MaxAttempts() int {
return 5 // 最多重试 5 次
}
// 自定义重试间隔(指数退避)
func (w *SendEmailWorker) NextRetry(job *river.Job[SendEmailArgs]) time.Time {
// 第 1 次重试:30 秒后
// 第 2 次重试:2 分钟后
// 第 3 次重试:8 分钟后
// ...以此类推
seconds := int(math.Pow(2, float64(job.Attempt))) * 30
return time.Now().Add(time.Duration(seconds) * time.Second)
}
func (w *SendEmailWorker) Work(ctx context.Context, job *river.Job[SendEmailArgs]) error {
err := sendEmail(job.Args.Email, job.Args.Subject, job.Args.Body, job.Args.Attachments)
if err != nil {
// 返回错误,River 会自动按照重试策略重试
return fmt.Errorf("发送失败,将在 %s 后重试: %w",
w.NextRetry(job).Sub(time.Now()), err)
}
return nil
}
这种指数退避策略在处理第三方服务限流时特别有效。刚开始快速重试,如果持续失败就逐渐拉长间隔,避免雪崩效应。
重试的原理也很简单:
- 任务执行失败时,River 将
state改回available - 将
scheduled_at设置为下次重试时间 - 递增
attempt计数器 - 如果
attempt >= max_attempts,将state改为discarded(放弃重试)
批量处理:性能优化的杀手锏
有些场景下,批量处理比单条处理效率高得多,比如批量插入数据库、批量发送邮件。
// 批量入队任务
jobs := make([]river.InsertManyParams, 0, 1000)
for i := 0; i < 1000; i++ {
jobs = append(jobs, river.InsertManyParams{
Args: SendEmailArgs{
Email: fmt.Sprintf("user%d@example.com", i),
Subject: "批量通知",
Body: "系统升级通知",
},
})
}
// 一次性入队 1000 个任务,使用 PostgreSQL 的 COPY 协议
_, err = riverClient.InsertMany(ctx, jobs)
River 内部使用 PostgreSQL 的 COPY FROM 命令,相比逐条 INSERT,性能提升 10 倍以上。1000 个任务可能只需要几十毫秒。
COPY FROM 是 PostgreSQL 的高效批量导入命令,它直接绕过 SQL 解析器,以二进制格式写入数据,速度极快。在我的实测中,普通 INSERT 每秒约 5000 条,而 COPY FROM 能达到 5 万条以上。
监控与运维:让系统可观测
生产环境中,我们需要知道队列的健康状况。River 提供了丰富的统计接口:
// 获取队列统计信息
stats, err := riverClient.QueueStats(ctx, "default")
if err != nil {
log.Fatal(err)
}
fmt.Printf("队列名称: %s\n", stats.Queue)
fmt.Printf("待处理任务: %d\n", stats.Pending)
fmt.Printf("运行中任务: %d\n", stats.Running)
fmt.Printf("已完成任务: %d\n", stats.Completed)
fmt.Printf("失败任务: %d\n", stats.Failed)
// 如果待处理任务积压过多,可以考虑扩容 worker
if stats.Pending > 10000 {
log.Println("告警:队列积压严重,建议增加 MaxWorkers")
}
配合 Prometheus、Grafana 等监控系统,你可以实时掌握队列状态,及时发现和解决问题。
更进一步,你可以定期将这些指标推送到监控系统:
// 定期上报队列指标到 Prometheus
func reportMetrics(client *river.Client) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
queues := []string{"default", "emails", "reports"}
for _, queue := range queues {
stats, err := client.QueueStats(context.Background(), queue)
if err != nil {
continue
}
// 上报到 Prometheus
queuePendingGauge.WithLabelValues(queue).Set(float64(stats.Pending))
queueRunningGauge.WithLabelValues(queue).Set(float64(stats.Running))
queueCompletedCounter.WithLabelValues(queue).Add(float64(stats.Completed))
queueFailedCounter.WithLabelValues(queue).Add(float64(stats.Failed))
}
}
}
实战:电商订单处理系统
让我们用一个完整的例子把前面的知识串起来。假设我们在做一个电商系统,订单支付成功后需要:
- 更新订单状态
- 扣减库存
- 发送支付成功短信
- 发送邮件通知
- 增加用户积分
传统做法可能是在支付回调接口里同步执行这些操作,但这样有两个问题:
- 响应慢:用户等待时间长
- 不可靠:任何一步失败可能导致数据不一致
用 River 来实现:
// 支付回调处理
func handlePaymentCallback(w http.ResponseWriter, r *http.Request) {
// 解析支付平台的回调数据
orderID := r.FormValue("order_id")
// 开启数据库事务
tx, err := db.Begin()
if err != nil {
http.Error(w, "系统错误", 500)
return
}
defer tx.Rollback()
// 1. 更新订单状态(在事务中)
_, err = tx.Exec(`
UPDATE orders
SET status = 'paid', paid_at = NOW()
WHERE id = $1 AND status = 'pending'
`, orderID)
if err != nil {
http.Error(w, "订单更新失败", 500)
return
}
// 2. 扣减库存(在事务中)
_, err = tx.Exec(`
UPDATE products p
SET stock = stock - oi.quantity
FROM order_items oi
WHERE oi.order_id = $1 AND p.id = oi.product_id
`, orderID)
if err != nil {
http.Error(w, "库存扣减失败", 500)
return
}
// 3. 入队异步任务(在同一个事务中)
// 发送短信(高优先级)
_, err = riverClient.InsertTx(ctx, tx, SendSMSArgs{
OrderID: orderID,
}, &river.InsertOpts{
Priority: 1,
})
// 发送邮件(普通优先级)
_, err = riverClient.InsertTx(ctx, tx, SendEmailArgs{
OrderID: orderID,
}, &river.InsertOpts{
Priority: 2,
})
// 增加积分(低优先级)
_, err = riverClient.InsertTx(ctx, tx, AddPointsArgs{
OrderID: orderID,
}, &river.InsertOpts{
Priority: 3,
})
// 4. 提交事务
if err := tx.Commit(); err != nil {
http.Error(w, "系统错误", 500)
return
}
// 立即响应支付平台
w.Write([]byte("SUCCESS"))
}
注意这个实现的精妙之处:
- 数据一致性:订单状态、库存扣减、任务入队都在同一个事务里,要么全成功,要么全失败
- 快速响应:核心逻辑(更新订单、扣库存)在事务里同步完成,耗时操作(发短信、发邮件)异步执行
- 优先级保证:短信最重要立即发送,邮件其次,积分最后,资源紧张时优先保证核心功能
这就是 River 的威力:用最简单的方式,解决了分布式系统中最复杂的问题。
让我们再深入一点,看看 Worker 的实现:
// 短信发送 Worker
type SendSMSWorker struct {
river.WorkerDefaults[SendSMSArgs]
smsClient *SMSClient // 第三方短信服务客户端
}
func (w *SendSMSWorker) Work(ctx context.Context, job *river.Job[SendSMSArgs]) error {
// 1. 查询订单信息
order, err := getOrder(job.Args.OrderID)
if err != nil {
return fmt.Errorf("查询订单失败: %w", err)
}
// 2. 组装短信内容
content := fmt.Sprintf(
"【商城】您的订单 %s 支付成功,金额 %.2f 元,我们将尽快为您发货。",
order.ID, order.Amount,
)
// 3. 发送短信(带超时控制)
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
err = w.smsClient.Send(ctx, order.UserPhone, content)
if err != nil {
// 记录详细错误信息,方便排查
return fmt.Errorf("发送短信失败 [订单:%s, 手机:%s]: %w",
order.ID, order.UserPhone, err)
}
// 4. 记录发送日志(可选)
logSMSSent(order.ID, order.UserPhone, content)
return nil
}
// 配置重试策略
func (w *SendSMSWorker) MaxAttempts() int {
return 3 // 短信最多重试 3 次
}
func (w *SendSMSWorker) NextRetry(job *river.Job[SendSMSArgs]) time.Time {
// 短信失败快速重试,间隔 1、2、4 分钟
minutes := int(math.Pow(2, float64(job.Attempt-1)))
return time.Now().Add(time.Duration(minutes) * time.Minute)
}
横向对比
选择技术方案时,横向对比是必不可少的。让我们看看 River 与主流队列方案的差异:
| 特性 | River | Redis + Sidekiq | RabbitMQ | Kafka |
|---|---|---|---|---|
| 事务性保证 | ✅ 原生支持,数据库事务 | ❌ 需要额外方案(两阶段提交) | ❌ 需要额外方案 | ❌ 需要额外方案 |
| 并发模型 | Goroutine(轻量级) | 多进程(重量级) | 多进程/多线程 | 多进程 |
| 部署复杂度 | 低(只需 PostgreSQL) | 中(需 Redis + 持久化配置) | 高(需 RabbitMQ 集群) | 高(需 Kafka + ZooKeeper) |
| 性能 | 1 万/秒 + | 1 万/秒 + | 10 万/秒 + | 百万/秒 + |
| 消息持久化 | ✅ 数据库持久化 | ⚠️ 需配置 AOF/RDB | ✅ 磁盘持久化 | ✅ 磁盘持久化 |
| 消息顺序性 | ⚠️ 同队列同优先级有序 | ⚠️ 单队列有序 | ✅ 支持严格顺序 | ✅ 分区内有序 |
| 运维成本 | 低(复用现有 PG) | 中(Redis 监控 + 备份) | 高(集群管理复杂) | 高(需专业团队) |
| 适用场景 | 中小规模,重一致性 | 中大规模,轻量任务 | 大规模,复杂路由 | 海量数据流式处理 |
| 学习曲线 | 平缓(Go + SQL) | 平缓(Ruby/Go + Redis) | 陡峭(AMQP 协议) | 陡峭(分布式流平台) |
最佳实践与注意事项
使用 River 时,首先要确保所有任务逻辑具备幂等性,即同一任务被多次执行也不会产生副作用。此外,应对长时间运行或依赖外部服务的任务设置合理的超时时间,防止因个别任务卡顿拖垮整个队列。生产中还需持续监控队列积压和失败率,及时扩容和优化,避免消费不及时造成阻塞。
其次,数据库连接池配置需与 River worker 数量、应用自身需求综合考虑,保证足够的连接数以支撑并发处理,避免资源争抢导致性能瓶颈。同时建议任务参数保持简洁,例如仅传递文件路径或对象 ID,不要直接塞入过大的二进制内容,以免影响存储和运行效率。
最后,建议按任务类型进行队列隔离,如关键事务、报表、通知分别分配独立队列与 worker,各自独立伸缩,互不干扰,这样可以提升系统鲁棒性和运维可控性,确保关键任务优先得到保障。