文章

go实现多任务并发DAG调度器

用golang实现多任务依赖调度器,自动按照任务的依赖关系并发执行,降低任务整体延时。

本文介绍的DAG调度工具已开源,欢迎使用、反馈、PR、记得留下小星星~:开源DAG调度器

一、什么是DAG调度器

DAG(Directed Acyclic Graph),中文名是“有向无环图”,在计算机中是一种图数据结构。DAG通常用来描述一系列关系的任务(节点)的前后依赖顺序。我们日常生活中也有很多这样的例子,拿简单的做饭活动作为例子,做饭可以拆分为洗菜、洗锅、切菜、淘米、煮饭、炒菜、吃饭等阶段,这个活动可以用图形化的DAG来描述:

dag示例

用dag图形中可以清晰的看出洗菜、洗锅、淘米之间是可以“并行”的,也就是没有先后顺序依赖,但炒菜必须等洗锅和切菜两个活动完成才能进行。类似吃饭也需要炒菜和煮饭完成之后才能开始。

现实的开发过程中,尤其是做服务端开发工作的同学,往往会在业务逻辑中执行多个任务(如进行耗时的RPC调用),这些任务之间有的可以并行,有的有依赖关系需要串形执行。在go语言中,需要并行的任务我们可以给每个任务启动一个goroutine来执行,在有依赖的任务开始前用sync.WaitGroup.Wait()来进行同步。如果任务数量较少的话,这种写法没什么问题,但如果一个任务存在大量的RPC调用,或者某些任务需要可配置化任务依赖关系时,这种硬编码的方式就显得有些力不从心。

我们可以参考DAG图,来抽象出一个可以自动分析要执行任务的依赖关系,然后自动化的调度任务,无依赖的任务之间并发执行,存在依赖的任务也自动进行同步,会大大简化我们的开发工作。

二、如何实现一个DAG调度器

2.1 实现一个图数据结构

要实现DAG,需要首先实现一个图的数据结构来描述任务之间的依赖关系。每个任务节点我们一般表示为Node,为了通用性考虑,我们定义Node为一个接口,只有一个Name()方法,表示任务的名称。

1
2
3
type Node interface {
    Name() string
}

DAG中除了点,还需要有边,用来描述节点之间的依赖关系,这里我们简单的使用map结构表示即可,map的key和values分别表示有依赖关系的节点及其后继节点,有了边和点的定义,我们的DAG图可以定义为:

1
2
3
4
type Graph struct {
    Nodes []Node
    Edges map[Node][]Node
}

接下来我们给图结构添加上AddNode,AddEdge的函数:

1
2
3
4
5
6
7
func (g *Graph) AddNode(n Node) {
    g.Nodes = append(g.Nodes, n)
}

func (g *Graph) AddEdge(from Node, to Node) {
    g.Edges[from] = append(g.Edges[from], to)
}

