As the title says, I'm experiencing a behavior I find pretty strange in my own re-implementation of the MessageChannel provided by Node.js.
The goal of my implementation is to provide better performance for sending UTF-16 strings to another thread.
QUICK DISCLAIMER: This is NOT meant for production use (at least for now). I am just experimenting with an algorithm/proof-of-concept and want to implement it myself. Therefore, I won't accept answers along the lines of "why are you re-inventing the wheel" or "there is already this npm package that perfectly fits your use-case".
My idea is to have two handles: one for writing, which is managed and used by the main thread, and the other one for reading, which I hand to a Worker.
They communicate over a SharedArrayBuffer through two typed arrays: a Uint16Array, which spans the entire buffer except its last 8 bytes, and an Int32Array, which spans those last 8 bytes and stores the writer and reader indexes respectively, providing a synchronization mechanism.
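As a hedged sketch of that layout (it mirrors what the constructor further down does), one SharedArrayBuffer can be split into a Uint16Array data region and a trailing two-slot Int32Array for the indexes. The helper name createViews is hypothetical, and the sketch assumes bufferSize is a multiple of 4, because an Int32Array view must start at a byte offset divisible by 4.

```typescript
// Bytes reserved at the end of the buffer: writer index + reader index.
const STATE_BYTES = 2 * Int32Array.BYTES_PER_ELEMENT

// Hypothetical helper: splits one SharedArrayBuffer into the two views.
// Assumes bufferSize is a multiple of 4 (Int32Array offsets must be).
function createViews(bufferSize: number): { data: Uint16Array; state: Int32Array } {
  if (bufferSize < STATE_BYTES || bufferSize % 4 !== 0) {
    throw new RangeError("bufferSize must be a multiple of 4 and at least 8 bytes")
  }
  const shared = new SharedArrayBuffer(bufferSize)
  // Data region: every byte except the last 8, viewed as UTF-16 code units.
  const data = new Uint16Array(shared, 0, (bufferSize - STATE_BYTES) / Uint16Array.BYTES_PER_ELEMENT)
  // State region: the last 8 bytes; state[0] = writer index, state[1] = reader index.
  const state = new Int32Array(shared, bufferSize - STATE_BYTES)
  return { data, state }
}

const { data, state } = createViews(1024)
console.log(data.length, state.length) // 508 2
```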
The writing handle enqueues the strings it is asked to write by pushing them into an array. When setImmediate fires, it flushes the queue: for each string, it writes the string's length into the shared buffer at the current writer index and then encodes the string itself one UTF-16 code unit at a time (via charCodeAt), wrapping around the end of the buffer when necessary. This repeats until the queue is empty.
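A minimal sketch of the enqueue-then-setImmediate batching on its own, with the shared buffer left out: BatchingQueue and its sink callback are hypothetical names, and the sink stands in for the code that encodes a batch into the SharedArrayBuffer. Any number of write calls made during one event-loop turn end up in a single flush.

```typescript
// Hypothetical sketch of the enqueue-then-flush pattern.
// `sink` stands in for the code that encodes the batch into the shared buffer.
class BatchingQueue {
  private readonly queue: string[] = []
  private scheduled = false
  private readonly sink: (batch: string[]) => void

  constructor(sink: (batch: string[]) => void) {
    this.sink = sink
  }

  write(s: string): void {
    this.queue.push(s)
    if (!this.scheduled) {
      // Only the first write in an event-loop turn schedules a flush.
      this.scheduled = true
      setImmediate(() => this.flush())
    }
  }

  flush(): void {
    this.scheduled = false
    // splice(0) empties the queue and returns everything that was in it.
    const batch = this.queue.splice(0)
    if (batch.length > 0) {
      this.sink(batch)
    }
  }
}
```

Using a boolean flag keeps the "is a flush already scheduled?" question separate from any bookkeeping about how many cells the queued strings will need.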
This means that if one were to write "Hello" and " world!" as two separate strings, at the end of the flush operation the buffer would look like this: [5, 72, 101, 108, 108, 111, 7, 32, 119, 111, 114, 108, 100, 33, ..., 14, 0], where the first element (5) is the length of "Hello", the following 5 numbers are its UTF-16 code units, the next element (7) is the length of " world!", the following 7 numbers are again the encoded string, and the last two numbers (14 and 0) are the writer and reader indexes respectively. After this flush, the next write will start at the 15th element of the buffer (index 14).
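The layout above can be reproduced with a small, hedged sketch of the flush step: encodeStrings is a hypothetical helper that writes each string as [length, code units...] into a ring buffer starting at index w and returns the next writer index. Free-space checks are deliberately left out.

```typescript
// Hypothetical helper mirroring the flush step: writes each string as
// [length, code units...] starting at writer index `w`, wrapping around the
// end of the ring buffer, and returns the next writer index.
// No free-space check is performed here.
function encodeStrings(buffer: Uint16Array, w: number, strings: readonly string[]): number {
  const n = buffer.length
  for (const s of strings) {
    buffer[w] = s.length // length prefix (assumes s.length <= 65535)
    w = (w + 1) % n
    for (let i = 0; i < s.length; i++) {
      buffer[w] = s.charCodeAt(i) // one UTF-16 code unit per cell
      w = (w + 1) % n
    }
  }
  return w
}

const ring = new Uint16Array(32)
const next = encodeStrings(ring, 0, ["Hello", " world!"])
console.log(Array.from(ring.subarray(0, next)).join(", "))
// 5, 72, 101, 108, 108, 111, 7, 32, 119, 111, 114, 108, 100, 33
console.log(next) // 14
```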
The reading handle is just a generator that waits to be notified of a change in the writer index, which happens after the queue is flushed. When this happens, it reads the value at the current reader index as the length of the next string, then decodes and yields that string (the first one of the original queue). This goes on until the reading handle catches up with the writing handle, at which point it goes to sleep and waits to be notified again.
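The reading side can be sketched the same way, leaving out the waiting and notification: decodeStrings is a hypothetical helper that decodes every length-prefixed string between reader index r and writer index w (r === w meaning "caught up") and returns them together with the new reader index.

```typescript
// Hypothetical helper mirroring the reading step: decodes every
// length-prefixed string between reader index `r` and writer index `w`
// (r === w means the reader has caught up) and returns the new reader index.
function decodeStrings(buffer: Uint16Array, r: number, w: number): { strings: string[]; r: number } {
  const n = buffer.length
  const strings: string[] = []
  while (r !== w) {
    const length = buffer[r] // length prefix
    r = (r + 1) % n
    const units = new Uint16Array(length)
    for (let i = 0; i < length; i++) {
      units[i] = buffer[r]
      r = (r + 1) % n
    }
    // Spreading is fine for short strings; very long ones are better decoded in chunks.
    strings.push(String.fromCharCode(...units))
  }
  return { strings, r }
}

const ring = new Uint16Array([5, 72, 101, 108, 108, 111, 7, 32, 119, 111, 114, 108, 100, 33, 0, 0])
const { strings, r } = decodeStrings(ring, 0, 14)
console.log(JSON.stringify(strings), r) // ["Hello"," world!"] 14
```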
import {nextTick} from "node:process"
import {isMainThread, Worker, workerData, type WorkerOptions as _WO} from "node:worker_threads"
/**
* Key used to access the {@link SharedArrayBuffer} in worker data.
*/
const kSharedBuffer = "__$SharedBuffer$" as const
export class WorkerStream {
////////////////////////////////////////////////////////////////////////////
// region Buffer
////////////////////////////////////////////////////////////////////////////
/**
* The writable section of the {@link SharedArrayBuffer}.
*/
readonly #buffer: Uint16Array
/**
* The section of the {@link SharedArrayBuffer} used to store the atomic W/R indexes
* of the {@link WorkerStream} and the {@link WorkerStreamHandle} respectively.
*/
readonly #stateBuffer: Int32Array
/**
* Wraps an index around the buffer.
*
* @throws {RangeError} If the index is negative.
* @throws {RangeError} If the index needs to be wrapped around more than once.
*/
private wrapIndex(index: number): number {
if (index < 0) {
throw new RangeError("Index cannot be negative")
}
const {length} = this.#buffer
if (index >= length * 2) {
throw new RangeError("Index cannot be wrapped around more than once")
}
return (index >= length) ? index - length : index
}
////////////////////////////////////////////////////////////////////////////
// endregion
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
// region Data Queue
////////////////////////////////////////////////////////////////////////////
/**
* The maximum length of a single string in UTF-16 code units.
*/
static readonly MAX_STRING_LENGTH = 10_000
/**
* The queue of strings to be written to the {@link WorkerStreamHandle}.
*/
readonly #dataQueue: string[] = []
/**
* The number of cells in the buffer required to flush the data queue.
*/
#dataQueueSize = 0
/**
* Flushes the queue of strings into the shared {@link #buffer} and notifies the reader.
*/
flush() {
const {length: bufferLength} = this.#buffer
let [w] = this.#stateBuffer
while (this.#dataQueue.length > 0) {
const s = this.#dataQueue.shift()!
this.#buffer[w] = s.length
for (let i = 0; i < s.length; i++) {
if (++w === bufferLength) {
w = 0
}
this.#buffer[w] = s.charCodeAt(i)
}
if (++w === bufferLength) {
w = 0
}
}
this.#dataQueueSize = 0
Atomics.store(this.#stateBuffer, 0, w)
Atomics.notify(this.#stateBuffer, 0)
}
////////////////////////////////////////////////////////////////////////////
// endregion
////////////////////////////////////////////////////////////////////////////
/**
* @throws {RangeError} If the buffer size is less than 2 * {@link Int32Array.BYTES_PER_ELEMENT}.
*/
constructor(
workerPath: URL | string,
workerOptions: Omit<_WO, "workerData"> & { workerData?: Record<string | symbol, any> } = {},
unrefWorker = true,
bufferSize: number = 4 * 1024 * 1024,
) {
if (bufferSize < 2 * Int32Array.BYTES_PER_ELEMENT) {
throw new RangeError("Buffer size must be at least 2 * Int32Array.BYTES_PER_ELEMENT")
}
const buffer = new SharedArrayBuffer(bufferSize)
this.#buffer = new Uint16Array(
buffer,
0,
(bufferSize - 2 * Int32Array.BYTES_PER_ELEMENT) / Uint16Array.BYTES_PER_ELEMENT,
)
this.#stateBuffer = new Int32Array(buffer, bufferSize - 2 * Int32Array.BYTES_PER_ELEMENT)
workerOptions.workerData ??= {}
workerOptions.workerData[kSharedBuffer] = buffer
const worker = new Worker(workerPath, workerOptions)
worker.once("exit", () => {
throw new Error("Worker exited unexpectedly")
})
worker.once("error", (err) => {
throw err
})
if (unrefWorker) {
worker.unref()
}
}
////////////////////////////////////////////////////////////////////////////
// region Streaming
////////////////////////////////////////////////////////////////////////////
/**
* Writes a string to the {@link WorkerStream}.
*
* # Summary
*
* This method enqueues a string to be streamed to the {@link WorkerStreamHandle}
* as soon as {@link setImmediate} allows it.
*
* The string is encoded using UTF-16, so it can contain any Unicode character,
* and its length is limited to {@link MAX_STRING_LENGTH}.
*
* If there is not enough space in the buffer to write the string,
* calling this method will return `false` and the string will not be enqueued.
* You are advised to check with {@link canWrite} before calling this method.
*/
write(s: string): boolean {
if (!this.canWrite(s)) {
return false
}
this.#dataQueue.push(s)
if (this.#dataQueueSize === 0) {
this.#dataQueueSize++
setImmediate(() => this.flush())
}
this.#dataQueueSize += s.length + 1
return true
}
/**
* Determines whether there is enough space in the buffer to write a string.
*/
canWrite({length}: string): boolean {
if (length > WorkerStream.MAX_STRING_LENGTH) {
return false
}
const w = this.#stateBuffer[0], r = Atomics.load(this.#stateBuffer, 1)
const end = this.wrapIndex(w + length + 1)
const didNotWrapAroundBuffer = end > w
const endsBehindReader = end < r
const wasAlreadyBehindReader = w < r
if (wasAlreadyBehindReader) {
return endsBehindReader && didNotWrapAroundBuffer
} else {
return endsBehindReader || didNotWrapAroundBuffer
}
}
////////////////////////////////////////////////////////////////////////////
// endregion
////////////////////////////////////////////////////////////////////////////
}
export class WorkerStreamHandle {
////////////////////////////////////////////////////////////////////////////
// region Buffer
////////////////////////////////////////////////////////////////////////////
/**
* The readable section of the {@link SharedArrayBuffer}.
*/
readonly #buffer: Uint16Array
/**
* The portion of the {@link SharedArrayBuffer} used to store the atomic W/R indexes
* of the {@link WorkerStream} and the {@link WorkerStreamHandle} respectively.
*/
readonly #stateBuffer: Int32Array
////////////////////////////////////////////////////////////////////////////
// endregion
////////////////////////////////////////////////////////////////////////////
/**
* @throws {Error} If instantiated from the main thread.
* @throws {Error} If the {@link SharedArrayBuffer} is not found in {@link workerData}.
*/
constructor() {
if (isMainThread) {
throw new Error("WorkerStreamHandle can only be used inside a worker")
}
const buffer = workerData[kSharedBuffer]
if (!(buffer instanceof SharedArrayBuffer)) {
throw new Error("SharedBuffer not found in workerData; did you instantiate WorkerStreamHandle from the right worker?")
}
delete workerData[kSharedBuffer]
const {byteLength: bufferSize} = buffer
this.#buffer = new Uint16Array(
buffer,
0,
(bufferSize - 2 * Int32Array.BYTES_PER_ELEMENT) / Uint16Array.BYTES_PER_ELEMENT,
)
this.#stateBuffer = new Int32Array(buffer, bufferSize - 2 * Int32Array.BYTES_PER_ELEMENT)
}
////////////////////////////////////////////////////////////////////////////
// region Streaming
////////////////////////////////////////////////////////////////////////////
#isStreaming = false
async* [Symbol.asyncIterator](): AsyncIterableIterator<string> {
if (this.#isStreaming) {
throw new Error("Already streaming data")
}
this.#isStreaming = true
const {length: bufferLength} = this.#buffer
while (true) {
let r = this.#stateBuffer[1]
await new Promise<void>((resolve) => {
nextTick(() => {
Atomics.wait(this.#stateBuffer, 0, r)
resolve()
})
})
const length = this.#buffer[r]
const buffer = new Uint16Array(length)
for (let i = 0; i < length; i++) {
if (++r >= bufferLength) {
r = 0
}
buffer[i] = this.#buffer[r]
}
// Publish the new reader index atomically so the writer's Atomics.load sees it
Atomics.store(this.#stateBuffer, 1, (++r >= bufferLength) ? 0 : r)
yield String.fromCharCode(...buffer)
}
}
static async* incoming(): AsyncIterableIterator<string> {
yield* new WorkerStreamHandle()
}
////////////////////////////////////////////////////////////////////////////
// endregion
////////////////////////////////////////////////////////////////////////////
}
PS: I'm using private fields to make the buffer inaccessible from the outside and prevent malicious interactions with the underlying buffer.
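For comparison with canWrite above, the free-space check on a ring buffer is often written as a single modular formula that keeps one cell empty, so that w === r unambiguously means "empty". This is an illustrative sketch, not a drop-in replacement: freeCells and fitsInBuffer are hypothetical names, and the pending parameter counts cells that are queued but not yet flushed. The canWrite in the code above does not account for those, since it reads w straight from the state buffer, which only changes in flush.

```typescript
// Hypothetical helper: how many cells the writer may still fill without
// catching up with the reader. One cell is always kept empty so that
// w === r means "empty" rather than "full".
function freeCells(capacity: number, w: number, r: number): number {
  return (r - w - 1 + capacity) % capacity
}

// Hypothetical helper: can a string of `length` code units (plus its length
// prefix) be written, given `pending` cells already queued but not yet flushed?
function fitsInBuffer(capacity: number, w: number, r: number, pending: number, length: number): boolean {
  return pending + length + 1 <= freeCells(capacity, w, r)
}

console.log(freeCells(508, 0, 0)) // 507
console.log(fitsInBuffer(508, 5, 10, 0, 3)) // true
```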
This implementation worked fine until yesterday: to make sure that the underlying worker was receiving the stream of text, I've written these tests:
logger.test.ts
import {equal, ok, throws} from "node:assert/strict"
import {execSync} from "node:child_process"
import {MessageChannel, Worker} from "node:worker_threads"
import {WorkerStream} from "../src/logger/stream"
const testingWorkerPath = new URL("./data/worker.js", import.meta.url)
describe("WorkerStream", () => {
before(() => {
// Suppress the "Worker exited unexpectedly" error
// so that we can manually close Workers to stop tests
// without having Mocha crying about uncaught exceptions
const original = process.listeners("uncaughtException").pop()!
process.removeListener("uncaughtException", original)
process.on("uncaughtException", (err) => {
if (err.message !== "Worker exited unexpectedly") {
original(err, "uncaughtException")
}
})
})
it("writes data to the worker", async () => {
const {port1: workerPort, port2: parentPort} = new MessageChannel()
const stream = new WorkerStream(
testingWorkerPath,
{workerData: {parentPort}, transferList: [parentPort]},
)
stream.write("Hello World!")
const receivedMessage = await new Promise((resolve) => {
workerPort.on("message", resolve)
})
equal(receivedMessage, "Hello World!")
stream.write("done")
})
it("wraps around when it reaches the end of the buffer", async () => {
const DATA = [
"ab".repeat(15),
"bc".repeat(25),
"cd".repeat(35),
"de".repeat(35),
"ef".repeat(35),
"fg".repeat(35),
]
const {port1: workerPort, port2: parentPort} = new MessageChannel()
const stream = new WorkerStream(
testingWorkerPath,
{workerData: {parentPort}, transferList: [parentPort]},
true,
1024,
)
for (const data of DATA) {
const iterations = Math.floor(1024 / (data.length * 2) - 1)
for (let _ = 0; _ < 15; _++) {
for (let i = 0; i < iterations; i++) {
ok(stream.write(data))
}
for (let i = 0; i < iterations; i++) {
equal(
await new Promise((resolve) => {
workerPort.on("message", resolve)
}),
data,
)
}
}
}
stream.write("done")
})
})
data/worker.js
import {isMainThread, workerData} from "node:worker_threads"
import {WorkerStreamHandle} from "../../src/logger/stream.js"
if (isMainThread) {
throw new Error("This file must be run as a worker")
}
/**
* @type {import("node:worker_threads").MessagePort}
*/
const parentPort = workerData.parentPort
/**
* Incoming stream of text.
*/
for await (const chunk of WorkerStreamHandle.incoming()) {
if (chunk === "done") {
break
}
parentPort.postMessage(chunk)
}
All the tests above pass like a charm. Every message that goes into a worker is always reported correctly. No data corruption, no data races, no issues whatsoever even when wrapping around the buffer.
The issue arises when I try to use the WorkerStream in a more """realistic""" environment like this:
import {isMainThread} from "node:worker_threads"
import {WorkerStreamHandle} from "./logger/stream"
if (isMainThread) {
throw new Error("This file must be run as a worker")
}
for await (const chunk of WorkerStreamHandle.incoming()) {
console.log(chunk)
}
When I ran this worker and sent it two messages, only the first one seemed to make it to the other side. In short, in any setup other than the testing one, only the first string ever makes it across the channel; all the others seem to "disappear".
While inspecting the generator's next() method, I "dismounted" the generator and tried calling next() twice manually in the worker instead of using a loop statement (for await (of)) and, to my extreme surprise, both messages were logged successfully. I then swapped the for await (of) loop for a while (true) loop that manually awaits the next value... AND THIS TIME IT DIDN'T WORK!
Finally, after this pretty long introduction, my questions are:
How in the world does the reading handle hang, block, or paralyze the worker thread, yielding nothing after the first string, when used inside a for/while loop, yet work like a charm when iterated and awaited manually without any loop? Why does it work while being debugged or during tests? Is this a synchronization issue, or something much worse related to the design of the algorithm itself? Should I drain the whole buffer at once, because reading one string at a time is simply the wrong approach, or am I right to think that it should work even when decoding and reading one string at a time?
I finally figured it out!
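The "culprit" was the backpressure mechanism built into Node.js streams. Inside a worker, console.log writes to process.stdout, a stream that posts its data to the main thread and then holds back further chunks until the main thread replies that it wants more. The first string went out immediately, but that reply can only be handled by the worker's event loop, which Atomics.wait kept blocking, so every following string stayed stuck in the stream's buffer. The tests never hit this because parentPort.postMessage sends each message straight away, without going through a stream.
All I needed to do was turn the reading loop in the worker into a recursive function proxied by setImmediate, so that the event loop gets a turn between two reads.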
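A hedged sketch of the idea, not necessarily the exact code from this fix: pump is a hypothetical helper that pulls one value from any async iterator, hands it to a callback, and then reschedules itself with setImmediate instead of looping, so the event loop gets a full turn (timers, I/O, incoming messages) between two reads.

```typescript
// Hypothetical helper: reads an async iterator one value at a time and
// reschedules itself with setImmediate instead of using a loop, giving the
// event loop a full turn between two reads. Resolves when the source is done.
function pump<T>(source: AsyncIterator<T>, onValue: (value: T) => void): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    const step = (): void => {
      source.next().then((result) => {
        if (result.done) {
          resolve()
          return
        }
        try {
          onValue(result.value)
        } catch (err) {
          reject(err)
          return
        }
        setImmediate(step) // yield to the event loop before the next read
      }, reject)
    }
    step()
  })
}
```

In the worker it would be called as pump(WorkerStreamHandle.incoming(), (chunk) => console.log(chunk)). Note that the generator above still calls Atomics.wait, which blocks the thread whenever no data is available; the setImmediate hop only lets pending work (such as the stdout acknowledgement) run between two strings. In recent Node.js versions, Atomics.waitAsync is a non-blocking alternative worth considering.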