javascript node.js asynchronous events terminate

How can I terminate a running function when an event occurs?


Hi everyone, I have a task where I need to terminate a running function as soon as an event occurs.

Normally, if an error is thrown anywhere inside the process itself (for example, if the delay throws in any of the steps), the process terminates without any problems.

However, if I create an event listener that is responsible for terminating the process and throw an error inside it, the process keeps running as if nothing happened, and the error is reported as an Unhandled Rejection or a DOMException.

(() => {
  /**
   * Minimal synchronous publish/subscribe helper.
   *
   * Listeners are stored per event name and invoked in registration order.
   * Whatever a listener returns is ignored, so a rejected promise coming from
   * an `async` listener is not observed by `emit`.
   */
  class EventEmitter {
    constructor() {
      // Prototype-less dictionary: with a plain `{}` an event named "toString"
      // or "constructor" would resolve to an inherited function and be mistaken
      // for an existing listener list.
      this.events = Object.create(null);
    }

    /**
     * Registers `listener` for `event`.
     *
     * @param {string} event - Event name.
     * @param {Function} listener - Callback invoked with the emitted arguments.
     * @returns {Function} A function that removes this listener again.
     */
    on(event, listener) {
      if (!this.events[event]) {
        this.events[event] = [];
      }

      this.events[event].push(listener);

      // Return a function that can be used to remove this specific listener
      return () => {
        this.off(event, listener);
      };
    }

    /**
     * Calls every listener registered for `event` with `args`.
     *
     * @param {string} event - Event name.
     * @param {...*} args - Arguments forwarded to each listener.
     */
    emit(event, ...args) {
      const listeners = this.events[event];

      if (listeners) {
        // `off` replaces the stored array instead of mutating it, so a listener
        // that removes itself while being called does not disturb this loop.
        listeners.forEach((listener) => listener(...args));
      }
    }

    /**
     * Drops every listener registered for `event`.
     *
     * @param {string} event - Event name.
     */
    removeListeners(event) {
      delete this.events[event];
    }

    /**
     * Removes every registration of `listener` for `event`.
     *
     * @param {string} event - Event name.
     * @param {Function} listener - The callback previously passed to `on`.
     */
    off(event, listener) {
      if (this.events[event]) {
        this.events[event] = this.events[event].filter(
          (existingListener) => existingListener !== listener,
        );
      }
    }
  }

  // Returns a promise that fulfils (with no value) once `ms` milliseconds have elapsed.
  const delay = (ms) => new Promise((done) => {
    setTimeout(done, ms);
  });
  const processEmitter = new EventEmitter();

  /**
   * Runs six 2-second steps, logging after each one, while listening for 'stop'.
   *
   * The 'stop' listener is an `async` function, so its `throw` only rejects the
   * promise returned by that listener call. It never reaches the try/catch
   * below, and the awaited steps carry on regardless.
   *
   * @param {*} params - Accepted but not read.
   * @returns {Promise<void>} Settles after the steps and the final log.
   */
  async function startProcess(params) {
    const stepMessages = [
      'finished step 1',
      'finished step 2',
      'finished step 3',
      'finished step 4',
      'finished step 5',
      'finished step 6',
    ];

    try {
      const unsubscribe = processEmitter.on('stop', async () => {
        unsubscribe();
        throw new Error(`Process stopped due to external event`);
      });

      for (const message of stepMessages) {
        await delay(2000);
        console.log(message);
      }
    } catch (failure) {
      console.log('Process terminated:', failure.message);
    } finally {
      console.log('done !!');
    }
  }

  startProcess();

  setTimeout(() => {
    processEmitter.emit('stop');
  }, 5000);
})();

