Concurrency vs Parallelism

Node.js has long excelled at concurrency. With the recent release of Node 13.0, Node now has a stable answer to parallelism as well. 

Concurrency can be thought of as switching between async processes, which take turns executing and, while idle, return control to the event loop. Parallelism, on the other hand, is the ability to split work across multiple threads that run simultaneously. Other solutions in JavaScript have tried to address this problem; for an in-depth comparison, I found this article useful.
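To make the distinction concrete, here is a minimal sketch that assumes a purely CPU-bound loop as the workload (sumTo and sumToInWorker are hypothetical names, not part of any library). Calling sumTo directly holds the single JavaScript thread until it returns, so no other callback can run in the meantime; that is the limit of concurrency. sumToInWorker runs the same loop on another thread through worker_threads, so the main thread stays free.

```javascript
const { Worker } = require('worker_threads');

// Hypothetical CPU-bound workload: on the main thread it runs to completion
// before any other callback (timers, I/O) gets a turn.
function sumTo(n) {
  let total = 0;
  for (let k = 1; k <= n; k += 1) total += k;
  return total;
}

// The same loop on a separate thread. With `eval: true`, the first argument
// to Worker is JavaScript source code instead of a file path.
function sumToInWorker(n) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(
      `const { parentPort, workerData } = require('worker_threads');
       let total = 0;
       for (let k = 1; k <= workerData; k += 1) total += k;
       parentPort.postMessage(total);`,
      { eval: true, workerData: n }
    );
    worker.once('message', resolve);
    worker.once('error', reject);
  });
}

// Parallelism: the worker computes while the main thread keeps going.
sumToInWorker(1e6).then((total) => console.log('worker finished:', total));
console.log('main thread is still free'); // printed first
```

The worker exits on its own after posting, because nothing else is keeping its event loop alive.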

The Master Script

The master script must do three things:

  1. Create the workers by referencing the JS file for the worker.
        const worker = new WorkerThread.Worker(join(__dirname, './worker.js'));
  2. Send messages to the workers to initiate work. A message is just a JavaScript object, so if you need to customize the behavior of the worker, you can include any categorization, data, etc.
        worker.postMessage({ foo: "stuff" });
  3. Register handlers for the responses. Any response from the worker arrives as a 'message' event whose payload is whatever the worker posted: a string, or a more complex object with the results of whatever operations the worker has performed.
        worker.on('message', (message) => { /* do stuff with the response */ });
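The three steps can be tried without a separate worker.js file. This sketch assumes an inline worker passed as source code through the Worker constructor's eval option; roundTrip and the upper-casing behavior are hypothetical, chosen only to make the reply visible.

```javascript
const { Worker } = require('worker_threads');

// Hypothetical inline worker standing in for worker.js: it upper-cases the
// `foo` field of each message and sends the result back.
const workerSource = `
  const { parentPort } = require('worker_threads');
  parentPort.on('message', (msg) => {
    parentPort.postMessage({ type: 'done', body: msg.foo.toUpperCase() });
  });
`;

function roundTrip(payload) {
  return new Promise((resolve, reject) => {
    // Step 1: create the worker (eval: true lets us pass source, not a path).
    const worker = new Worker(workerSource, { eval: true });

    // Step 3: register a handler for the worker's reply.
    worker.once('message', (message) => {
      resolve(message.body);
      // The worker keeps listening on parentPort, so stop it explicitly.
      worker.terminate();
    });
    worker.once('error', reject);

    // Step 2: send the message that initiates the work.
    worker.postMessage(payload);
  });
}

roundTrip({ foo: 'stuff' }).then((result) => console.log(result)); // prints STUFF
```

The terminate() call matters: a worker that listens on parentPort stays alive waiting for further messages, which would otherwise keep the process from exiting.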
    


Now, combine those steps into a single script:

const { join } = require('path');
const WorkerThread = require('worker_threads');

const THREAD_COUNT = 30;

// Hypothetical placeholder: replace with your own data-loading logic.
async function getAllUsers() {
  return [{ id: 1 }, { id: 2 }, { id: 3 }];
}

/**
 * Before running, modify THREAD_COUNT to suit your workload and machine.
 */
(async () => {

  // Set up whatever data is needed to run your jobs.
  const users = await getAllUsers();

  // Index of the next user to hand out; shared by all workers.
  let i = 0;

  // Define functions to handle the different messages passed by the workers
  function handleStatusReport(workerNumber, report) {
    console.log(`the worker:${workerNumber} says`, report.body || report);
  }

  function handleWorkerFinished(worker, workerNumber, message) {
    console.log(`done with ${JSON.stringify(message.body)}!`);
    if (i < users.length) {
      worker.postMessage(users[i]);
      i += 1;
    } else {
      console.log(`Worker number ${workerNumber} completed working!`);
      // No work left: stop the worker so the process can exit.
      worker.terminate();
    }
  }

  // Spin up our initial batch of workers... we will reuse these workers by sending more work as they finish
  for (let j = 0; j < Math.min(THREAD_COUNT, users.length); j += 1) {
    const worker = new WorkerThread.Worker(join(__dirname, './worker.js'));
    console.log(`Running ${i + 1} of ${users.length}`);
    worker.postMessage(users[i]);
    i += 1;

    // Listen for messages from the worker
    worker.on('message', (messageBody) => {
      // Switch on values in the message body to support different types of message
      if (messageBody.type === 'done') {
        handleWorkerFinished(worker, j, messageBody);
      } else {
        handleStatusReport(j, messageBody);
      }
    });

    // Surface crashes inside the worker instead of failing silently
    worker.on('error', (err) => {
      console.error(`Worker number ${j} failed:`, err);
    });
  }
})();
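The key design choice in the master script is reusing a fixed set of workers instead of spawning one per user. The same pattern can be packaged as a promise-returning helper. This is a sketch under assumptions: runWithPool and DOUBLER_SOURCE are hypothetical names, the worker is passed inline through the eval option rather than as worker.js, and each job message carries an index so results can be reassembled in input order.

```javascript
const { Worker } = require('worker_threads');

// Hypothetical inline worker: doubles each number it receives.
const DOUBLER_SOURCE = `
  const { parentPort } = require('worker_threads');
  parentPort.on('message', ({ index, item }) => {
    parentPort.postMessage({ type: 'done', index, body: item * 2 });
  });
`;

// Process `items` with at most `threadCount` workers, handing a worker a new
// item whenever it reports 'done'. Resolves with results in input order.
function runWithPool(items, threadCount, workerSource) {
  return new Promise((resolve, reject) => {
    const results = new Array(items.length);
    let next = 0;     // index of the next item to hand out
    let finished = 0; // number of results received so far

    if (items.length === 0) {
      resolve(results);
      return;
    }

    const poolSize = Math.min(threadCount, items.length);
    for (let w = 0; w < poolSize; w += 1) {
      const worker = new Worker(workerSource, { eval: true });

      // Give this worker the next item, or shut it down if none remain.
      const dispatch = () => {
        if (next < items.length) {
          worker.postMessage({ index: next, item: items[next] });
          next += 1;
        } else {
          worker.terminate();
        }
      };

      worker.on('message', (message) => {
        if (message.type !== 'done') return; // ignore status reports
        results[message.index] = message.body;
        finished += 1;
        if (finished === items.length) resolve(results);
        dispatch();
      });
      worker.on('error', reject);

      dispatch(); // initial job for this worker
    }
  });
}

runWithPool([1, 2, 3, 4, 5], 2, DOUBLER_SOURCE).then((r) => console.log(r));
// prints [ 2, 4, 6, 8, 10 ]
```

Keeping the queue index and result counter inside the helper avoids the shared module-level counter, and every worker terminates once it finds the queue empty.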

The Worker Script

The worker can be as simple as:

const { parentPort } = require('worker_threads');
parentPort.on('message', async (someObject) => { /* use someObject; optionally report back with parentPort.postMessage() */ });

The worker is not required to report back to the master; however, doing so is good practice: it tells the master whether the worker finished or failed and, optionally, returns the results of the calculations.

  parentPort.postMessage({ type: 'done', body: {key: 'value'} });
  parentPort.postMessage({ type: 'log', body: {message: 'something happened'} });

The examples above use a property named type with the values done and log. Both the name and the values are arbitrary; just remember that you can exchange data between the workers and the master in this fashion. Note also that sending a message from the worker to the master does not terminate the worker, so one worker can send many messages during its execution.
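To illustrate that posting does not end the worker, here is a sketch in which one job produces several log messages followed by a single done message, while the master switches on type. CHATTY_SOURCE and collectMessages are hypothetical names, and the worker is inlined with the eval option instead of living in worker.js.

```javascript
const { Worker } = require('worker_threads');

// Hypothetical inline worker: sends several 'log' messages while it works,
// then one 'done' message. Posting a message does not stop the worker.
const CHATTY_SOURCE = `
  const { parentPort } = require('worker_threads');
  parentPort.on('message', (steps) => {
    for (let s = 1; s <= steps; s += 1) {
      parentPort.postMessage({ type: 'log', body: { message: 'step ' + s } });
    }
    parentPort.postMessage({ type: 'done', body: { key: 'value' } });
  });
`;

// Collect every message until 'done' arrives, switching on `type`.
function collectMessages(steps) {
  return new Promise((resolve, reject) => {
    const logs = [];
    const worker = new Worker(CHATTY_SOURCE, { eval: true });
    worker.on('message', (message) => {
      if (message.type === 'log') {
        logs.push(message.body.message);
      } else if (message.type === 'done') {
        worker.terminate(); // the worker would otherwise wait for more jobs
        resolve({ logs, result: message.body });
      }
    });
    worker.on('error', reject);
    worker.postMessage(steps);
  });
}

collectMessages(3).then(({ logs }) => console.log(logs.length)); // prints 3
```

Messages posted from a single worker arrive in the order they were sent, which is why the done message reliably comes after the log messages here.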

Here is a more thorough template for a worker:

const { parentPort } = require('worker_threads');

// Handles one job from the master (named to avoid shadowing the global `process`)
async function handleJob(someObject) {
  // Do lots of processing here...

  // Send a status update to the master
  parentPort.postMessage({ type: 'status', body: 'still working' });

  // Do more processing...

  // Send a final message with the relevant data back to the master script
  parentPort.postMessage({ type: 'done', body: { key: 'value' } });
}

// Register the function to execute on each message
parentPort.on('message', async (someObject) => handleJob(someObject));
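This template only reports success. To also tell the master when a job failed, one option is to catch errors inside the message handler and post them back as an ordinary message. In this sketch the 'error' message type, handleJob, and runJob are hypothetical, and the worker is inlined with eval: true for brevity.

```javascript
const { Worker } = require('worker_threads');

// Worker source (inlined for the sketch; in practice this would be worker.js).
const WORKER_SOURCE = `
  const { parentPort } = require('worker_threads');

  // Hypothetical job: rejects inputs that are not numbers.
  async function handleJob(job) {
    if (typeof job.value !== 'number') {
      throw new TypeError('value must be a number');
    }
    return job.value * 10;
  }

  parentPort.on('message', async (job) => {
    try {
      const result = await handleJob(job);
      parentPort.postMessage({ type: 'done', body: result });
    } catch (err) {
      // Report the failure instead of leaving the rejection unhandled.
      parentPort.postMessage({ type: 'error', body: err.message });
    }
  });
`;

// Send one job, resolve with the worker's reply, then stop the worker.
function runJob(job) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(WORKER_SOURCE, { eval: true });
    worker.once('message', (message) => {
      resolve(message);
      worker.terminate();
    });
    worker.once('error', reject);
    worker.postMessage(job);
  });
}

runJob({ value: 4 }).then(console.log);   // { type: 'done', body: 40 }
runJob({ value: 'x' }).then(console.log); // { type: 'error', body: 'value must be a number' }
```

With this convention the master can branch on type === 'error' the same way it already branches on 'done', and decide whether to retry or skip the job.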

Additional Notes

Worker threads only recently became a stable feature, in Node 13.0. If errors occur when running older versions of Node, make sure worker threads are enabled by running with the --experimental-worker flag:

node --experimental-worker master.js
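If you are unsure whether the runtime exposes the module at all, a small guard can check before spawning any workers. This is a sketch: workerThreadsAvailable is a hypothetical helper, and it assumes that on releases where the module sits behind the flag, require('worker_threads') throws when the flag is missing.

```javascript
// Returns true if the worker_threads module can be loaded in this runtime.
// Assumption: on older releases started without --experimental-worker,
// the module is not exposed and require() throws.
function workerThreadsAvailable() {
  try {
    require('worker_threads');
    return true;
  } catch (err) {
    return false;
  }
}

if (workerThreadsAvailable()) {
  console.log(`worker_threads is available on Node ${process.versions.node}`);
} else {
  console.error('worker_threads is unavailable; try: node --experimental-worker master.js');
  process.exitCode = 1;
}
```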
