Tags: node.js, pipe, bigdata, stdin, deno

Streaming a large dataset to a child process (as JSON)


I'm trying to send a large amount of in-memory data to a child process. Specifically, I have a large dataset represented as JSON in Node.js that I want to send to a child process I'm spawning (Deno).

This is the most basic attempt I've made:

const deno = spawn('deno', ['run', '--allow-all', '--unstable-bare-node-builtins', '--unstable-sloppy-imports', `--allow-read=${cwd}`, filename], {
    stdio: ['pipe', 'pipe', 'pipe'],
    cwd,
    env: {
        PATH: process.env.PATH,
        DENO_NO_PACKAGE_JSON: '1'
    }
});

const payload = JSON.stringify({
    some: 'data'
});

deno.stdin.write(payload);
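
For comparison, the basic attempt above writes the payload but never closes the child's stdin. A reader that consumes stdin until end-of-stream (as new Response(Deno.stdin.readable).json() does in the answer below) can only finish once the writer ends the stream. A minimal sketch, assuming the child is spawned as in the question; writeAndClose is a hypothetical helper name:

```typescript
import { once } from "node:events";
import { Writable } from "node:stream";

// Hypothetical helper: write the whole payload, then signal end-of-stream
// so a reader that consumes stdin until EOF can finish.
async function writeAndClose(stdin: Writable, payload: string): Promise<void> {
  // end(chunk) writes the final chunk and then closes the writable side.
  stdin.end(payload);
  // 'finish' is emitted once all data has been flushed to the underlying
  // resource; once() rejects if the stream emits 'error' first.
  await once(stdin, "finish");
}

// Usage with the ChildProcess from the question (assumes `deno` is in scope):
// await writeAndClose(deno.stdin, JSON.stringify({ some: "data" }));
```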

I've also tried observing the drain event while writing:

// Recursive function to write a chunk to the child process.
const writeChunk = async (chunk: string) => {
    const written = deno.stdin.write(chunk);

    if (!written) {
        // Wait for drain if necessary
        await new Promise((resolve) => deno.stdin.once('drain', resolve));
        return writeChunk(chunk);
    }
};

This seemed to get more data through, but I'm clearly still missing chunks of the JSON (or they're arriving out of order), because I can't parse it successfully in the Deno process.
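
A likely cause of the corrupted output in the drain-based attempt: when write() returns false, the chunk has still been accepted and buffered. false only means the internal buffer is above its highWaterMark and the caller should wait for 'drain' before writing more, so calling writeChunk(chunk) again after 'drain' sends the same chunk twice. Below is a sketch of a loop that writes each slice exactly once, pauses on backpressure, and ends the stream at the end. The helper name and the 64 KiB slice size are assumptions for illustration:

```typescript
import { Buffer } from "node:buffer";
import { once } from "node:events";
import { Writable } from "node:stream";

// Hypothetical helper: write `data` in fixed-size byte slices, pausing on
// backpressure, then end the stream. Each slice is written exactly once.
async function writeWithBackpressure(
  stream: Writable,
  data: string,
  chunkSize = 64 * 1024, // assumed slice size in bytes; tune as needed
): Promise<void> {
  // Slice bytes rather than the string so no UTF-16 surrogate pair is split.
  const bytes = Buffer.from(data, "utf8");

  for (let offset = 0; offset < bytes.length; offset += chunkSize) {
    const chunk = bytes.subarray(offset, offset + chunkSize);
    // write() has already buffered `chunk` even when it returns false;
    // false only means "wait for 'drain' before writing more".
    if (!stream.write(chunk)) {
      await once(stream, "drain");
    }
  }

  // Signal EOF to the reader and wait until everything is flushed.
  stream.end();
  await once(stream, "finish");
}
```

Call it once for the whole payload (for example, await writeWithBackpressure(deno.stdin, payload)) rather than starting several writes concurrently, so slices cannot interleave.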

Additionally, I tried piping the output of cat dataset.json instead, but Deno fails with the error ENOTTY: Not a Typewriter. Searching Google and asking ChatGPT didn't help me resolve it.
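
If the data is already in a file, a Node-side equivalent of piping cat dataset.json into the child is to stream the file into the child's stdin with pipeline() from node:stream/promises, which applies backpressure and ends the destination once the file has been fully read. This sketch does not diagnose the ENOTTY error itself; pipeFileTo and the file path are placeholders:

```typescript
import { createReadStream } from "node:fs";
import type { Writable } from "node:stream";
import { pipeline } from "node:stream/promises";

// Hypothetical helper: stream a file's bytes into a writable (e.g. child.stdin).
// pipeline() applies backpressure, ends `destination` when the file has been
// fully read, and rejects if either side errors.
async function pipeFileTo(filePath: string, destination: Writable): Promise<void> {
  await pipeline(createReadStream(filePath), destination);
}

// Usage (assumes `deno` is the ChildProcess spawned in the question):
// await pipeFileTo("dataset.json", deno.stdin);
```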

FYI: I'm trying to use STDIN because it seems like the most efficient way to transfer this data without it leaving memory. Writing to disk is my last resort, as I imagine that would be much slower.

So I'm not sure whether the problem is in how I'm sending the data over STDIN or in how I'm receiving it, but I must be doing something wrong. Can anyone point me in the right direction here? Or is there a better way to do this altogether?


Solution

  • Although I'm not sure I fully understand everything you've written in the question, it seems you're asking for an example showing how to pipe a stream of JSON data from a Node.js process to a Deno child process, parse that data in the Deno process, and perhaps pipe data back and read it in Node.

    Below I put together a minimal, reproducible example that you can use as a model. It includes types and comments explaining each step.

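    Additionally, here are some APIs used in the Node.js code that might not already be familiar:

      • Writable.toWeb() from node:stream, which adapts a Node.js Writable (here, the child's stdin) into a WHATWG WritableStream
      • Blob.prototype.stream() and ReadableStream.prototype.pipeTo()
      • new Response(body).json(), used to collect an entire byte stream and parse it as JSON
      • import.meta.resolve()
      • using declarations (explicit resource management); for a ChildProcess, disposal calls kill() when example() returns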


    Files:

    ./package.json:

    {
      "name": "so-78915710",
      "version": "0.1.0",
      "type": "module",
      "scripts": {
        "check:types": "tsc --noEmit",
        "compile": "tsc",
        "test": "node --test dist/test.js"
      },
      "engines": {
        "node": ">=20.17.0"
      },
      "devDependencies": {
        "@types/node": "^22.5.0",
        "typescript": "^5.5.4"
      },
      "license": "MIT"
    }
    

    ./tsconfig.json:

    {
      "$schema": "https://json.schemastore.org/tsconfig",
    
      "compilerOptions": {
        "strict": true,
        "exactOptionalPropertyTypes": true,
        "noImplicitOverride": true,
        "noImplicitReturns": true,
        "noUncheckedIndexedAccess": true,
    
        "removeComments": true,
        "inlineSourceMap": true,
        "inlineSources": true,
    
        "forceConsistentCasingInFileNames": true,
    
        "module": "NodeNext",
        "moduleDetection": "force",
        "moduleResolution": "NodeNext",
        "isolatedModules": true,
        "esModuleInterop": true,
    
        "target": "ES2022",
        "lib": ["ESNext"],
    
        "isolatedDeclarations": true,
        "declaration": true,
    
        "outDir": "./dist"
      },
    
      "include": ["./src/**/*"],
      "exclude": ["./src/stdio_json_buffer_echo.ts"]
    }
    

    ./src/example.ts:

    import { type SpawnOptionsWithoutStdio, spawn } from "node:child_process";
    import { Writable } from "node:stream";
    
    type JsonValue =
      | boolean
      | number
      | null
      | string
      | JsonValue[]
      | { [key: string]: JsonValue | undefined };
    
    async function example(): Promise<JsonValue> {
      const env: NodeJS.ProcessEnv = {
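        // Shown in the code in your question, but omitted in this example.
        // Note: if `env` is given without PATH, Node looks up "deno" on a
        // default search path (/usr/bin:/bin on Unix), so you may need it:
        // PATH: process.env.PATH,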
        DENO_NO_PACKAGE_JSON: "1",
      };
    
      const spawnOptions: SpawnOptionsWithoutStdio = {
        // cwd: process.cwd(), // Default
        env,
        // stdio: ["pipe", "pipe", "pipe"], // Default
      };
    
      const denoModuleFilePath = import.meta.resolve(
        "../src/stdio_json_buffer_echo.ts",
      );
    
      const denoArgs = ["run", denoModuleFilePath] satisfies readonly string[];
      using childProcess = spawn("deno", denoArgs, spawnOptions);
    
      // The data in memory that can be serialized as JSON
      const serializable = { foo: "bar" };
    
      // A (WHATWG standard) ReadableStream representing a copy of that data serialized as JSON
      const jsonByteStream = new Blob([JSON.stringify(serializable)]).stream();
    
      // Convert the stdin of the child process to a (WHATWG standard) WritableStream
      const writableStream = Writable.toWeb(childProcess.stdin);
    
      // Wait for both of the following async operations to complete (read + write),
      // and assign the resolved value from the first operation to a variable:
      const [parsed] = await Promise.all([
        // Collect the byte stream from the child process and parse as JSON
        new Response(childProcess.stdout).json() as Promise<JsonValue>,
        // Write the JSON byte stream to the stdin of the child process
        jsonByteStream.pipeTo(writableStream),
      ] as const);
    
      return parsed;
    }
    
    export { example, type JsonValue };
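
example() serializes the whole value with JSON.stringify before streaming it, which is fine when the data comfortably fits in memory twice. If the dataset is a large collection of records, one alternative (a different technique from the answer above, shown only as a sketch) is newline-delimited JSON (NDJSON): serialize one record per line and stream the lines, so the full JSON string never has to exist at once. The sketch reuses the same Writable.toWeb() bridge as example(); streamNdjson and the record shape are hypothetical:

```typescript
import { Writable } from "node:stream";
import { ReadableStream } from "node:stream/web";
import { TextEncoder } from "node:util";

// Hypothetical helper: send records as newline-delimited JSON (NDJSON),
// one JSON.stringify'd record per line, to a Node Writable such as
// childProcess.stdin. Every record is assumed to be JSON-serializable.
async function streamNdjson(
  records: Iterable<unknown>,
  destination: Writable,
): Promise<void> {
  const encoder = new TextEncoder();
  const iterator = records[Symbol.iterator]();

  // pull() runs only when the destination wants more data, so records are
  // serialized one at a time and backpressure is respected.
  const source = new ReadableStream<Uint8Array>({
    pull(controller) {
      const next = iterator.next();
      if (next.done) {
        controller.close();
      } else {
        controller.enqueue(encoder.encode(`${JSON.stringify(next.value)}\n`));
      }
    },
  });

  // Same bridge as example(): pipeTo() closes the web stream, which ends
  // the underlying Node Writable once all lines have been written.
  await source.pipeTo(Writable.toWeb(destination));
}
```

The receiving side then has to parse line by line instead of calling .json() on the whole stream.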
    

    ./src/stdio_json_buffer_echo.ts (this is the module that will run in Deno):

    // Collect the byte stream and parse as JSON
    const parsed = await new Response(Deno.stdin.readable).json();
    
    // Re-serialize…
    const json = JSON.stringify(parsed);
    
    // …and emit to stdout
    await new Blob([json]).stream().pipeTo(Deno.stdout.writable);
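
The Deno module above waits for the entire input before parsing, which matches the single JSON document sent by example(). If the sender emits NDJSON instead (one JSON value per line), the receiver can parse records incrementally. A minimal sketch using only TextDecoder and async iteration, so it should run in Deno (with Deno.stdin.readable) as well as in Node 18+; readNdjson is a hypothetical name:

```typescript
// Hypothetical helper: parse newline-delimited JSON from a byte stream,
// yielding one parsed value per non-empty line. Accepts any async iterable
// of bytes, e.g. Deno.stdin.readable in Deno or a ReadableStream in Node 18+.
async function* readNdjson(
  bytes: AsyncIterable<Uint8Array>,
): AsyncGenerator<unknown, void, undefined> {
  const decoder = new TextDecoder();
  let buffered = "";

  for await (const chunk of bytes) {
    // stream: true keeps incomplete multi-byte sequences for the next chunk.
    buffered += decoder.decode(chunk, { stream: true });

    let newline: number;
    while ((newline = buffered.indexOf("\n")) !== -1) {
      const line = buffered.slice(0, newline).trim();
      buffered = buffered.slice(newline + 1);
      if (line !== "") yield JSON.parse(line);
    }
  }

  // Flush the decoder and handle a final line without a trailing newline.
  buffered += decoder.decode();
  const last = buffered.trim();
  if (last !== "") yield JSON.parse(last);
}

// Usage in the Deno module (assumes the parent sends NDJSON):
// for await (const record of readNdjson(Deno.stdin.readable)) {
//   console.log(record);
// }
```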
    

    ./src/test.ts:

    import { deepEqual as assertDeepEqual } from "node:assert/strict";
    import { test } from "node:test";
    
    import { example } from "./example.js";
    
    test("deno child process returns expected JSON data", async () => {
      const expected = { foo: "bar" };
      const actual = await example();
      assertDeepEqual(actual, expected);
    });
    

    Software versions:

    % node --version
    v20.17.0
    
    % npm --version
    10.8.2
    
    % deno --version
    deno 1.46.1 (stable, release, aarch64-apple-darwin)
    v8 12.9.202.2-rusty
    typescript 5.5.2
    

    Running the example test:

    % npm install
    
    added 3 packages, and audited 4 packages in 431ms
    
    found 0 vulnerabilities
    
    % npm run compile && npm run test
    
    > so-78915710@0.1.0 compile
    > tsc
    
    
    > so-78915710@0.1.0 test
    > node --test dist/test.js
    
    ✔ deno child process returns expected JSON data (21.365625ms)
    ℹ tests 1
    ℹ suites 0
    ℹ pass 1
    ℹ fail 0
    ℹ cancelled 0
    ℹ skipped 0
    ℹ todo 0
    ℹ duration_ms 53.168541