I want to split a Nextflow workflow into separate chunks (vehicle types), then split these chunks even further (vehicles), and then report as soon as all workers for a specific vehicle type are done.
Currently, nextflow main.nf results in:
N E X T F L O W ~ version 24.04.2
Launching `nextflow_pipeline/main.nf` [amazing_bell] DSL2 - revision: aae9cee085
executor > local (11)
[6e/28da8a] start | 1 of 1 ✔
[36/743eea] step1 (2) | 2 of 2 ✔
[cd/50083f] step2 (6) | 6 of 6 ✔
[5f/a823c3] step3 (1) | 2 of 2 ✔
step2: boats1
step2: boats2
step2: cars2
step2: cars3
step2: cars1
step2: cars4
all cars are done
all boats are done
What I would like instead is:
N E X T F L O W ~ version 24.04.2
Launching `nextflow_pipeline/main.nf` [amazing_bell] DSL2 - revision: aae9cee085
executor > local (11)
[6e/28da8a] start | 1 of 1 ✔
[36/743eea] step1 (2) | 2 of 2 ✔
[cd/50083f] step2 (6) | 6 of 6 ✔
[5f/a823c3] step3 (1) | 2 of 2 ✔
step2: boats1
step2: boats2
all boats are done
step2: cars2
step2: cars3
step2: cars1
step2: cars4
all cars are done
Minimal reproducible example:
process start {

    output:
    path "vehicle_types.list", emit: vehicle_type

    """
    #!/usr/bin/python3
    with open('vehicle_types.list', 'w') as fout:
        for vehicle in ['boats', 'cars']:
            fout.write(vehicle + "\\n")
    """
}

process step1 {
    debug true

    input:
    val vehicle_type

    output:
    path "vehicles.list", emit: vehicles

    """
    #!/usr/bin/python3
    import time
    if "${vehicle_type}" == 'boats':
        n = 2
    else:
        n = 4
    with open('vehicles.list', 'w') as fout:
        for i in range(n):
            time.sleep(2)
            fout.write("${vehicle_type}" + str(i + 1) + "\\n")
    """
}

process step2 {
    debug true

    input:
    val vehicle

    output:
    path "not_needed.txt", emit: hacky_output

    """
    echo step2: ${vehicle}
    echo hello > not_needed.txt
    """
}

process step3 {
    debug true

    input:
    val vehicle_type
    val hacky_variable_to_ensure_step2_completed

    """
    echo "all ${vehicle_type} are done"
    """
}

workflow {
    main:
    start()
    start.out.vehicle_type.splitText().map{it -> it.trim()}.set{vehicle_type}
    step1(vehicle_type)
    step1.out.vehicles.splitText().map{it -> it.trim()}.set{vehicles}
    step2(vehicles)
    step3(vehicle_type, step2.out.hacky_output.last())
}
Currently step3 is blocked by the .last(). I want to concatenate the results from step2 in a non-blocking way, as soon as all the workers for a specific vehicle type are done, instead of waiting until all workers for step2 are done. How do I achieve this?
You can use the built-in groupKey() function to define the expected size of each group, and then group with the groupTuple operator. For example:
process step1 {
    tag "$vehicle_type"
    debug true

    input:
    val vehicle_type

    output:
    tuple val(vehicle_type), path("vehicles.list"), emit: vehicles

    """
    #!/usr/bin/python3
    import time
    n = 2 if "${vehicle_type}" == 'boats' else 4
    with open('vehicles.list', 'w') as fout:
        for i in range(n):
            time.sleep(2)
            fout.write("${vehicle_type}" + str(i + 1) + "\\n")
    """
}

process step2 {
    tag "$vehicle_type"
    debug true

    input:
    tuple val(vehicle_type), val(vehicle)

    output:
    tuple val(vehicle_type), path("not_needed.txt"), emit: hacky_output

    """
    echo "step2: ${vehicle}" && touch not_needed.txt
    """
}

process step3 {
    tag "$vehicle_type"
    debug true

    input:
    tuple val(vehicle_type), path("txt")

    """
    echo "all ${vehicle_type} are done"
    """
}

workflow {
    vehicle_types = Channel.of('boats', 'cars')

    step1( vehicle_types )

    step1.out.vehicles
        .map { vehicle_type, vehicle_list ->
            def vehicles = vehicle_list.readLines()
            def group_key = groupKey( vehicle_type, vehicles.size() )

            tuple( group_key, vehicles )
        }
        .transpose()
        .set { vehicles }

    step2( vehicles )

    step3( step2.out.hacky_output.groupTuple() )
}
Results:
$ nextflow run main.nf
N E X T F L O W ~ version 24.04.3
Launching `main.nf` [modest_nightingale] DSL2 - revision: 49168943ed
executor > local (10)
[b1/dae30f] step1 (cars) [100%] 2 of 2 ✔
[4a/018f10] step2 (cars) [100%] 6 of 6 ✔
[eb/bab138] step3 (cars) [100%] 2 of 2 ✔
step2: boats2
step2: boats1
all boats are done
step2: cars1
step2: cars2
step2: cars3
step2: cars4
all cars are done
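Note: a plain groupTuple() has to wait until the source channel is complete before it can emit anything, because it doesn't know how many items belong to each key. Wrapping the key with groupKey(key, size) attaches the expected group size to the key, so each group is emitted as soon as the declared number of items has arrived, which is why "all boats are done" no longer waits for the cars. Below is a minimal, self-contained sketch of just that grouping step, using hard-coded illustrative values rather than the pipeline's processes:

// Minimal sketch of groupKey() + groupTuple() in isolation (illustrative values only)
workflow {
    Channel
        .of( ['boats', 'boats1'], ['boats', 'boats2'],
             ['cars',  'cars1'],  ['cars',  'cars2'],
             ['cars',  'cars3'],  ['cars',  'cars4'] )
        .map { vehicle_type, vehicle ->
            // attach the expected group size to the key so groupTuple()
            // can emit each group as soon as it is complete
            def size = vehicle_type == 'boats' ? 2 : 4
            tuple( groupKey(vehicle_type, size), vehicle )
        }
        .groupTuple()
        .view()
}

Here the boats group is emitted as soon as both boat items have arrived, without waiting for the cars group to fill up.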