Параллелизм в Node.JS
Привет, в этом посте я хочу попытаться рассказать о том, как можно писать на node.js параллельные приложения, это не на столько удобный параллелизм как в Java, но это реально параллельное выполнение. Ну что-же, приступим.
Как это работает
Все на самом деле не так сложно, Node.JS позволяет делать fork процессов и налаживать коммуникации в них через IPC (Inter-Process Communication). Это дает нам возможность передавать данные между процессами в обоих направлениях. Что ж, давайте напишем приложение которое будет параллельно обрабатывать нам к примеру данные в файлах и потом мы должны получить результат работы потоков в одном массиве.
Задача
Давайте заставим наши потоки обрабатывать информацию о пользователях в файлах, там будет храниться имя и счет человека, если счет >= 10_000 долларов, то этого человека мы должны будем вернуть из нашего потока. Давайте уже начнем писать код, хватит болтать :)
Структура директорий
.
├── files
│ ├── a.txt
│ ├── b.txt
│ └── c.txt
├── index.js
└── thread.js
Я думаю тут все и так слишком понятно. В папке files будут хранится данные о наших людях, в файле index.js будет код запуска потоков и thread.js код обработки данных (содержимое файлов будет в моем github репозитории, ссылка в конце статьи)
index.js
Как я уже написал выше, этот файл содержит в себе код запуска потоков и раздачи задач. Давайте напишем код этого файла.
const { fork } = require('child_process');
const thread = fork(`${__dirname}/thread.js`, ['a.txt']);
// get message from another thread
thread.on('message', (msg) => console.log(msg));
Этот код пока ничего полезного не делает, но он начнет это делать уже скоро, а пока здесь можно увидеть как в Node.JS можно создать что-то похожее на поток. Обратите внимание на функцию fork, она делает всю работу, мы ей говорим какой файл загрузить и открыть с ним IPC сокет, далее мы можем получать сообщения из запущенного потока, давайте попробуем сделать этот код поумнее?
const {fork} = require('child_process');
const {promisify} = require('util');
const fs = require('fs');
promisify(fs.readdir)('./files')
.then(makeThreadPromises)
.then(processingResults => processingResults.reduce((l, r) => [...l, ...r]))
.then(console.log)
.catch(console.log);
/**
* Эта функция возвращает промис который завершается после всех промисов
* которые обрабатывают файлы
*
* Это по сути является синхронизацией потоков, чтобы результаты были
* аггрегированны в один массив
*
* @param files
* @returns {Promise<any[]>}
*/
function makeThreadPromises(files) {
return Promise.all(files.map(f => makeProcessingThread(`./files/${f}`)));
}
/**
* Создает один промис который является как бы потоком, для того
* чтобы потом захватить результат с IPC канала
*
* @param fName
* @returns {Promise<any>}
*/
function makeProcessingThread(fName) {
if (!fName)
throw new Error('file name must be set');
return new Promise((resolve, reject) => {
const p = fork('./thread.js', [fName]);
let result = undefined;
p.on('message', processingResult => result = processingResult);
p.on('disconnect', () => {
if (!result) {
reject(new Error('thread return empty result'))
} else {
resolve(result);
}
});
})
}
Здесь поменялось очень много :). Давайте по порядку разберем что тут происходит. Первая конструкция из промисов, это цепочка обработки наших файлов, она запускает потоки и аггрегирует из них результат.
Следующая функция
makeThreadPromises
Создает JS промис который вызывается после завершения выполнения всех параллельных потоков которые обрабатывают файлы
Вот эта функция
makeProcessingThread
Создает JS промис который возвращает результат обработки из потока, давайте остановимся на этой функции подольше. Эта функция на самом деле легче чем кажется. Первое что мы делаем, это проверяем входной параметр, чтобы имя файла для обработки присутствовало. Далее мы возвращаем промис который создает новый процесс с которым налажен IPC канал, дальше мы подписываемся на сообщения от потока который обрабатывает наши файлы.
Событие 'message'
Это событие вызывается когда поток который мы запустили, отправляет нам какое-нибудь сообщение, это может быть JSON объектом, любым литеральным типом и так далее. Мы используем это событие чтобы сохранить результат работы потока в переменную
Событие 'disconnect'
Это событие присылается когда поток который мы запустили отключился
Пора рассмотреть код потока
Далее я привожу код файла thread.js. Повторюсь, это js файл который обрабатывает наш файл и отдаем нам результаты обработки (люди у которых счет >= 10000)
const fs = require('fs');
const {promisify} = require('util');
const ACCOUNT_SEGMENT = 1;
const USER_NAME_SEGMENT = 0;
const myFileName = process.argv[2];
/*
* Этот код в виде цепочки промисов делает обработку файла, ищет
* в нем людей у которых сумма счета >= 10_000 долларов
*/
promisify(fs.readFile)(myFileName).then(buffer => buffer.toString())
.then(fileAsString => fileAsString.split('\n'))
.then(processUsers)
.then(sendResultToParent)
.then(() => process.disconnect());
/**
* Эта функция возвращает список пользователей у которых размер
* счета больше или равен $10 000
*
* @param userLines
* @returns {Array}
*/
function processUsers(userLines) {
const res = [];
for (const line of userLines) {
const segments = line.split(' ');
if (parseInt(segments[ACCOUNT_SEGMENT], 10) >= 10000)
res.push(segments[USER_NAME_SEGMENT])
}
return res;
}
/**
* Эта функция отсылает результат по IPC каналу родительскому процессу
*
* @param users
* @returns {Promise<void>}
*/
function sendResultToParent(users) {
process.send(users);
return Promise.resolve();
}
Я думаю из комментариев к функциям станет все понятно. Остановимся только на функции sendResultToParent, она используется для того чтобы отправить результаты вызывающему процессу, для этого используется функция process.send которая принимает первым параметром то, что мы хотим отправить, а так же имеет пару опциональных параметров и возвращает промис.
Прошу обратить внимание на функцию process.disconnect, это очень важный вызов, без него приложение будет выполняться бесконечно, по-этому, когда работа в процессе закончена, всегда необходимо вызвать эту функцию.
Давайте запустим код и посмотрим на результаты
Для запуска проекта воспользуемся следующей командой
node index.js
мы видим что все работает и в результате на консоль мы получили следующий результат обработки файлов
[ 'Martin',
'Ferdinand',
'Lari',
'Alex',
'Simon',
'Max',
'Bil',
'Mark',
'Lora',
'Lari.B',
'Vladimir.P',
'Martin' ]
Итоги
Мы с вами получили приложение которое распаралелено на 3 потока, эти три потока выполняют обработку трех файлов, параллельно, при больших задачах это может очень сильно ускорить приложение, например расчет статистики сайта за 3 года. Код который приведен выше не нужно использовать как основу для чего либо, он написан ради того, чтобы показать концепцию, если вы хотите использовать его у себя в проекте, то Вам следует его хорошенько доработать.
Материалы для изучения