I have a workflow that consists of a directed acyclic graph (DAG) with several nodes, each having a random execution time. I would like to determine the maximum concurrency of nodes in this workflow. What should I do?
Currently we enumerate all possible execution scenarios with a full traversal to find the maximum concurrency value, but this approach becomes very time-consuming when the graph has many nodes or edges. Is there an algorithm with lower complexity for this problem?
For example:
In this diagram, I have listed 5 simple examples.
In reality, our DAG may have several hundred or even thousands of nodes, along with more complex dependency relationships.
The maximum concurrency value is the maximum number of nodes that can execute concurrently, given the dependency relationships in the DAG, at any point while the workflow runs from the root nodes downwards. For example, in the first scenario, nodes 1, 2, 5, and 6 can execute concurrently, and this is the largest such set, so the maximum concurrency is 4. In the second scenario, where a dependency is introduced between 2 and 5, the maximal concurrent sets would be {1, 2, 5}, {1, 5, 6}, {1, 2, 4}, or {3, 5, 6}, so the maximum concurrency is 3.
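To make the scenarios concrete, this is how I would write the first one as an adjacency map, in the Map<T, Set<T>> shape (node -> direct successors) that my solver code below consumes (a sketch; the node numbers follow my diagram):

Map<Integer, Set<Integer>> scenario1 = new HashMap<>();
scenario1.put(1, Set.of(3));
scenario1.put(2, Set.of(3));
scenario1.put(3, Set.of(7));
scenario1.put(4, Set.of(5, 6));
scenario1.put(5, Set.of(7));
scenario1.put(6, Set.of(7));
scenario1.put(7, Set.of());
// Maximum concurrency is 4: nodes 1, 2, 5 and 6 can all be running at once.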
@Juan Lopes ----------------------- 2024-08-19 append
Thank you very much for your guidance. I briefly studied the maximum-flow problem and found several common algorithms, including the Ford-Fulkerson method, the Edmonds-Karp algorithm, Dinic's algorithm, and the push-relabel algorithm.
Following the Python demo you provided, I studied hard and, with the assistance of ChatGPT, roughly implemented Java versions of the Ford-Fulkerson and Dinic algorithms. (The push-relabel algorithm is too difficult for me.)
I added some comparative tests to my project and found the results to be accurate.
However, there is another issue: both of my implementations perform very poorly on sequential (chain) dependencies.
example 1:
201 nodes, with the dependencies forming a single chain: 1 -> 2 -> 3 -> 4 -> 5 -> ... -> 201 (built for the test as sketched below)
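A minimal sketch of how this chain graph can be generated for the test (buildChain is just an illustrative helper name; it produces the same Map<Integer, Set<Integer>> shape as above):

static Map<Integer, Set<Integer>> buildChain(int n) {
    // 1 -> 2 -> ... -> n: every node has exactly one successor except the last
    Map<Integer, Set<Integer>> graph = new HashMap<>();
    for (int i = 1; i <= n; i++) {
        graph.put(i, i < n ? Set.of(i + 1) : Set.of());
    }
    return graph;
}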
output (the "dag calculate max" line is my full-traversal + HashSet-pruning implementation):
2024-08-19 16:11:24 CST UTC+08:00 INFO - test DinicSolver alg
2024-08-19 16:11:52 CST UTC+08:00 INFO - test DinicSolver alg return 1, cost time: 28177ms
2024-08-19 16:11:52 CST UTC+08:00 INFO - test FordFulkersonSolver alg
2024-08-19 16:11:53 CST UTC+08:00 INFO - test FordFulkersonSolver alg return 1, cost time: 372ms
2024-08-19 16:11:53 CST UTC+08:00 INFO - dag calculate max concurrency 1 cost time: 7ms
I also found that the time consumption is not fixed: the longer the node names, the longer the run takes. I measured four variants, using plain numbers as node names, then one, two, and three letters prepended to each number, and the time grew with the name length.
example 2:
201 nodes, with dependencies as shown in this picture:
output (again, the "dag calculate max" line is my full-traversal + HashSet-pruning implementation):
2024-08-19 16:11:55 CST UTC+08:00 INFO - test DinicSolver alg
2024-08-19 16:11:55 CST UTC+08:00 INFO - test DinicSolver alg return 201, cost time: 6ms
2024-08-19 16:11:55 CST UTC+08:00 INFO - test FordFulkersonSolver alg
2024-08-19 16:11:55 CST UTC+08:00 INFO - test FordFulkersonSolver alg return 201, cost time: 2ms
2024-08-19 16:11:55 CST UTC+08:00 INFO - dag calculate max concurrency 10 cost time: 660ms
Appending my Java code implementations.
Ford-Fulkerson algorithm:
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

public class FordFulkersonSolver<T> {
    // Residual network: node -> (neighbor -> remaining capacity).
    private final Map<String, Map<String, Integer>> network;
    private final Set<String> visited;

    public FordFulkersonSolver() {
        network = new HashMap<>();
        visited = new HashSet<>();
    }

    // Finds one augmenting path by DFS and pushes flow along it.
    private int dfs(String source, String sink, int flow) {
        visited.add(source);
        if (source.equals(sink)) {
            return flow;
        }
        for (Map.Entry<String, Integer> entry : network.get(source).entrySet()) {
            String neighbor = entry.getKey();
            int capacity = entry.getValue();
            if (capacity <= 0 || visited.contains(neighbor)) continue;
            int sent = dfs(neighbor, sink, Math.min(flow, capacity));
            if (sent == 0) continue;
            // Update the residual capacities in both directions.
            network.get(source).put(neighbor, capacity - sent);
            network.get(neighbor).put(source, network.get(neighbor).getOrDefault(source, 0) + sent);
            return sent;
        }
        return 0;
    }

    // BFS from t over the DAG, collecting "B"-prefixed keys of reachable nodes
    // (and "A"-prefixed keys of the nodes it expands along the way).
    private Set<String> reach(Map<T, Set<T>> graph, T t, Set<String> visited) {
        Queue<T> queue = new LinkedList<>();
        queue.add(t);
        while (!queue.isEmpty()) {
            T current = queue.poll();
            visited.add("A" + current);
            for (T neighbor : graph.get(current)) {
                String neighborKey = "B" + neighbor;
                if (!visited.contains(neighborKey)) {
                    queue.add(neighbor);
                    visited.add(neighborKey);
                }
            }
        }
        return visited;
    }

    private void addEdge(String from, String to, int capacity) {
        network.computeIfAbsent(from, k -> new HashMap<>()).put(to, capacity);
        network.computeIfAbsent(to, k -> new HashMap<>()).put(from, 0);
    }

    // Dilworth: width = |nodes| - max matching over the transitive closure.
    public int solve(Map<T, Set<T>> graph) {
        for (T t : graph.keySet()) {
            addEdge("src", "A" + t, 1);
            addEdge("B" + t, "sink", 1);
            for (String u : reach(graph, t, new HashSet<>())) {
                addEdge("A" + t, u, 1);
            }
        }
        int total = 0;
        while (true) {
            visited.clear(); // fresh visited set for each augmenting path
            int sent = dfs("src", "sink", Integer.MAX_VALUE);
            if (sent == 0) break;
            total += sent;
        }
        return graph.size() - total;
    }
}
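And roughly how I call it in the tests (a sketch; buildChain and scenario1 are the illustrative snippets from above, and each call needs a fresh solver instance because network keeps the residual capacities of the previous run):

System.out.println(new FordFulkersonSolver<Integer>().solve(buildChain(201))); // 1: a chain is fully sequential
System.out.println(new FordFulkersonSolver<Integer>().solve(scenario1));       // 4: nodes 1, 2, 5, 6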
Dinic's algorithm:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

public class DinicSolver<T> {
    // Residual network: node -> (neighbor -> remaining capacity).
    private final Map<String, Map<String, Integer>> network;
    // Insertion-ordered node list; levels are indexed by position in this list.
    private final List<String> nodes;
    private int[] level;

    public DinicSolver() {
        network = new HashMap<>();
        nodes = new ArrayList<>();
        nodes.add("src");
        nodes.add("sink");
    }

    // Builds the level graph. Note: nodes.indexOf is a linear scan,
    // performed for every edge visited.
    private void bfs(String source) {
        level = new int[nodes.size()];
        Arrays.fill(level, -1);
        level[nodes.indexOf(source)] = 0;
        Queue<String> queue = new LinkedList<>();
        queue.offer(source);
        while (!queue.isEmpty()) {
            String u = queue.poll();
            for (Map.Entry<String, Integer> entry : network.get(u).entrySet()) {
                String v = entry.getKey();
                int capacity = entry.getValue();
                if (capacity > 0 && level[nodes.indexOf(v)] == -1) {
                    level[nodes.indexOf(v)] = level[nodes.indexOf(u)] + 1;
                    queue.offer(v);
                }
            }
        }
    }

    // Pushes one unit of flow along strictly increasing levels.
    private int dfs(String u, int flow, String sink) {
        if (u.equals(sink)) {
            return flow;
        }
        for (Map.Entry<String, Integer> entry : network.get(u).entrySet()) {
            String v = entry.getKey();
            int capacity = entry.getValue();
            if (capacity > 0 && level[nodes.indexOf(u)] < level[nodes.indexOf(v)]) {
                int sent = dfs(v, Math.min(flow, capacity), sink);
                if (sent > 0) {
                    network.get(u).put(v, capacity - sent);
                    network.get(v).put(u, network.get(v).getOrDefault(u, 0) + sent);
                    return sent;
                }
            }
        }
        return 0;
    }

    private void addEdge(String from, String to, int capacity) {
        network.computeIfAbsent(from, k -> new HashMap<>()).put(to, capacity);
        network.computeIfAbsent(to, k -> new HashMap<>()).put(from, 0);
        if (!nodes.contains(from)) nodes.add(from);
        if (!nodes.contains(to)) nodes.add(to);
    }

    // BFS from t over the DAG, collecting "B"-prefixed keys of reachable nodes
    // (and "A"-prefixed keys of the nodes it expands along the way).
    private Set<String> reach(Map<T, Set<T>> graph, T t, Set<String> visited) {
        Queue<T> queue = new LinkedList<>();
        queue.add(t);
        while (!queue.isEmpty()) {
            T current = queue.poll();
            visited.add("A" + current);
            for (T neighbor : graph.get(current)) {
                String neighborKey = "B" + neighbor;
                if (!visited.contains(neighborKey)) {
                    queue.add(neighbor);
                    visited.add(neighborKey);
                }
            }
        }
        return visited;
    }

    // Dilworth: width = |nodes| - max matching over the transitive closure.
    public int solve(Map<T, Set<T>> graph) {
        for (T t : graph.keySet()) {
            addEdge("src", "A" + t, 1);
            addEdge("B" + t, "sink", 1);
            for (String u : reach(graph, t, new HashSet<>())) {
                addEdge("A" + t, u, 1);
            }
        }
        int maxFlow = 0;
        while (true) {
            bfs("src");
            if (level[nodes.indexOf("sink")] == -1) break;
            int flow;
            while ((flow = dfs("src", Integer.MAX_VALUE, "sink")) > 0) {
                maxFlow += flow;
            }
        }
        return graph.size() - maxFlow;
    }
}
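Looking at DinicSolver again, a plausible cause for both the slow chain case and the node-name effect (an observation, not something I have fully verified): bfs and dfs call nodes.indexOf(...) for every edge they touch, and addEdge calls nodes.contains(...); both scan the whole node list and compare strings character by character, so longer names make every scan slower. A sketch of one possible fix, replacing the list lookups with a HashMap index:

// Sketch of a possible optimization, not part of my implementation above:
private final Map<String, Integer> index = new HashMap<>();

// O(1) amortized; assigns the next free index the first time a node is seen.
private int idx(String node) {
    return index.computeIfAbsent(node, k -> index.size());
}

private void addEdge(String from, String to, int capacity) {
    network.computeIfAbsent(from, k -> new HashMap<>()).put(to, capacity);
    network.computeIfAbsent(to, k -> new HashMap<>()).put(from, 0);
    idx(from); // registers unseen nodes, replacing the nodes.contains scans
    idx(to);
}

// bfs/dfs would then use level[idx(v)] instead of level[nodes.indexOf(v)],
// with level allocated as new int[index.size()].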
That is equivalent to finding the maximum antichain of a partially ordered set (also known as its width): the largest subset in which no two elements are comparable, which in this case means that no node is a (transitive) dependency of another.
To find this, we can use Dilworth's theorem, which states that the width of a partially ordered set equals the size of a minimum decomposition of the set into chains, that is, subsets in which every pair of elements is comparable. We can compute that by modeling a bipartite graph with an edge for every "happens before" relationship and solving the maximum matching problem. Important: the DAG is a compressed form of the order relation and omits many implied edges, so build the bipartite graph from the transitive closure of the DAG! For example, for the chain 1 -> 2 -> 3, the closure adds 1 -> 3; the bipartite graph then has edges A1-B2, A1-B3, and A2-B3, the maximum matching has size 2, and the width is 3 - 2 = 1.
Here's an example in Python that uses Ford-Fulkerson (for simplicity), but you can plug in your favorite maximum-matching algorithm.
from collections import defaultdict

def dfs(network, source, sink, flow, visited):
    visited.add(source)
    if source == sink: return flow
    for neighbor, capacity in network[source].items():
        if capacity <= 0 or neighbor in visited: continue
        sent = dfs(network, neighbor, sink, min(flow, capacity), visited)
        if not sent: continue
        network[source][neighbor] -= sent
        network[neighbor][source] += sent
        return sent
    return 0

def reach(graph, source, visited):
    for neighbor in graph[source]:
        if neighbor not in visited:
            visited.add(neighbor)
            reach(graph, neighbor, visited)
    return visited

def solve(graph):
    network = defaultdict(lambda: defaultdict(lambda: 0))
    for v in graph:
        network['src']['A' + str(v)] = 1
        network['B' + str(v)]['sink'] = 1
        for u in reach(graph, v, set()):
            network['A' + str(v)]['B' + str(u)] = 1

    total = 0
    while True:
        sent = dfs(network, 'src', 'sink', float('+Inf'), set())
        if sent == 0: break
        total += sent

    return len(graph) - total
print(solve({
    1: [3],
    2: [3],
    3: [7],
    4: [5, 6],
    5: [7],
    6: [7],
    7: []
})) # 4

print(solve({
    1: [2],
    2: [3],
    3: []
})) # 1

print(solve({
    1: [3],
    2: [3, 5],
    3: [7],
    4: [5, 6],
    5: [7],
    6: [7],
    7: []
})) # 3

print(solve({
    1: [4],
    2: [4],
    3: [4],
    4: [],
})) # 3

print(solve({
    1: [4, 2],
    2: [4],
    3: [4],
    4: [],
})) # 2

print(solve({
    1: [3],
    2: [3],
    3: [5, 6, 7],
    4: [5],
    5: [8],
    6: [8],
    7: [8],
    8: []
})) # 3
Also, here's an example representation of the original graph, its transitive closure, and its bipartite transformation: