typescript bigdata

How to Build a Type-Safe Data Processing Pipeline in TypeScript?


Big data apps often involve complex data transformations: filtering, mapping, aggregating, etc. Ensuring type safety in these dynamic pipelines can be tricky.

How can I create a flexible, type-safe function that processes data through a sequence of transformation steps, so that each step receives correctly typed input from the previous one?

Here's an example of a transformation pipeline where each function modifies the data step by step:

type Transformation<T, U> = (input: T) => U;


// todo: this must be improvable 
function createPipeline<T, Steps extends [...Transformation<any, any>[]]>(
  initialData: T,
  ...steps: Steps
): ReturnType<Steps[number]> {
  return steps.reduce((data, step) => step(data), initialData);
}

// Example usage:
const parseNumbers = (data: string[]) => data.map(Number);
const filterValidNumbers = (data: number[]) => data.filter(n => !isNaN(n));
const sumNumbers = (data: number[]) => data.reduce((sum, n) => sum + n, 0);

const result = createPipeline(['1', '2', 'three'], parseNumbers, filterValidNumbers, sumNumbers);
// Expected output: 3

TypeScript doesn't enforce that the output type of one step matches the input type of the next, so a misordered pipeline still compiles and only fails at runtime.
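To make the failure mode concrete, here is a minimal sketch. `loosePipeline` is a hypothetical, simplified stand-in for the function above (it is not the exact signature from the question); it accepts any list of functions, so swapping two steps is not caught until the code runs:

```typescript
type Transformation<T, U> = (input: T) => U;

// Hypothetical, simplified stand-in for the loosely typed pipeline:
// any sequence of functions is accepted, so step order is never checked.
function loosePipeline(initialData: unknown, ...steps: Transformation<any, any>[]): unknown {
  return steps.reduce<unknown>((data, step) => step(data), initialData);
}

const parseNumbers = (data: string[]) => data.map(Number);
const sumNumbers = (data: number[]) => data.reduce((sum, n) => sum + n, 0);

// The steps are in the wrong order, yet this type-checks.
let failure: unknown;
try {
  loosePipeline(['1', '2'], sumNumbers, parseNumbers);
} catch (error) {
  // sumNumbers returned a non-array, so parseNumbers cannot call `.map` on it.
  failure = error;
}
console.log(failure instanceof TypeError);
```

The mistake only surfaces as a `TypeError` inside the second step, which is the situation the type-level check should rule out.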

With newish features like variadic tuple types and the satisfies operator, is there a way to make TypeScript statically verify that each transformation is applied in the correct sequence?


Solution

  • You can use a recursive type to check that each function's input matches what comes before it: the initial data for the first step, and the previous step's return type for every later step:

    type Transformation<T, U> = (input: T) => U;
    
    type Steps<T, P extends Transformation<any, any>[], O = P> = 
        P extends [] ? O : P extends [(input: T) => infer R, ...infer B extends Transformation<any, any>[]] ? Steps<R, B, O> : never;
        
    type Last<T extends any[]> = T extends [infer A, ...infer B] ? B extends [] ? A : Last<B> : never;
    
    function createPipeline<T, S extends [...Transformation<any, any>[]]>(
      initialData: T,
      ...steps: Steps<T, S>
    ) {
      return steps.reduce((data, step) => step(data), initialData) as ReturnType<Last<S>>;
    }
    
    
    const parseNumbers = (data: string[]) => data.map(Number);
    const filterValidNumbers = (data: number[]) => data.filter(n => !isNaN(n));
    const sumNumbers = (data: number[]) => data.reduce((sum, n) => sum + n, 0);
    
    const result = createPipeline(['1', '2', 'three'], parseNumbers, filterValidNumbers, sumNumbers); // number
    const result2 = createPipeline(['1', '2', 'three'], filterValidNumbers, parseNumbers, sumNumbers); // error
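As a point of comparison, the same guarantee can be had without recursive conditional types by adding steps one at a time. This is a different technique from the one above, sketched under the assumption that a chained call style is acceptable; `Pipeline`, `build`, `pipeline`, `pipe` and `run` are hypothetical names, not part of the original code:

```typescript
// Hypothetical fluent builder: the current output type is threaded through each call.
interface Pipeline<In, Out> {
  // Append a step; its parameter must accept the current output type.
  pipe<Next>(step: (input: Out) => Next): Pipeline<In, Next>;
  // Run every step, in order, on the given data.
  run(data: In): Out;
}

function build<In, Out>(fn: (data: In) => Out): Pipeline<In, Out> {
  return {
    pipe<Next>(step: (input: Out) => Next): Pipeline<In, Next> {
      // Compose the new step after everything collected so far.
      return build((data: In) => step(fn(data)));
    },
    run: fn,
  };
}

// Start with the identity pipeline for a given input type.
function pipeline<In>(): Pipeline<In, In> {
  return build((data: In) => data);
}

const parseNumbers = (data: string[]) => data.map(Number);
const filterValidNumbers = (data: number[]) => data.filter(n => !isNaN(n));
const sumNumbers = (data: number[]) => data.reduce((sum, n) => sum + n, 0);

const sumValid = pipeline<string[]>()
  .pipe(parseNumbers)
  .pipe(filterValidNumbers)
  .pipe(sumNumbers);

const total = sumValid.run(['1', '2', 'three']); // inferred as number

// Should be rejected by the compiler, because string[] is not assignable to number[]:
// pipeline<string[]>().pipe(filterValidNumbers);
```

The trade-off is that the call site changes from a single variadic call to a chain, but each `pipe` call is checked individually, so the error points at the offending step.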