Tags: java, algorithm, graph, jgrapht

Group tasks for concurrent processing in directed acyclic dependency graph using topological sorting


I have a Task class whose instances depend on other tasks that must complete before they can execute. I want to group the tasks that can run in parallel and put the groups in order. I decided to represent the tasks as a DAG and am trying to use JGraphT. First, I traverse the input list of tasks to collect all tasks, together with their dependencies, into one collection. Then I create a vertex in the graph for each task:

DirectedAcyclicGraph<Task, DefaultEdge> d = new DirectedAcyclicGraph<>(DefaultEdge.class);
Set<Task> all = collectAllTasks(tasks);
all.forEach(d::addVertex);

Then, using the same collection, I create the edges between the nodes:

all.forEach(task -> {
    Set<TaskName> predecessors = task.getPredecessors();

    predecessors.forEach(name -> {
        Task predecessor = taskService.getTaskByName(name);
        d.addEdge(predecessor, task);
    });
});

Then I try to group the tasks:

private Set<Set<TaskId>> groupTasks(DirectedAcyclicGraph<TaskId, DefaultEdge> dag) {
    Set<Set<TaskId>> groups = new LinkedHashSet<>();
    Iterator<TaskId> iterator = new TopologicalOrderIterator<>(dag);

    iterator.forEachRemaining(taskId -> {
        //source?
        if (dag.incomingEdgesOf(taskId).isEmpty()) {
            if (groups.isEmpty()) {
                Set<TaskId> set = new HashSet<>();
                set.add(taskId);
                groups.add(set);
            } else {
                groups.iterator().next().add(taskId);
            }
        }

        Set<TaskId> tasks = new HashSet<>(Graphs.successorListOf(dag, taskId));

        //sink?
        if (tasks.isEmpty()) {
            return;
        }

        groups.add(tasks);
    });

    return groups;
}

The outcome is an ordered sequence of task groups. For example, for this graph:

[Image: first example graph, a DAG over the tasks A, B, C, D and E]

The outcome will be A, B, {C, D}, E.

However, it breaks completely when B is also a predecessor of E, as in this example:

[Image: second example graph, the same DAG with B as an additional predecessor of E]

How can I get the order A, B, {C, D}, E for a graph like the second one? Is there a particular algorithm I should look at, or a better way to achieve this? Thanks.


Solution

  • A solution can be obtained using the following procedure:

    1. The first group consists of all tasks without incoming arcs: these tasks have no dependencies.
    2. Remove all tasks in the first group from the graph.
    3. The second group consists of the tasks without incoming arcs in the modified graph: all dependencies of these tasks have been met.
    4. Remove all tasks in the second group from the modified graph. Continue this process until all tasks have been removed.

    Using JGraphT:

    public static List<List<String>> getGroups(Graph<String, DefaultEdge> taskGraph){
        List<List<String>> groups = new ArrayList<>();
        //The first group contains all vertices without incoming arcs
        List<String> group = new LinkedList<>();
        for(String task : taskGraph.vertexSet())
            if(taskGraph.inDegreeOf(task) == 0)
                group.add(task);
        //Next we construct all remaining groups. Group k+1 consists of all vertices that would have no incoming arcs
        //if we were to remove all vertices in the previous group k from the graph.
        do {
            groups.add(group);
            List<String> nextGroup = new LinkedList<>();
            for (String task : group) {
                for (String nextTask : Graphs.neighborSetOf(taskGraph, task)) {
                    if (taskGraph.inDegreeOf(nextTask) == 1)
                        nextGroup.add(nextTask);
                }
                taskGraph.removeVertex(task); //Removes a vertex and all its edges from the graph
            }
            group=nextGroup;
        }while(!group.isEmpty());
        return groups;
    }
    public static Graph<String, DefaultEdge> getGraph1(){
        Graph<String, DefaultEdge> taskGraph=new SimpleDirectedGraph<>(DefaultEdge.class);
        Graphs.addAllVertices(taskGraph, Arrays.asList("A","B","C","D","E"));
        taskGraph.addEdge("A","B");
        taskGraph.addEdge("B","C");
        taskGraph.addEdge("B","D");
        taskGraph.addEdge("C","E");
        taskGraph.addEdge("D","E");
        return taskGraph;
    }
    public static Graph<String, DefaultEdge> getGraph2(){
        Graph<String, DefaultEdge> taskGraph=new SimpleDirectedGraph<>(DefaultEdge.class);
        Graphs.addAllVertices(taskGraph, Arrays.asList("A","B","C","D","E"));
        taskGraph.addEdge("A","B");
        taskGraph.addEdge("B","C");
        taskGraph.addEdge("B","D");
        taskGraph.addEdge("B","E");
        taskGraph.addEdge("C","E");
        taskGraph.addEdge("D","E");
        return taskGraph;
    }
    public static void main(String[] args){
        System.out.println("Graph1: "+getGroups(getGraph1()));
        System.out.println("Graph2: "+getGroups(getGraph2()));
    }
    

    Output:

    Graph1: [[A], [B], [C, D], [E]]
    Graph2: [[A], [B], [C, D], [E]]
    

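    Note: the above code assumes that the input graph is a valid task graph, i.e. that it is acyclic. It also removes the vertices from the graph it is given, so pass in a copy if you still need the graph afterwards. You could build in an additional check to identify cyclic dependencies, e.g. a sequence like A -> B -> A.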
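
The cycle check mentioned in the note could look like the following sketch. It is not the JGraphT code above: to stay self-contained it assumes the dependencies are given as a plain `Map` from each task to the tasks that depend on it (the class name `LayeredGroups` and that map layout are hypothetical, chosen only for illustration). Instead of removing vertices, it keeps a counter of unmet dependencies per task, so the input is left untouched, and it reports a cycle when some tasks can never be placed in a group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class LayeredGroups {

    /**
     * Groups tasks into layers that can run in parallel.
     * Assumption: successors maps each task to the tasks that depend on it.
     * Throws IllegalArgumentException if the dependencies contain a cycle.
     */
    static List<List<String>> getGroups(Map<String, List<String>> successors) {
        // Count the incoming arcs of every task, including tasks that only appear as successors.
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : successors.entrySet()) {
            inDegree.putIfAbsent(entry.getKey(), 0);
            for (String next : entry.getValue()) {
                inDegree.merge(next, 1, Integer::sum);
            }
        }

        // The first group contains all tasks without incoming arcs.
        List<String> group = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
            if (entry.getValue() == 0) {
                group.add(entry.getKey());
            }
        }

        List<List<String>> groups = new ArrayList<>();
        int grouped = 0;
        while (!group.isEmpty()) {
            groups.add(group);
            grouped += group.size();
            List<String> nextGroup = new ArrayList<>();
            for (String task : group) {
                for (String next : successors.getOrDefault(task, Collections.emptyList())) {
                    // Decrementing the counter plays the role of removing the arc task -> next.
                    if (inDegree.merge(next, -1, Integer::sum) == 0) {
                        nextGroup.add(next);
                    }
                }
            }
            group = nextGroup;
        }

        // Tasks on a cycle never reach in-degree 0, so they are never grouped.
        if (grouped != inDegree.size()) {
            throw new IllegalArgumentException("Cyclic dependency detected among tasks");
        }
        return groups;
    }

    public static void main(String[] args) {
        // Same arcs as getGraph2() above.
        Map<String, List<String>> graph2 = new LinkedHashMap<>();
        graph2.put("A", Arrays.asList("B"));
        graph2.put("B", Arrays.asList("C", "D", "E"));
        graph2.put("C", Arrays.asList("E"));
        graph2.put("D", Arrays.asList("E"));
        System.out.println("Graph2: " + getGroups(graph2)); // prints Graph2: [[A], [B], [C, D], [E]]
    }
}
```

If you want to stay within JGraphT, its `CycleDetector` class (`detectCycles()`) can serve as a check before grouping; the counter-based version above simply folds that check into the grouping pass.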