接下来稍微复杂的一步,既然我们的图DAG表示的是“有向无环“图,所以当图中的点和边都添加完之后,需要确认图中是没有环存在的,这里需要用到图的遍历的知识,我们可以用深度优先/广度优先遍历来实现,为了效率的考量,我们这里采用非递归形式的广度优先遍历算法。维护一个入度为0的节点集合:从集合中取出节点,查找其后继节点,将后继节点的入度-1,如果后继节点的入度变为0,则将其添加到入度为0的节点集合中,同时记录遍历过的节点的个数,重复这个过程直到没有入度为0的集合为空。如果遍历到的节点个数小于图中节点的总数,则说明图中存在环。下面是这个算法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// BFS 广度优先遍历图迭代器,遍历过程中检查图是否有环
// go1.23之后,可以考虑改为返回一个标准的迭代器 iter.Seq
func (g *Graph) BFS(walker Walker) error {
    var visitedNodesNum int
    inDegrees := make(map[Node]int, len(g.Nodes))
    for _, n := range g.Nodes {
        inDegrees[n] = 0
    }

    for _, to := range g.Edges {
        for _, n := range to {
            inDegrees[n]++
        }
    }
    var zeroDegreeNodes []Node
    for _, v := range g.Nodes {
        if inDegrees[v] == 0 {
            zeroDegreeNodes = append(zeroDegreeNodes, v)
        }
    }

    for len(zeroDegreeNodes) > 0 {
        curNode := zeroDegreeNodes[0]
        zeroDegreeNodes = zeroDegreeNodes[1:]
        if err := walker(curNode); err != nil {
            return err
        }
        visitedNodesNum++
        for _, to := range g.Edges[curNode] {
            inDegrees[to]--
            if inDegrees[to] == 0 {
                zeroDegreeNodes = append(zeroDegreeNodes, to)
            }
        }
    }
    // check circle
    if visitedNodesNum < len(g.Nodes) {
        var circleNodes []string
        for n, inDegree := range inDegrees {
            if inDegree != 0 {
                circleNodes = append(circleNodes, n.Name())
            }
        }
        sort.Slice(circleNodes, func(i, j int) bool {
            return circleNodes[i] < circleNodes[j]
        })
        return fmt.Errorf("graph has circle in nodes:%v", circleNodes)
    }

    return nil
}

2.2 调度器的实现

2.2.1 调度器定义

有了图结构的支撑,我们可以开始设计调度器了。想想看,我们的调度器中需要有哪些属性?首先需要包含一个图数据结构: dag *DAG,其次我们需要一些并发的控制,需要引入sync.WaitGroupsync.Mutex,然后我们需要一个状态量来标记我们的图已经构建完成:sealed bool,然后我们需要感知到任务因为某些原因提前结束了,这里我们使用channel: done chan error加上一些其他的属性就组成的我们的调度器定义:

1
2
3
4
5
6
7
8
9
10
// Scheduler simple scheduler for typed tasks
type Scheduler[T any] struct {
    dag         *Graph
    nodes       map[string]*node[T]
    swg         *sync.WaitGroup
    lock        sync.Mutex
    err         error
    sealed      bool
    done        chan error
}

2.2.2 任务定义

任务就是我们要调度的具体活动,我们的任务首先需要一个唯一的Name,而且由于任务是有依赖关系的,任务需要声明其前驱节点Dependencies都有哪些,最后任务需要定义其具体要执行的工作Execute,于是我们得到了任务的接口定义:

1
2
3
4
5
6
// Task is the interface all your tasks should implement
type Task[T any] interface {
    Name() string
    Dependencies() []string
    Execute(context.Context, T) error
}

为了方便任务调度,我们定义调度器中需要用到的节点类型node

1
2
3
4
5
type node[T any] struct {
    ds       *Scheduler[T]
    next     []*node[T]
    task     Task[T]
}

2.2.3 提交任务

我们的调度器需要能接受任务的提交,并在任务提交的时候做一些检查,如检查调度器是否已经密封sealed、任务是否重复存在等,并实际提交为我们自定义的node类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Submit provide typed task to scheduler, all task should implement interface Task
func (d *Scheduler[T]) Submit(tasks ...Task[T]) error {
    if d.sealed {
        return ErrSealed
    }
    for _, task := range tasks {
        if task == nil {
            d.err = ErrNilTask
            return d.err
        }
        if _, has := d.nodes[task.Name()]; has {
            d.err = ErrTaskExist
            return d.err
        }
        n := &node[T]{task: task, ds: d}
        d.dag.AddNode(n)
        d.nodes[task.Name()] = n
    }
    return nil
}

2.2.4 任务调度执行

终于到最关键的一步了,当所有任务都提交完成,我们需要根据任务的依赖关系开始调度任务并发执行了。首先需要先将调度器密封起来sealed,然后检查是否所有任务声明的依赖节点都存在,及其它们构成的DAG是否是无环的。

然后我们从入度为0的节点集合开始,因为它们没有任何前置依赖,开始并发执行,这在go语言中相当容易。只需要waitGoup.AddwaitGroup.Wait即可。就像我们之前遍历DAG一样,重复这个过程,即可将我们的图调度执行完毕。这里为了效率,我们不直接调用Graph中定义好的BFS方法,而是将无环检查/任务调度合并起来,用一个方法来执行。

