I need to construct following flow:
However I have no idea how to properly inject gather
-take
into map
:
sub MAIN ( *@file-names ) {
@file-names.map( { slip parse-file( $_ ) } ).map( { process-line( $_ ) } );
}
sub parse-file ( $file-name ) {
return gather for $file-name.IO.lines -> $line {
take $line if $line ~~ /a/; # dummy example logic
}
}
sub process-line ( $line ) {
say $line; # dummy example logic
}
This code works but leaks memory like crazy. I assume slip
makes gather
-take
eager? Or slip
does not mark Seq
items as consumed? Is there a way to slip gather
-take
result into map
in lazy manner?
BTW: My intent is to parallelize each step with race
later - so for example I have 2 files parsed at the same time producing lines for 10 line processors. Generally speaking I'm trying to figure out easiest way of composing such cascade flows. I've tried Channels to connect each processing step but they have no built-in pushback. If you have any other patterns for such flows then comments are more than welcomed.
EDIT 1:
I think my code is correct, and memory leak is not caused by bad logic but rather by bug in Slip class. I've created issue https://github.com/rakudo/rakudo/issues/5138 that is currently open. I'll post an update once it is resolved.
EDIT 2: No, my code was not correct :) Check for my post for answer.
First of all - I had big misconception. I thought that all lines produced by parse-file
must be slipped into map block
like this:
@file-names.map( produce all lines here ).map( process all lines here );
And Slip
is a List
that tracks all elements. That is why I had big memory leak.
The solution is to create gather
-take
sequence inside map
but consume it outside map
:
@file-names.map( { parse-file( $_ ) } ).flat.map( { process-line( $_ ) } );
So now it is:
@file-names.map( construct sequence here ).(get items from sequence here).map( process all lines here );