Go 先锋
读完需要
速读仅需 3 分钟
1. 延时任务系统介绍
延时任务系统(Delayed Job System)是一种用于设置任务在将来的某个时刻自动触发执行的机制。
定义:延时任务系统使作业/任务能够在未来的某个预定义时间运行。简单来说,就是一种可以设置任务在未来执行的系统。
延时任务系统的主要特点是:
时间触发:根据预设的时间来自动触发任务执行
可靠性:具备容错能力,防止任务丢失
扩展性:可以水平扩展,提高吞吐量
灵活性:可以自由配置执行逻辑
延时任务系统的主要应用场景:
订单超时取消:下单后如果超过 xx 分钟未支付,自动取消订单
短信/邮件验证码:xx 分钟内未使用验证码自动失效
商品秒杀:定时开启秒杀活动,同一时间发起请求大量涌入
会员特权:会员包年包月,到期后自动失效
积分系统:签到、邀请奖励,定期发放
消息推送:预约兑换码,定时推送兑换链接
2. Go 语言实现延时任务的方法
Go 语言中的标准库和社区生态中提供了多种实现延时任务的方法。
(1) time.After()
time.After()可以在指定的时间后发送一个值到返回的 chan 中。
package main
import (
"fmt"
"time"
)
func main() {
c1 := time.After(time.Second * 5)
<-c1
fmt.Println("5 seconds later")
c2 := time.After(time.Minute * 1)
<-c2
fmt.Println("1 minute later")
}
time.After()非常适合简单的一次性延时任务。但由于返回一个 chan,所以无法获得状态等信息。
(2) time.Tick()
time.Tick()会每隔一定时间就发送一个事件到 chan 中,可以实现定时执行。
package main
import (
"fmt"
"time"
)
func main() {
ticker := time.NewTicker(time.Second * 2)
i := 0
for {
<- ticker.C
i++
fmt.Println("Tick", i)
if i == 5 {
ticker.Stop()
break
}
}
}
time.Tick()适合要定期执行的任务场景。停止只需调用 ticker.Stop()。
(3) Worker Pools
使用 worker pool 方式可以更好地复用 goroutine,而不是每次都启动新的 goroutine。
package main
import (
"fmt"
"time"
)
func worker(id int) {
for {
fmt.Printf("Worker %d: started\n", id)
time.Sleep(time.Second)
}
}
func main() {
// 定义2个worker
p := make(chan struct{}, 2)
// 启动2个worker
go worker(1)
p <- struct{}{}
go worker(2)
p <- struct{}{}
// 停10秒后退出
time.Sleep(time.Second * 10)
}
缺点是没有统一的入口来发布任务。
(4) cronjobs
使用成熟的类似 cron 的第三方库(如 robfig/cron)可以使得任务调度更加简单。
package main
import (
"github.com/robfig/cron"
"fmt"
)
func task() {
fmt.Println("Running task...")
}
func main() {
c := cron.New()
c.AddFunc("@every 2s", task)
c.Start()
// 运行一段时间后退出
time.Sleep(5 * time.Minute)
c.Stop()
}
基于 cron 语法可以方便地处理定时任务,但是不支持延时任务。
综上所说,Go 语言通过自带的 time 包或第三方库都可以实现延时任务,后面将展示完整的基于 Redis Sorted Set 实现的延时任务系统。
3. 延时任务系统关键要素设计
开发一个可靠、可扩展的分布式延时任务系统需要考虑一些关键要素:
(1) 任务创建
tasks 应该是一种通用的数据结构,需要考虑版本、重试次数等数据信息。
接口设计应该简单,比如提供创建任务的函数:
// 定义任务结构
type Task struct {
ID string
Type string
Payload map[string]string
// 其他元数据
}
func CreateTask(task *Task) error
(2) 任务存储
需要选择一个持久化存储,比如关系数据库或者 Redis 等
存储方式应该有利于任务的有序访问和范围扫描重点是查找操作需要高效。
(3) 任务调度
调度器需要定期扫描存储,查找已经到期需要执行的任务。
扫描间隔需要可以配置,精确到毫秒。
超时时长可以作为任务的一部分持久化存储。
(4) 任务执行
Worker 进程负责监听任务存储,弹出到期任务并执行相关业务逻辑。
如果任务失败应该重试,一般设置最大重试次数。
执行的任务状态、时间、结果等需要更新存储。
(5) 任务结果处理
执行成功的任务从存储中删除。
失败的任务根据重试策略进行重试或备份。
回调机制可以通知相关服务。
(6) 容错处理
考虑分布式场景下可能出现的问题:
Clock Synchronization: 不同机器间时钟不同步导致调度不准。
Data Inconsistency:任务状态 Replication 不一致。
Job Deduplication: 避免任务重复执行。
Resharding:动态扩容时保证高可用。
4. 基于 Redis 的延时任务系统实现
基于 Redis 的 Sorted Set 数据结构,可以简单实现一个延时任务系统。
(1) 使用 Redis Sorted Set 存储任务
ZADD tasks <timestamp1> <payload1>
ZADD tasks <timestamp2> <payload2>
时间戳是 Score,payload 就是需要延时执行的任务数据。
(2) 调度器定期扫描 Sorted Set
调度器维护一个指针 lastScanTimestamp,每次扫描从这个时间点开始扫描:
ZRANGEBYSCORE tasks lastScanTimestamp 当前时间戳
扫描出的任务即为已到期需要执行的任务。
(3) Worker 从 Set 中弹出执行
Worker 可以监听任务已到期需要执行的 Redis Pub/Sub 频道。
根据调度器发送的到期任务,弹出该任务,执行任务处理逻辑,最后删除已处理任务。
(4) 任务执行结果更新
如果任务执行成功,直接从 Sorted Set 中删除该任务。
如果执行失败,根据重试策略更新 Set 中任务的其他元数据,例如重试次数等。
5. 性能优化方法
(1) 调整任务扫描间隔
扫描间隔直接影响任务延迟精确性。scanInterval 越小则延迟会越小,但是 Redis 压力会上升。
需要根据任务数量级调整间隔,比如初期可以 100ms,后期每增大 10 倍调整到 1s 等。
(2) 根据业务类型优化存储
如果有大量具有相同超时时长的任务,可以考虑为他们创建独立的 Sorted Set,从而减少扫描范围提升效率。
(3) 流控
大流量时为了保护后端服务,可以通过限流限速来控制生产入库速度。例如线上业务可以按照每个用户最多产生 2 个任务/秒来限速。
(4) 扩容机制
可以基于预警指标来自动扩容,比如当每秒待处理任务数超过 10000 时,自动增加一台 Worker 节点。
6. 遇到的常见问题及解决方法
(1) 时钟回拨
如果系统时钟回拨,可能导致任务重复执行。解决方法是每个任务维护一个唯一 ID,Worker 处理时检查重试字段来避免重复执行。
(2) 分布式下任务协同
分布式场景下要确保同一任务只会被唯一一个 Worker 执行,使用 SETNX 选取待处理任务可以防止重复执行。
(3) 任务丢失和重复执行
网络抖动、重启等都可能导致结果丢失或多次写 Redis。这种情况使用唯一 ID+重试次数也可以有效解决。
(4) 任务堆积处理
可以设置失败任务的最大重试历史,如果超过此历史则丢入独立的 dead queues 中由人工处理。避免堆积的任务占用存储和计算资源。
7. 总结
(1) 延时任务系统的意义
延时任务系统是很多业务场景下的基础通用服务,包含了调度机制、容错能力、执行框架等,可以显著简化业务开发。
(2) Go 语言实现的优势
Go 语言本身的高并发、简洁和性能使其很适合构建后台服务。利用 Go 语言开发的延时任务系统性能好,互联网公司的延时任务场景能够承受大规模吞吐。
(3) 展望
可以预见随着微服务架构和流量不断增长,对任务系统的可扩展性、隔离性和运维的要求会越来越高。基于 Kubernetes 等装箱式部署,以及 Serverless 架构的任务系统会是发展方向。