node.jstypescriptnode-worker-threads

Typescript, Node, Two Worker Threads and one Main Thread with static Map


I have the following files, which result in an infinite loop through the workers if I even reference the static Map on the main thread - it doesn't even need to run the code referencing it.

Note: It actually "runs" - it just re-starts when done ad infinitum - with the increased memory as a result (I have to close the terminal window to free). Note: This code doesn't do a thing - it's just a demo reduced from the real code into what I hope is the minimum to expose the error and get an answer.

I'm having trouble finding any documentation about this as there are no errors or anything that I've found to guide the troubleshooting. Which could easily mean I just haven't put up the right guard-rail or search term to expose the issue (feeling kind of dumb about this).

The commented-out code in baseworker.ts is deliberate. The code with the commented-out lines does not cause the error - it just doesn't do the needed 'registration' of class reference step. However, if you uncomment function "registerWorkerName" in lines 24 - 28 of baseworker.ts and run this example, the error appears. As mentioned, you don't even need to call the function - just the reference to 'Main.baseWorkerMap' appears to be all that's needed, although when fully uncommented, the "registration" step is performed - over and over.

Here are the files:

// main.ts

import path from  'path';
import { Worker } from 'node:worker_threads';
import { BaseWorker } from './baseworker';

export class Main {
    static readonly baseWorkerPath = path.resolve(__dirname, 'baseworker.js');
    static readonly baseWorkerSource = path.resolve(__dirname, 'baseworker.ts');
    static readonly worker2Path = path.resolve(__dirname, 'worker2.js');
    static readonly worker2Source = path.resolve(__dirname, 'worker2.ts');

    static baseWorkerMap: Map<string, BaseWorker> = new Map<string, BaseWorker>();
    
    static async invokeWorker(workernameParm: string, workerPathParm: string, workerSourceParm: string): Promise<void> {
        let onlineResolve: (value: void | PromiseLike<void>) => void;
        const waitForOnline: Promise<void> = new Promise((resolve) => { onlineResolve = resolve; });

        const workerRef = new Worker(workerPathParm, {
            workerData: {
                path: workerSourceParm
            }
        });
        console.log();
        console.log(`Invoked worker '${workernameParm}'`);

        workerRef.on('error', (error) => console.error(`ERROR from worker '${workernameParm}': `, error));
        workerRef.on('exit', (code) => console.log(`EXIT worker '${workernameParm}': code `, code));
        workerRef.on('message', (message) => console.log(`Message from worker '${workernameParm}': `, message));
        workerRef.on('messageerror', (messageerror) => console.error(`Message ERROR from worker '${workernameParm}': `, messageerror));
        workerRef.on('online', () => { console.log(`Worker '${workernameParm}' online`); onlineResolve() });

        await waitForOnline;
    }
}

Main.invokeWorker('baseworker', Main.baseWorkerPath, Main.baseWorkerSource).then (
    () => Main.invokeWorker('worker2', Main.worker2Path, Main.worker2Source).then (
        () => { console.log(); console.log(`all done`); console.log(); }
    )
)
// baseworker.ts

import { isMainThread } from 'node:worker_threads';
import { Main } from './main';

export class BaseWorker {
    constructor(readonly name: string) {
        console.log(`finishing construction for class BaseWorker '${name}'`);
    }
}

export function initWorker<T extends BaseWorker>(isMainThreadParm: boolean, workername: string, instance: T) {
    checkIfWorkerThread(isMainThreadParm, workername);

    console.log();
    console.log(`Executing code in worker '${workername}'`);

    // registerAndListRegisteredWorkerNames(workername, instance);
}

// function registerAndListRegisteredWorkerNames<T extends BaseWorker>(workernameParm: string, instanceParm: T): void {
//     registerWorkerName<T>(workernameParm, instanceParm);
//     listRegisterdeNames();
// }

// function registerWorkerName<T extends BaseWorker>(workernameParm: string, instanceParm: T): void {
//     if (!Main.baseWorkerMap.get(workernameParm)) {
//         Main.baseWorkerMap.set(workernameParm, instanceParm);
//     }
// }

// function listRegisterdeNames(): void {
//     console.log();
//     console.log(`Current list of registered worker names:`);
//     const registeredWorkerNames = Main.baseWorkerMap.keys();
//     let nextRegisteredName = registeredWorkerNames.next();
//     while (!nextRegisteredName.done) {
//         console.log(`    ${nextRegisteredName.value}`);
//         nextRegisteredName = registeredWorkerNames.next();
//     }
//     console.log();
// }

function checkIfWorkerThread(isMainThreadParm: boolean, workernameParm: string): void {
    if (isMainThreadParm) {
        const errorMessage = `ERROR: ${workernameParm} is running in main thread, this should be a worker thread`;
        throw new Error(errorMessage);
    }
}

const workername = 'baseWorker'
console.log(`Starting code in worker '${workername}'`);
initWorker<BaseWorker>(isMainThread, workername, new BaseWorker(workername));
// worker2.ts

import { isMainThread } from 'node:worker_threads';
import { BaseWorker, initWorker } from "./baseworker";

class Worker2 extends BaseWorker {
    constructor(name: string) {
        super(name)
        console.log(`finishing constructor for '${name}'`);
    }
}

const workername = 'worker2';
console.log(`in '${workername}' directly`);
initWorker<Worker2>(isMainThread, workername, new BaseWorker(workername));

And the tsconfig.json used (renamed 'tsconfigquick.json'):

{
  "resolveJsonModule": true,
  "compilerOptions": {
    "outDir": ".",
    "target": "es5",
    "lib": ["dom"],
    "module": "NodeNext",
    "moduleResolution": "nodenext",
    "resolveJsonModule": true,
    "sourceMap": true,
    "allowSyntheticDefaultImports": true,
    "esModuleInterop": true,
    "forceConsistentCasingInFileNames": true,
    "strict": true,
    "skipLibCheck": true
  },
  "files": [
    "./main.ts",
    "./baseworker.ts",
    "./worker2.ts"
  ]
}

I'm running it with (in same directory as above files): "npx tsc --project .\tsconfigquick.json & npx tsx .\main.ts"

What's the problem with referencing a static object residing on the main thread from a worker thread? I'm speculating it's related to the fact that the static object is on the main thread and I'm executing from the worker threads - but then why does it actually half-work (it just doesn't know when to quit)?


Solution

  • It looks like your worker imports main which starts the worker again, resulting in that infinite loop.

    If your code is meant to have 1 'parent' (main.ts) and 2 workers (baseworker.ts and worker2.ts), then these 3 files should never be included by other files.

    So that means that in your case, if worker2.ts needs something that exists in baseworker.ts, then you need to move that logic into new files that are included by everything.

    A general good rule is to create files that (mainly) don't have side effects and only export things like classes and functions. This is part of your library and included by other files, and entry-point files that never get included by anything else