I am using metaflow to create a text processing pipeline as follows:
                    +--> F --+
            +--> D -|        |
            |       +--> G --+
    +--> B -|                |
    |       +--> E ----------+--> H
A --|                        |
    +--> C ------------------+
As per the documentation, branch allows steps to be computed in parallel, and I use it to compute (B, C), (D, E) and (F, G) in parallel. Finally, all the branches are joined at H. The following code implements this logic:
from metaflow import FlowSpec, step


class TextProcessing(FlowSpec):

    @step
    def a(self):
        ....
        self.next(self.b, self.c)

    @step
    def c(self):
        self.result1 = {}
        ....
        self.next(self.join)

    @step
    def b(self):
        ....
        self.next(self.d, self.e)

    @step
    def e(self):
        self.result2 = []
        .....
        self.next(self.join)

    @step
    def d(self):
        ....
        self.next(self.f, self.g)

    @step
    def f(self):
        self.result3 = []
        ....
        self.next(self.join)

    @step
    def g(self):
        self.result4 = []
        .....
        self.next(self.join)

    @step
    def join(self, results):
        # All four branches (c, e, f, g) converge on this single join step
        data = [results.c.result1, results.e.result2, results.f.result3, results.g.result4]
        print(data)
        self.next(self.end)

    @step
    def end(self):
        pass


etl = TextProcessing()
On running python main.py run, I get the following error:
Metaflow 2.2.10 executing TextProcessing for user:ubuntu
Validating your flow...
Validity checker found an issue on line 83:
Step join seems like a join step (it takes an extra input argument) but an incorrect number of steps (c, e, f, g) lead to it. This join was expecting 2 incoming paths, starting from splitted step(s) f, g.
Can someone point out where I am going wrong?
After going through the docs again carefully, I realised that I wasn't handling joins properly. As per the docs for metaflow-2.2.10:
Note that you can nest branches arbitrarily, that is, you can branch inside a branch. Just remember to join all the branches that you create.
This means every branch has to be joined. To carry values across a join, metaflow provides the merge_artifacts utility function, which helps propagate unambiguous values.
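One way to picture the rule is that every step that fans out needs its own join, and nested splits have to be closed innermost first. The sketch below is plain Python, not Metaflow code, and it is not how Metaflow's validity checker is implemented; the `graph` dictionary and the `join_plan` helper are hypothetical names used only to illustrate the idea on the pipeline from the question.

```python
# Hypothetical description of the flow: each split step maps to the
# steps it fans out to. Steps without an entry do not branch.
graph = {
    "a": ["b", "c"],
    "b": ["d", "e"],
    "d": ["f", "g"],
}


def join_plan(graph, step):
    """Return (split_step, branches) pairs, innermost split first.

    Each pair stands for one join step that has to close that split.
    """
    plan = []
    children = graph.get(step, [])
    # Splits nested inside a branch must be joined before the outer one.
    for child in children:
        plan.extend(join_plan(graph, child))
    if len(children) > 1:
        plan.append((step, children))
    return plan


plan = join_plan(graph, "a")
for split_step, branches in plan:
    print(f"one join closes the split made in {split_step}: {branches}")
```

For this pipeline the plan contains three entries: first the branches f and g, then the branches that start at d and e, then the branches that start at b and c.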
Since the workflow branches three times, I added three join steps to merge the results.
The following changes worked for me:
from metaflow import FlowSpec, step


class TextProcessing(FlowSpec):

    @step
    def a(self):
        ....
        self.next(self.b, self.c)

    @step
    def c(self):
        # Stored on self so that the join step can read it as an artifact
        self.result1 = {}
        ....
        self.next(self.merge_3)

    @step
    def b(self):
        ....
        self.next(self.d, self.e)

    @step
    def e(self):
        self.result2 = []
        .....
        self.next(self.merge_2)

    @step
    def d(self):
        ....
        self.next(self.f, self.g)

    @step
    def f(self):
        self.result3 = []
        ....
        self.next(self.merge_1)

    @step
    def g(self):
        self.result4 = []
        .....
        self.next(self.merge_1)

    @step
    def merge_1(self, results):
        # Joins f and g, closing the split made in d
        self.result = {
            'result4': results.g.result4,
            'result3': results.f.result3,
        }
        self.next(self.merge_2)

    @step
    def merge_2(self, results):
        # Joins merge_1 and e, closing the split made in b
        self.result = {'result2': results.e.result2, **results.merge_1.result}
        self.merge_artifacts(results, include=['result'])
        self.next(self.merge_3)

    @step
    def merge_3(self, results):
        # Joins merge_2 and c, closing the split made in a
        self.result = {'c': results.c.result1, **results.merge_2.result}
        self.merge_artifacts(results, include=['result'])
        self.next(self.end)

    @step
    def end(self):
        print(self.result)


etl = TextProcessing()
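To see the shape of what end prints, the three merges can be traced with plain dictionaries outside Metaflow. This is only an illustrative sketch: the empty dict and lists are placeholders for whatever the elided step bodies actually compute, and the `merged` name is hypothetical.

```python
# Placeholder artifacts; the real values come from the elided step bodies.
result1, result2, result3, result4 = {}, [], [], []

# merge_1: joins f and g
merged = {"result4": result4, "result3": result3}

# merge_2: joins the d branch (via merge_1) with e
merged = {"result2": result2, **merged}

# merge_3: joins the b branch (via merge_2) with c
merged = {"c": result1, **merged}

print(list(merged))  # ['c', 'result2', 'result4', 'result3']
```

Each join wraps the previous result, so the final dictionary carries one entry per leaf branch.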