go实现多任务并发DAG调度器
用golang实现多任务依赖调度器,自动按照任务的依赖关系并发执行,降低任务整体延时。
本文介绍的DAG调度工具已开源,欢迎使用、反馈、PR、记得留下小星星~:开源DAG调度器
一、什么是DAG调度器
DAG(Directed Acyclic Graph),中文名是“有向无环图”,在计算机中是一种图数据结构。DAG通常用来描述一系列关系的任务(节点)的前后依赖顺序。我们日常生活中也有很多这样的例子,拿简单的做饭活动作为例子,做饭可以拆分为洗菜、洗锅、切菜、淘米、煮饭、炒菜、吃饭等阶段,这个活动可以用图形化的DAG来描述:
用dag图形中可以清晰的看出洗菜、洗锅、淘米之间是可以“并行”的,也就是没有先后顺序依赖,但炒菜必须等洗锅和切菜两个活动完成才能进行。类似吃饭也需要炒菜和煮饭完成之后才能开始。
现实的开发过程中,尤其是做服务端开发工作的同学,往往会在业务逻辑中执行多个任务(如进行耗时的RPC调用),这些任务之间有的可以并行,有的有依赖关系需要串形执行。在go语言中,需要并行的任务我们可以给每个任务启动一个goroutine来执行,在有依赖的任务开始前用sync.WaitGroup.Wait()来进行同步。如果任务数量较少的话,这种写法没什么问题,但如果一个任务存在大量的RPC调用,或者某些任务需要可配置化任务依赖关系时,这种硬编码的方式就显得有些力不从心。
我们可以参考DAG图,来抽象出一个可以自动分析要执行任务的依赖关系,然后自动化的调度任务,无依赖的任务之间并发执行,存在依赖的任务也自动进行同步,会大大简化我们的开发工作。
二、如何实现一个DAG调度器
2.1 实现一个图数据结构
要实现DAG,需要首先实现一个图的数据结构来描述任务之间的依赖关系。每个任务节点我们一般表示为Node,为了通用性考虑,我们定义Node为一个接口,只有一个Name()方法,表示任务的名称。
1
2
3
type Node interface {
Name() string
}
DAG中除了点,还需要有边,用来描述节点之间的依赖关系,这里我们简单的使用map结构表示即可,map的key和values分别表示有依赖关系的节点及其后继节点,有了边和点的定义,我们的DAG图可以定义为:
1
2
3
4
type Graph struct {
Nodes []Node
Edges map[Node][]Node
}
接下来我们给图结构添加上AddNode,AddEdge的函数:
1
2
3
4
5
6
7
func (g *Graph) AddNode(n Node) {
g.Nodes = append(g.Nodes, n)
}
func (g *Graph) AddEdge(from Node, to Node) {
g.Edges[from] = append(g.Edges[from], to)
}
接下来稍微复杂的一步,既然我们的图DAG表示的是“有向无环“图,所以当图中的点和边都添加完之后,需要确认图中是没有环存在的,这里需要用到图的遍历的知识,我们可以用深度优先/广度优先遍历来实现,为了效率的考量,我们这里采用非递归形式的广度优先遍历算法。维护一个入度为0的节点集合:从集合中取出节点,查找其后继节点,将后继节点的入度-1,如果后继节点的入度变为0,则将其添加到入度为0的节点集合中,同时记录遍历过的节点的个数,重复这个过程直到没有入度为0的集合为空。如果遍历到的节点个数小于图中节点的总数,则说明图中存在环。下面是这个算法的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// BFS 广度优先遍历图迭代器,遍历过程中检查图是否有环
// go1.23之后,可以考虑改为返回一个标准的迭代器 iter.Seq
func (g *Graph) BFS(walker Walker) error {
var visitedNodesNum int
inDegrees := make(map[Node]int, len(g.Nodes))
for _, n := range g.Nodes {
inDegrees[n] = 0
}
for _, to := range g.Edges {
for _, n := range to {
inDegrees[n]++
}
}
var zeroDegreeNodes []Node
for _, v := range g.Nodes {
if inDegrees[v] == 0 {
zeroDegreeNodes = append(zeroDegreeNodes, v)
}
}
for len(zeroDegreeNodes) > 0 {
curNode := zeroDegreeNodes[0]
zeroDegreeNodes = zeroDegreeNodes[1:]
if err := walker(curNode); err != nil {
return err
}
visitedNodesNum++
for _, to := range g.Edges[curNode] {
inDegrees[to]--
if inDegrees[to] == 0 {
zeroDegreeNodes = append(zeroDegreeNodes, to)
}
}
}
// check circle
if visitedNodesNum < len(g.Nodes) {
var circleNodes []string
for n, inDegree := range inDegrees {
if inDegree != 0 {
circleNodes = append(circleNodes, n.Name())
}
}
sort.Slice(circleNodes, func(i, j int) bool {
return circleNodes[i] < circleNodes[j]
})
return fmt.Errorf("graph has circle in nodes:%v", circleNodes)
}
return nil
}
2.2 调度器的实现
2.2.1 调度器定义
有了图结构的支撑,我们可以开始设计调度器了。想想看,我们的调度器中需要有哪些属性?首先需要包含一个图数据结构: dag *DAG,其次我们需要一些并发的控制,需要引入sync.WaitGroup和sync.Mutex,然后我们需要一个状态量来标记我们的图已经构建完成:sealed bool,然后我们需要感知到任务因为某些原因提前结束了,这里我们使用channel: done chan error加上一些其他的属性就组成的我们的调度器定义:
1
2
3
4
5
6
7
8
9
10
// Scheduler simple scheduler for typed tasks
type Scheduler[T any] struct {
dag *Graph
nodes map[string]*node[T]
swg *sync.WaitGroup
lock sync.Mutex
err error
sealed bool
done chan error
}
2.2.2 任务定义
任务就是我们要调度的具体活动,我们的任务首先需要一个唯一的Name,而且由于任务是有依赖关系的,任务需要声明其前驱节点Dependencies都有哪些,最后任务需要定义其具体要执行的工作Execute,于是我们得到了任务的接口定义:
1
2
3
4
5
6
// Task is the interface all your tasks should implement
type Task[T any] interface {
Name() string
Dependencies() []string
Execute(context.Context, T) error
}
为了方便任务调度,我们定义调度器中需要用到的节点类型node:
1
2
3
4
5
type node[T any] struct {
ds *Scheduler[T]
next []*node[T]
task Task[T]
}
2.2.3 提交任务
我们的调度器需要能接受任务的提交,并在任务提交的时候做一些检查,如检查调度器是否已经密封sealed、任务是否重复存在等,并实际提交为我们自定义的node类型:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Submit provide typed task to scheduler, all task should implement interface Task
func (d *Scheduler[T]) Submit(tasks ...Task[T]) error {
if d.sealed {
return ErrSealed
}
for _, task := range tasks {
if task == nil {
d.err = ErrNilTask
return d.err
}
if _, has := d.nodes[task.Name()]; has {
d.err = ErrTaskExist
return d.err
}
n := &node[T]{task: task, ds: d}
d.dag.AddNode(n)
d.nodes[task.Name()] = n
}
return nil
}
2.2.4 任务调度执行
终于到最关键的一步了,当所有任务都提交完成,我们需要根据任务的依赖关系开始调度任务并发执行了。首先需要先将调度器密封起来sealed,然后检查是否所有任务声明的依赖节点都存在,及其它们构成的DAG是否是无环的。
然后我们从入度为0的节点集合开始,因为它们没有任何前置依赖,开始并发执行,这在go语言中相当容易。只需要waitGoup.Add,waitGroup.Wait即可。就像我们之前遍历DAG一样,重复这个过程,即可将我们的图调度执行完毕。这里为了效率,我们不直接调用Graph中定义好的BFS方法,而是将无环检查/任务调度合并起来,用一个方法来执行。
任务执行中一般都需要有参数的输入以及各个节点任务的输出收集,我们这里使用了范型参数x来表示统一的执行环境,各个任务节点可以从中读取相关参数或者其他前驱节点的输出,也可以在其中保存当前节点的执行结果等,不过这里需要注意控制并发问题:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// Run start all tasks and block till all of them done or meet critical err
func (d *Scheduler[T]) Run(ctx context.Context, x T) error {
if d.err != nil {
return d.err
}
d.sealed = true
for _, n := range d.nodes {
for _, name := range n.task.Dependencies() {
pre, ok := d.nodes[name]
if !ok {
return fmt.Errorf("dag:%w: task :%s's dependency:%s not found", ErrTaskNotExist, n.Name(), name)
}
d.dag.AddEdge(pre, n)
pre.next = append(pre.next, n)
}
}
var visitedNodesNum int
// init nodes inDegrees
inDegrees := make(map[string]int, len(d.nodes))
for _, n := range d.nodes {
inDegrees[n.Name()] = 0
}
for _, to := range d.dag.Edges {
for _, n := range to {
inDegrees[n.Name()]++
}
}
var toStartNodes []*node[T]
for name, inDegree := range inDegrees {
curN := d.nodes[name]
if inDegree == 0 {
toStartNodes = append(toStartNodes, curN)
}
}
for len(toStartNodes) > 0 {
d.swg = new(sync.WaitGroup)
d.swg.Add(len(toStartNodes))
for _, n := range toStartNodes {
n.start(ctx, x)
visitedNodesNum++
}
d.swg.Wait()
if d.err != nil {
return d.err
}
pre := toStartNodes
toStartNodes = []*node[T]{}
for _, startNode := range pre {
for _, n := range startNode.next {
inDegrees[n.Name()]--
if inDegrees[n.Name()] == 0 {
toStartNodes = append(toStartNodes, n)
}
}
}
}
// check circle
if visitedNodesNum < len(d.dag.Nodes) {
var circleNodes []string
for n, inDegree := range inDegrees {
if inDegree != 0 {
circleNodes = append(circleNodes, n)
}
}
sort.Slice(circleNodes, func(i, j int) bool {
return circleNodes[i] < circleNodes[j]
})
return fmt.Errorf("dag:graph has circle in nodes:%v", circleNodes)
}
return nil
}
node.Start方法中,我们启动新的goroutine,来执行任务定义的Execute方法。记得新的goroutine协程中不要忘记捕获可能出现的panic。
三、使用示例
实现调度器之后,我们可以方便的定义自己的任务:B,C依赖A完成,并将任务提交给调度器执行:
1
2
3
4
5
6
7
8
9
10
11
12
13
// define taskA type
type task struct{name string, deps []string}
func (t task) Name() string {return t.name}
func (t task) Dependencies() []string {return t.deps}
func (t task) Execute(ctx context.Context, runCtx *sync.Map) error {return nil}
// taskB, taskC ....as the same
ds := NewScheduler[*sync.Map]()
ds.Submit(task{name:"A"})
ds.Submit(task{name:"B",deps:[]string{"A"}})
ds.Submit(task{name:"C",deps:[]string{"A"}})
err := ds.Run(context.Background(), &sync.Map{})
四、结语
好了,利用go语言,你可以自己轻松实现一个DAG调度器。这是我实现的DAG调度器:这个调度器还支持其他额外的Feature:
- 支持范型
- 直接提交函数任务
- 支持注入拦截器
- 支持分支任务
- 支持任务超时重试配置等
最后感谢阅读,欢迎使用、反馈、不要忘记给个小星星支持一下!😊


Comments powered by Disqus.