typescript functional-programming fp-ts

Running an array of TaskEithers in parallel, but continuing if one or more tasks fail


I have to make an array of IO calls in parallel and merge the contents of the calls that succeed. If one fails, the others should still be processed as normal, but an error message should be reported as well.

My thought process on how this can be implemented:

Array<TE<E, A>> -> TE<E, Array<A>> -> TE<E, MergedA> -> [E, A]

What I'm currently doing:

I am currently sequencing an array of TE, but any failure in the chain yields a Left for the whole result.

    pipe(
      sequenceT(TE.taskEither)(arrayofTE), // TE<E, A>[] -> TE<E, A[]>
      TE.map(mergeFn), // TE<E, A[]> -> TE<E, MergedA>
      ???
    )

How can I stop the short circuiting?


Solution

  • You can pass T.task instead of TE.taskEither to sequence/sequenceT (docs):

    Action: execute an array of tasks in parallel, collecting all failures and successes

    TaskEither: array.sequence(T.task)(taskEithers) - same for sequenceT

    sequence: Running tasks of the same type in parallel

    import { pipeable as P, taskEither as TE, task as T, array as A, either as E } from "fp-ts";
    
    const arrayofTE: TE.TaskEither<string, number>[] = [
      TE.right(1),
      TE.right(2),
      TE.left("Oh shit")
    ];
    
    const run = P.pipe(
      // change to T.task instead of TE.taskEither here
      A.array.sequence(T.task)(arrayofTE),
      mergeFn
    );
    
    run(); // run side effect
    // returns Promise<{"errors":["Oh shit"],"results":[1,2]}>
    
    // whatever merged result you want to have; this one collects all errors and all results
    declare function mergeFn(te: T.Task<E.Either<string, number>[]>): T.Task<Results> 
    
    type Results = { errors: string[]; results: number[] };
    

    sequenceT: Running tasks of different types in parallel

    import { apply as AP /* and others above */ } from "fp-ts";
    
    // Here, TaskEither result can be number | boolean (success case), string on error
    const arrayofTE = [TE.right(1), TE.right(true), TE.left("Oh shit")] as const;
    
    const run = P.pipe(
      AP.sequenceT(T.task)(...arrayofTE), // we use sequenceT here and pass T.task again
      mergeFn
    );
    
    declare function mergeFn(a: T.Task<E.Either<string, number | boolean>[]>): T.Task<Results>
    

    Here are sandboxes with a mergeFn implementation to play around with: sequence, sequenceT.
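
To illustrate why passing T.task avoids the short circuit, and what a mergeFn along these lines could look like, here is a minimal self-contained sketch. It does not import fp-ts: `Either`, `Task` and `TaskEither` are local stand-ins that mirror the runtime shape of the fp-ts types, and `ok`, `fail`, `sequenceTasks` and `partition` are hypothetical helpers introduced only for this example. `sequenceTasks` is a rough analogue of `A.array.sequence(T.task)` built on `Promise.all`, not the library's actual implementation.

```typescript
// Local stand-ins mirroring the runtime shape of fp-ts's types.
// Assumption: in real code these come from "fp-ts" (E.Either, T.Task, TE.TaskEither).
type Either<E, A> =
  | { readonly _tag: "Left"; readonly left: E }
  | { readonly _tag: "Right"; readonly right: A };
type Task<A> = () => Promise<A>;
type TaskEither<E, A> = Task<Either<E, A>>;

// Hypothetical constructors, playing the role of TE.right / TE.left.
const ok = (n: number): TaskEither<string, number> => () =>
  Promise.resolve<Either<string, number>>({ _tag: "Right", right: n });
const fail = (msg: string): TaskEither<string, number> => () =>
  Promise.resolve<Either<string, number>>({ _tag: "Left", left: msg });

// Rough analogue of A.array.sequence(T.task): start every task and wait for all.
// It never looks inside the Either, so a Left cannot cancel or hide the others.
const sequenceTasks = (
  tasks: ReadonlyArray<TaskEither<string, number>>
): Task<Either<string, number>[]> => () => Promise.all(tasks.map((t) => t()));

type Results = { errors: string[]; results: number[] };

// Pure step: split the settled Eithers into errors and results, keeping order.
const partition = (eithers: ReadonlyArray<Either<string, number>>): Results => {
  const out: Results = { errors: [], results: [] };
  for (const e of eithers) {
    if (e._tag === "Left") out.errors.push(e.left);
    else out.results.push(e.right);
  }
  return out;
};

// One possible mergeFn: lift the pure step over Task.
const mergeFn = (
  te: Task<ReadonlyArray<Either<string, number>>>
): Task<Results> => () => te().then(partition);

const run: Task<Results> = mergeFn(sequenceTasks([ok(1), ok(2), fail("boom")]));

run().then((r) => console.log(JSON.stringify(r)));
// prints {"errors":["boom"],"results":[1,2]}
```

Because a TaskEither here is just a Task that resolves to an Either, sequencing at the Task level collects every Either, and deciding what a Left means is left entirely to mergeFn. With fp-ts itself, the partitioning step can probably be replaced by the library's own `A.separate`, which splits an array of Eithers into its left and right values.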