(() =>  {
  const processEmitter = new EventEmitter();
  /**
   * Runs six 2-second steps, logging after each one, while listening for 'stop'.
   *
   * The 'stop' listener writes to a Proxy whose `set` trap throws. That throw
   * still happens inside the `async` listener, so it only rejects the listener's
   * own promise and does not reach the try/catch below.
   *
   * @param {*} params - Accepted but not read.
   * @returns {Promise<void>} Settles after the steps and the final log.
   */
  async function startProcess(params) {
    const stepMessages = [
      'finished step 1',
      'finished step 2',
      'finished step 3',
      'finished step 4',
      'finished step 5',
      'finished step 6',
    ];

    try {
      // Proxy acting as a hook on assignment: store the value, then throw it as an Error.
      const abortHandler = new Proxy({ msg: '' }, {
        set(store, prop, message) {
          store[prop] = message;
          throw new Error(message);
        },
      });

      const unsubscribe = processEmitter.on('stop', async () => {
        unsubscribe();
        abortHandler.msg = `Process stopped due to external event`;
      });

      for (const message of stepMessages) {
        await delay(2000);
        console.log(message);
      }
    } catch (failure) {
      console.log('Process terminated:', failure.message);
    } finally {
      console.log('done !!');
    }
  }

  startProcess();

  setTimeout(() => {
    processEmitter.emit('stop');
  }, 5000);
})();
(() =>  {
  const processEmitter = new EventEmitter();
  /**
   * Runs six 2-second steps, logging after each one, while listening for 'stop'.
   *
   * The 'stop' listener calls a local helper that throws. The helper runs on
   * the `async` listener's call stack, so the error only rejects the listener's
   * own promise and does not reach the try/catch below.
   *
   * @param {*} params - Accepted but not read.
   * @returns {Promise<void>} Settles after the steps and the final log.
   */
  async function startProcess(params) {
    const stepMessages = [
      'finished step 1',
      'finished step 2',
      'finished step 3',
      'finished step 4',
      'finished step 5',
      'finished step 6',
    ];

    try {
      // Local helper that throws an Error carrying the given message.
      const raise = (text) => {
        throw new Error(text);
      };

      const unsubscribe = processEmitter.on('stop', async () => {
        unsubscribe();
        raise(`Process stopped due to external event`);
      });

      for (const message of stepMessages) {
        await delay(2000);
        console.log(message);
      }
    } catch (failure) {
      console.log('Process terminated:', failure.message);
    } finally {
      console.log('done !!');
    }
  }

  startProcess();

  setTimeout(() => {
    processEmitter.emit('stop');
  }, 5000);
})();
(() => {
  const processEmitter = new EventEmitter();
  /**
   * Runs six 2-second steps inside a promise executor, while listening for 'stop'.
   *
   * On 'stop' the returned promise is rejected with a string. Rejecting does
   * not interrupt the executor: its awaits keep running, and its try/catch
   * never sees the rejection. The resolve callback is never called, so the
   * returned promise stays pending when no 'stop' event arrives.
   *
   * @param {*} params - Accepted but not read.
   * @returns {Promise<never>} Rejects on 'stop'; otherwise never settles.
   */
  async function startProcess(params) {
    const stepMessages = [
      'finished step 1',
      'finished step 2',
      'finished step 3',
      'finished step 4',
      'finished step 5',
      'finished step 6',
    ];

    const runSteps = async (_resolve, reject) => {
      try {
        const unsubscribe = processEmitter.on('stop', async () => {
          unsubscribe();
          reject(`Process stopped due to external event`);
        });

        for (const message of stepMessages) {
          await delay(2000);
          console.log(message);
        }
      } catch (failure) {
        console.log('Process terminated:', failure.message);
      } finally {
        console.log('done !!');
      }
    };

    return new Promise(runSteps);
  }

  startProcess();

  setTimeout(() => {
    processEmitter.emit('stop');
  }, 5000);
})();

