首先是 message 表:
| 字段名 | 类型及描述 |
|---|---|
| id | 主键 |
| status | 消息状态(待处理,进行中,成功,失败) |
| try_count | 消费者重试次数 |
| lock_expires | 锁过期时间 |
| created | 创建时间 |
| data | 消息数据 |
然后消费端轮询:
select * from message where status in ('PENDING', 'STARTED', 'FAILED') and try_count < max_tries and lock_expires < now() order by created limit 1 for update skip locked; 解释下 sql:
- 状态筛选 (status IN ('PENDING', 'STARTED', 'FAILED')) 只选取未开始或已开始但未完成的任务(可能因崩溃需重启)。
- 重试保护 (try_count < max_tries) 确保任务重试次数未超过允许的最大值(避免无限重试失败任务)。
- 锁过期检查 (lock_expires < NOW()) 任务被处理时会加锁(设置未来过期时间),此条件筛选锁已过期的任务(说明此前处理进程异常退出)。
- 排序与限量 (ORDER BY created LIMIT 1) 按创建时间排序后取最旧的一条任务,实现公平调度。
- 并发控制 (FOR UPDATE SKIP LOCKED)
- FOR UPDATE:锁定所选行,防止其他进程修改。
- SKIP LOCKED:跳过已被其他进程锁定的行,避免阻塞等待,提升并发效率。
- 此语法在 postgresql 、mysql(8.0)均支持。
利用关系数据库持久化消息,支持索引,可以灵活检索,关键是无需额外引入组件,请问这种方案是否可行呢?
