Tags: memory-leaks, lazy-evaluation, raku, lazy-sequences, rakudo

How to slip gather-take into map in a lazy manner?


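I need to construct the following flow: take a list of file names, parse each file into a sequence of lines, and then process each line.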

However, I have no idea how to properly inject gather-take into map:

sub MAIN ( *@file-names ) {

    @file-names.map( { slip parse-file( $_ ) } ).map( { process-line( $_ ) } );
}

sub parse-file ( $file-name ) {
    return gather for $file-name.IO.lines -> $line {
        take $line if $line ~~ /a/; # dummy example logic
    }
}

sub process-line ( $line ) {
    say $line;  # dummy example logic
}

This code works but leaks memory like crazy. I assume slip makes gather-take eager? Or does slip not mark Seq items as consumed? Is there a way to slip a gather-take result into map in a lazy manner?

BTW: My intent is to parallelize each step with race later, so that, for example, I have 2 files parsed at the same time, producing lines for 10 line processors. Generally speaking, I'm trying to figure out the easiest way of composing such cascading flows. I've tried Channels to connect the processing steps, but they have no built-in pushback. If you know any other patterns for such flows, comments are more than welcome.

EDIT 1:

I think my code is correct and the memory leak is caused not by bad logic but by a bug in the Slip class. I've created issue https://github.com/rakudo/rakudo/issues/5138, which is currently open. I'll post an update once it is resolved.

EDIT 2: No, my code was not correct :) See my own post for the answer.


Solution

  • First of all, I had a big misconception. I thought that all lines produced by parse-file had to be slipped into the map block like this:

    @file-names.map( produce all lines here ).map( process all lines here );
    

    But a Slip is a List, and it keeps track of all of its elements. That is why I had a big memory leak.

    The solution is to create the gather-take sequence inside map but consume it outside map:

    @file-names.map( { parse-file( $_ ) } ).flat.map( { process-line( $_ ) } );
    

    So now it is:

    @file-names.map( construct sequence here ).(get items from sequence here).map( process all lines here );
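
To see the "construct inside map, consume outside map" idea in isolation, here is a minimal sketch that needs no files. The names fake-parse-file and all-lines are hypothetical stand-ins, not part of the original code; fake-parse-file logs to STDERR at the moment each item is generated, so you can watch when the work actually happens.

```raku
# Hypothetical stand-in for parse-file: yields three "lines" per name
# and logs each one at the moment it is generated.
sub fake-parse-file ( $file-name ) {
    return gather for 1..3 -> $n {
        note "produced $file-name line $n";
        take "$file-name line $n";
    }
}

# Build the per-file sequences inside map, flatten them outside of it.
sub all-lines ( @file-names ) {
    return @file-names.map( { fake-parse-file( $_ ) } ).flat;
}

# Ask for the first two lines only and print them.
my @first = all-lines( <a b> ).head( 2 );
say @first;
```

If the pipeline stays lazy, as I would expect, the "produced" messages should stop after the items that were actually requested, and file "b" should never be touched. This only demonstrates laziness on toy data; it says nothing about memory use on real files.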