(() =>  {
  /**
   * Waits `ms` milliseconds, or rejects as soon as `signal` is aborted.
   *
   * An abort that arrives while the timer is pending rejects the promise right
   * away and clears the timer, instead of only being noticed by the next call.
   * An already-aborted signal yields a rejected promise rather than a
   * synchronous throw, so `await`-ing callers see the same error either way.
   *
   * @param {number} ms - Time to wait, in milliseconds.
   * @param {AbortSignal} [signal] - Optional signal used to cancel the wait.
   * @returns {Promise<void>} Fulfils after `ms`; rejects with an Error on abort.
   */
  const delay = (ms, signal) => {
    // A non-Error reason (e.g. a string) becomes the Error message; 'Aborted' is the fallback.
    const toError = (reason) => (reason instanceof Error ? reason : new Error(reason || 'Aborted'));

    if (signal?.aborted) {
      return Promise.reject(toError(signal.reason));
    }

    return new Promise((resolve, reject) => {
      const onAbort = () => {
        clearTimeout(timer);
        reject(toError(signal.reason));
      };

      const timer = setTimeout(() => {
        // Detach so a long-lived signal does not accumulate stale listeners.
        signal?.removeEventListener('abort', onAbort);
        resolve();
      }, ms);

      signal?.addEventListener('abort', onAbort, { once: true });
    });
  };

  const processEmitter = new EventEmitter();
  const controller = new AbortController();
  const { signal } = controller;

  /**
   * Runs six 2-second steps, logging after each one, while listening for 'stop'.
   *
   * On 'stop' the listener aborts the enclosing `controller`. Every step hands
   * the matching `signal` to `delay`, so it is up to `delay` to observe the
   * abort and raise the error that the catch below logs.
   *
   * @param {*} params - Accepted but not read.
   * @returns {Promise<void>} Settles after the steps (or the abort) and the final log.
   */
  async function startProcess(params) {
    const stepMessages = [
      'finished step 1',
      'finished step 2',
      'finished step 3',
      'finished step 4',
      'finished step 5',
      'finished step 6',
    ];

    try {
      const unsubscribe = processEmitter.on('stop', async () => {
        unsubscribe();
        controller.abort('Process stopped due to external event');
      });

      for (const message of stepMessages) {
        await delay(2000, signal);
        console.log(message);
      }
    } catch (failure) {
      console.log('Process terminated:', failure.message);
    } finally {
      console.log('done !!');
    }
  }

  startProcess();

  setTimeout(() => {
    processEmitter.emit('stop');
  }, 5000);
})();
(() =>  {
  /**
   * Waits `ms` milliseconds unless an error is handed in.
   *
   * @param {number} ms - Time to wait, in milliseconds.
   * @param {*} [err] - When truthy, it is thrown synchronously instead of waiting.
   * @returns {Promise<void>} Fulfils (with no value) after `ms`.
   */
  const delay = (ms, err) => {
    if (!err) {
      return new Promise((done) => {
        setTimeout(done, ms);
      });
    }

    throw err;
  };

  const processEmitter = new EventEmitter();

  /**
   * Runs six 2-second steps, logging after each one, while listening for 'stop'.
   *
   * On 'stop' the listener stores an Error in a local variable. That variable
   * is re-read on every `delay` call, so the stored error is handed to the next
   * step that starts after the event.
   *
   * @param {*} params - Accepted but not read.
   * @returns {Promise<void>} Settles after the steps (or the stop) and the final log.
   */
  async function startProcess(params) {
    const stepMessages = [
      'finished step 1',
      'finished step 2',
      'finished step 3',
      'finished step 4',
      'finished step 5',
      'finished step 6',
    ];

    try {
      let stopError = null;
      const unsubscribe = processEmitter.on('stop', async () => {
        unsubscribe();
        stopError = new Error('Process stopped due to external event');
      });

      for (const message of stepMessages) {
        await delay(2000, stopError);
        console.log(message);
      }
    } catch (failure) {
      console.log('Process terminated:', failure.message);
    } finally {
      console.log('done !!');
    }
  }

  startProcess();

  setTimeout(() => {
    processEmitter.emit('stop');
  }, 5000);
})();

Solution

  • As an alternative you can use my library which provides a cancelable promise (Live Demo):

    import { AxiosPromise } from "axios-promise";
    
    // Returns a promise that fulfils (with no value) once `ms` milliseconds have elapsed.
    const delay = (ms) => new Promise((done) => {
      setTimeout(done, ms);
    });
    
    // Six 2-second steps written as a generator and wrapped by AxiosPromise.promisify.
    // Each step yields the delay promise and logs once the generator is resumed.
    // NOTE(review): presumably the library resumes the generator when a yielded
    // promise settles and throws into it on cancel - confirm against its docs.
    const startProcess = AxiosPromise.promisify(function* (params) {
      const stepMessages = [
        "finished step 1",
        "finished step 2",
        "finished step 3",
        "finished step 4",
        "finished step 5",
        "finished step 6",
      ];

      try {
        for (const message of stepMessages) {
          yield delay(2000);
          console.log(message);
        }
      } catch (failure) {
        console.log("Process terminated:", failure.message);
      } finally {
        console.log("done !!");
      }
    });
    
    const promise = startProcess("foo");
    
    setTimeout(() => {
      promise.cancel("stop");
    }, 5000);
    

    Console output:

    Restarting 'index.js'
    finished step 1
    finished step 2
    Process terminated: stop
    done !!
    Completed running 'index.js'