任务执行中一般都需要有参数的输入以及各个节点任务的输出收集,我们这里使用了范型参数x来表示统一的执行环境,各个任务节点可以从中读取相关参数或者其他前驱节点的输出,也可以在其中保存当前节点的执行结果等,不过这里需要注意控制并发问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// Run start all tasks and block till all of them done or meet critical err
func (d *Scheduler[T]) Run(ctx context.Context, x T) error {
    if d.err != nil {
        return d.err
    }
    d.sealed = true
    for _, n := range d.nodes {
        for _, name := range n.task.Dependencies() {
            pre, ok := d.nodes[name]
            if !ok {
                return fmt.Errorf("dag:%w: task :%s's dependency:%s not found", ErrTaskNotExist, n.Name(), name)
            }
            d.dag.AddEdge(pre, n)
            pre.next = append(pre.next, n)
        }
    }
    var visitedNodesNum int
    // init nodes inDegrees
    inDegrees := make(map[string]int, len(d.nodes))
    for _, n := range d.nodes {
        inDegrees[n.Name()] = 0
    }
    for _, to := range d.dag.Edges {
        for _, n := range to {
            inDegrees[n.Name()]++
        }
    }
    var toStartNodes []*node[T]
    for name, inDegree := range inDegrees {
        curN := d.nodes[name]
        if inDegree == 0 {
            toStartNodes = append(toStartNodes, curN)
        }
    }
    for len(toStartNodes) > 0 {
        d.swg = new(sync.WaitGroup)
        d.swg.Add(len(toStartNodes))
        for _, n := range toStartNodes {
            n.start(ctx, x)
            visitedNodesNum++
        }
        d.swg.Wait()
        if d.err != nil {
            return d.err
        }
        pre := toStartNodes
        toStartNodes = []*node[T]{}
        for _, startNode := range pre {
            for _, n := range startNode.next {
                inDegrees[n.Name()]--
                if inDegrees[n.Name()] == 0 {
                    toStartNodes = append(toStartNodes, n)
                }
            }
        }
    }
    // check circle
    if visitedNodesNum < len(d.dag.Nodes) {
        var circleNodes []string
        for n, inDegree := range inDegrees {
            if inDegree != 0 {
                circleNodes = append(circleNodes, n)
            }
        }
        sort.Slice(circleNodes, func(i, j int) bool {
            return circleNodes[i] < circleNodes[j]
        })
        return fmt.Errorf("dag:graph has circle in nodes:%v", circleNodes)
    }

    return nil
}

node.Start方法中,我们启动新的goroutine,来执行任务定义的Execute方法。记得新的goroutine协程中不要忘记捕获可能出现的panic。

三、使用示例

实现调度器之后,我们可以方便的定义自己的任务:B,C依赖A完成,并将任务提交给调度器执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
// define taskA type
type task struct{name string, deps []string}
func (t task) Name() string {return t.name}
func (t task) Dependencies() []string {return t.deps}
func (t task) Execute(ctx context.Context, runCtx *sync.Map) error {return nil}

// taskB, taskC ....as the same

ds := NewScheduler[*sync.Map]()
ds.Submit(task{name:"A"})
ds.Submit(task{name:"B",deps:[]string{"A"}})
ds.Submit(task{name:"C",deps:[]string{"A"}})
err := ds.Run(context.Background(), &sync.Map{})

四、结语

好了,利用go语言,你可以自己轻松实现一个DAG调度器。这是我实现的DAG调度器:这个调度器还支持其他额外的Feature:

  • 支持范型
  • 直接提交函数任务
  • 支持注入拦截器
  • 支持分支任务
  • 支持任务超时重试配置等

最后感谢阅读,欢迎使用、反馈、不要忘记给个小星星支持一下!😊

本文由作者按照 CC BY 4.0 进行授权

Comments powered by Disqus.