在现代后端开发中,消息队列已经成为系统架构中不可或缺的一环。无论是异步任务处理、削峰填谷,还是系统解耦,队列都扮演着关键角色。但问题来了:市面上队列方案这么多,为什么要选 River?今天咱们就来深入聊聊这个在 Go 圈子里悄然走红的队列库。
简单来说,River 是一个专为 Go 设计的作业队列库,但它不是那种「又一个轮子」的存在。它最大的特点是什么?把队列和 PostgreSQL 深度绑定。听起来有点反直觉?别急,往下看你就懂了。
传统方案里,我们通常会用 Redis、RabbitMQ 这些专门的消息中间件来做队列。这没问题,但也带来了一个老大难问题:分布式事务。
想象一个场景:你在处理订单支付,需要先更新数据库的订单状态,再往队列里扔一个「发送支付成功短信」的任务。如果数据库更新成功了,但往队列里塞任务失败了呢?或者反过来?这就是分布式事务的经典困境。
River 的解决方案很直接:既然都用 PostgreSQL 存数据,那队列也放 PostgreSQL 里不就完了? 一个事务搞定所有事情,要么全成功,要么全回滚。是不是有种「大道至简」的感觉?
River 最核心的优势就是事务性入队。我们来看个实际例子:
// 在一个数据库事务中同时完成业务操作和任务入队
tx, err := db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
// 1. 更新订单状态
_, err = tx.Exec("UPDATE orders SET status = 'paid' WHERE id = ?", orderID)
if err != nil {
return err
}
// 2. 入队发送短信任务(和上面的更新在同一个事务里)
_, err = riverClient.InsertTx(ctx, tx, SendSMSArgs{
Phone: user.Phone,
Content: "支付成功",
}, nil)
if err != nil {
return err
}
// 3. 提交事务,要么全成功,要么全失败
return tx.Commit()
看到了吗?数据库更新和任务入队在同一个事务里。如果中间任何一步失败,整个事务回滚,不会出现「订单状态更新了但短信没发」这种尴尬情况。
这个特性在金融、电商等对数据一致性要求严格的场景下,简直就是救命稻草。传统方案要实现这个效果,你得引入分布式事务框架(比如 Saga),复杂度直线上升。而 River?数据库原生事务就搞定了。
River 充分利用了 Go 的 goroutine 并发模型。传统的队列处理器通常是多进程模型(比如 Ruby 的 Sidekiq),一个进程处理一个任务。而 River 在一个进程内就能启动成百上千个 goroutine 并发处理任务。
// 启动 River 客户端,配置 100 个并发工作协程
workers := river.NewWorkers()
workers.Add(SendEmailWorker{})
workers.Add(ResizeImageWorker{})
workers.Add(GenerateReportWorker{})
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100}, // 默认队列 100 个 worker
"emails": {MaxWorkers: 50}, // 邮件队列 50 个 worker
"reports": {MaxWorkers: 20}, // 报表队列 20 个 worker
},
Workers: workers,
})
这种设计带来了两个好处:
MaxWorkers 参数就行,不需要加机器可能有人会担心:把队列放数据库里,性能不会拉胯吗?
十年前确实会,但现在情况不一样了。PostgreSQL 9.5 引入了 SKIP LOCKED 特性,这是专门为队列场景设计的优化。简单来说,当多个 worker 同时去抢任务时,已经被锁定的行会被直接跳过,不会阻塞其他 worker。
-- River 内部使用的查询语句(简化版)
SELECT * FROM river_job
WHERE state = 'available'
AND queue = 'default'
ORDER BY priority DESC, scheduled_at
LIMIT 100
FOR UPDATE SKIP LOCKED;
这个特性的威力在哪?举个例子:假设有 100 个 worker 同时去抢 10 个任务。在没有 SKIP LOCKED 的时代,这 100 个 worker 会在数据库层面疯狂争抢锁,大量时间浪费在等待上。而有了 SKIP LOCKED,前 10 个 worker 拿到任务后直接锁住,剩下的 90 个瞬间跳过这些行去找下一批任务,完全不阻塞。
这就像超市收银:传统方式是所有人挤在一个收银台排队,后面的人只能干等;SKIP LOCKED 则像是自动分流,看到一个台子在忙立刻去下一个空闲的台子,效率翻倍。
配合现代 SSD 的 I/O 性能,PostgreSQL 做队列的性能已经完全够用。River 官方测试显示,在普通的 MacBook Air 上,每秒处理 1 万个简单任务不在话下。更重要的是,这个性能是在保证 ACID 事务的前提下达成的,数据安全性有保障。
而且别忘了,PostgreSQL 本身就支持主从复制、备份恢复、监控告警等企业级特性。用它做队列,你不需要额外维护一套 Redis 或 RabbitMQ 的基础设施。减少技术栈,降低运维成本,这才是真正的工程智慧。
了解底层原理能帮我们更好地使用 River,也能在遇到问题时快速定位。
River 在 PostgreSQL 中创建了 river_job 表来存储所有任务:
CREATE TABLE river.river_job (
id BIGSERIAL PRIMARY KEY, -- 任务唯一 ID
state TEXT NOT NULL, -- 任务状态:available、running、completed、failed 等
queue TEXT NOT NULL, -- 队列名称
kind TEXT NOT NULL, -- 任务类型(对应 Worker 的 Kind)
args JSONB NOT NULL, -- 任务参数(JSON 格式存储)
priority SMALLINT NOT NULL DEFAULT 3, -- 优先级(数字越小优先级越高)
scheduled_at TIMESTAMPTZ NOT NULL, -- 计划执行时间
attempted_at TIMESTAMPTZ, -- 上次尝试执行时间
attempt SMALLINT NOT NULL DEFAULT 0, -- 已尝试次数
max_attempts SMALLINT NOT NULL DEFAULT 25, -- 最大尝试次数
errors JSONB, -- 错误信息记录
created_at TIMESTAMPTZ NOT NULL, -- 创建时间
finalized_at TIMESTAMPTZ -- 完成时间
);
-- 核心索引:加速任务查询和锁定
CREATE INDEX idx_river_job_fetch ON river.river_job(
queue,
state,
scheduled_at,
priority
) WHERE state = 'available';
这个表结构的设计很巧妙:
queue + state + scheduled_at + priority 组合索引,保证查询效率state = 'available' 的任务建索引,减少索引维护开销当 worker 准备处理任务时,River 的执行流程是这样的:
// 简化版的任务获取逻辑
func (c *Client) fetchJobs(ctx context.Context, queue string, limit int) ([]*Job, error) {
// 1. 在事务中批量获取并锁定任务
tx, err := c.db.Begin(ctx)
if err != nil {
return nil, err
}
defer tx.Rollback(ctx)
// 2. 使用 SKIP LOCKED 避免竞争
rows, err := tx.Query(ctx, `
UPDATE river_job
SET
state = 'running',
attempted_at = NOW(),
attempt = attempt + 1
WHERE id IN (
SELECT id FROM river_job
WHERE state = 'available'
AND queue = $1
AND scheduled_at <= NOW()
ORDER BY priority ASC, scheduled_at ASC
LIMIT $2
FOR UPDATE SKIP LOCKED -- 关键:跳过已锁定的行
)
RETURNING *
`, queue, limit)
// 3. 解析任务并返回
jobs := parseJobs(rows)
// 4. 提交事务
return jobs, tx.Commit(ctx)
}
这个流程有几个关键点:
running,避免重复消费River 的并发模型是这样工作的:
┌─────────────────────────────────────────┐
│ River Client │
│ ┌───────────────────────────────────┐ │
│ │ Producer Goroutine │ │
│ │ (定期批量获取任务) │ │
│ └───────────────┬───────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────┐ │
│ │ Job Channel (缓冲通道) │ │
│ └───────────────┬───────────────────┘ │
│ │ │
│ ┌─────────┴──────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │Worker 1 │ │Worker 2 │ │Worker N │ │
│ │Goroutine│ │Goroutine│ │Goroutine│ │
│ └─────────┘ └─────────┘ └─────────┘ │
└─────────────────────────────────────────┘
具体来说:
这种模型的优势是:
首先安装 River 和 PostgreSQL 驱动:
go get github.com/riverqueue/river
go get github.com/riverqueue/river/riverdriver/riverpgxv5
go get github.com/jackc/pgx/v5/pgxpool
然后在数据库中创建 River 所需的表结构:
-- River 会自动创建这些表,你也可以手动执行
CREATE SCHEMA river;
-- 作业表
CREATE TABLE river.river_job (
id bigserial PRIMARY KEY,
state text NOT NULL,
queue text NOT NULL,
kind text NOT NULL,
args jsonb NOT NULL,
-- 其他字段...
);
-- 索引优化查询性能
CREATE INDEX idx_river_job_state_queue ON river.river_job(state, queue);
River 最酷的地方在于利用了 Go 的泛型特性(Go 1.18+),让任务参数完全类型安全。
// 定义任务参数结构体
type SendEmailArgs struct {
Email string `json:"email"`
Subject string `json:"subject"`
Body string `json:"body"`
Attachments []string `json:"attachments,omitempty"`
}
// 实现 Kind 方法,返回任务的唯一标识符
func (SendEmailArgs) Kind() string {
return "send_email"
}
// 定义 Worker,处理具体任务
type SendEmailWorker struct {
river.WorkerDefaults[SendEmailArgs]
}
// Work 方法:真正的业务逻辑
func (w *SendEmailWorker) Work(ctx context.Context, job *river.Job[SendEmailArgs]) error {
args := job.Args // 直接拿到类型安全的参数,不需要反序列化
fmt.Printf("发送邮件到 %s\n", args.Email)
fmt.Printf("主题: %s\n", args.Subject)
// 这里实现真正的邮件发送逻辑
err := sendEmail(args.Email, args.Subject, args.Body, args.Attachments)
if err != nil {
// 返回错误,River 会根据配置自动重试
return fmt.Errorf("发送邮件失败: %w", err)
}
fmt.Println("邮件发送成功!")
return nil
}
注意看,整个过程没有任何 json.Unmarshal、类型断言或反射操作。参数直接就是强类型的 SendEmailArgs,IDE 能自动补全,编译期就能发现类型错误。这在大型项目中能避免无数的运行时错误。
package main
import (
"context"
"fmt"
"log"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
)
func main() {
ctx := context.Background()
// 1. 连接 PostgreSQL
dbPool, err := pgxpool.New(ctx, "postgres://user:pass@localhost/mydb")
if err != nil {
log.Fatal(err)
}
defer dbPool.Close()
// 2. 注册所有 Worker
workers := river.NewWorkers()
river.AddWorker(workers, &SendEmailWorker{})
// 可以继续添加其他 Worker
// river.AddWorker(workers, &ResizeImageWorker{})
// river.AddWorker(workers, &GenerateReportWorker{})
// 3. 创建 River 客户端
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100},
},
Workers: workers,
})
if err != nil {
log.Fatal(err)
}
// 4. 启动客户端(开始消费任务)
if err := riverClient.Start(ctx); err != nil {
log.Fatal(err)
}
defer riverClient.Stop(ctx)
// 5. 入队一个任务
_, err = riverClient.Insert(ctx, SendEmailArgs{
Email: "user@example.com",
Subject: "欢迎使用 River",
Body: "这是一封测试邮件",
}, nil)
if err != nil {
log.Fatal(err)
}
fmt.Println("任务已入队,等待处理...")
// 保持程序运行
select {}
}
运行这段代码,你会看到任务被迅速消费并打印出「邮件发送成功!」。整个流程非常直观。