Coordenando a execução paralela em node.js

O modelo de programação orientada a events do node.js torna um pouco complicado coordenar o stream do programa.

A execução sequencial simples é transformada em retornos de chamada nesteds, o que é bastante fácil (embora um pouco complicada para escrever).

Mas como sobre a execução paralela? Digamos que você tenha três tarefas A, B, C que podem ser executadas em paralelo e, quando terminarem, você deseja enviar os resultados para a tarefa D.

Com um modelo fork / join isso seria

  • garfo A
  • garfo B
  • garfo C
  • junte-se A, B, C, execute D

Como eu escrevo isso em node.js? Existem melhores práticas ou livros de receitas? Eu tenho que entregar uma solução toda vez, ou há alguma biblioteca com ajudantes para isso?

Nada é verdadeiramente paralelo em node.js, uma vez que é single threaded. No entanto, vários events podem ser agendados e executados em uma sequência que você não pode determinar de antemão. E algumas coisas, como o access ao database, são, na verdade, “paralelas”, pois as próprias consultas ao database são executadas em encadeamentos separados, mas são reintegradas ao stream de events quando concluídas.

Então, como você agendar um retorno de chamada em vários manipuladores de events? Bem, essa é uma técnica comum usada em animações no javascript do lado do navegador: use uma variável para acompanhar a conclusão.

Isso soa como um hack e é, e parece potencialmente confuso deixando um monte de variables ​​globais em torno de fazer o rastreamento e em uma linguagem menor seria. Mas no javascript podemos usar closures:

function fork (async_calls, shared_callback) { var counter = async_calls.length; var callback = function () { counter --; if (counter == 0) { shared_callback() } } for (var i=0;i 

No exemplo acima, mantemos o código simples, assumindo que as funções async e callback não requerem argumentos. É claro que você pode modificar o código para passar argumentos para as funções assíncronas e fazer com que a function de retorno de chamada acumule resultados e passe para a function shared_callback.


Resposta adicional:

Na verdade, mesmo assim, essa function fork() já pode passar argumentos para as funções assíncronas usando um encerramento:

 fork([ function(callback){ A(1,2,callback) }, function(callback){ B(1,callback) }, function(callback){ C(1,2,callback) } ],D); 

a única coisa que resta a fazer é acumular os resultados de A, B, C e passá-los para D.


Ainda mais resposta adicional:

Eu não pude resistir. Continuou pensando nisso durante o café da manhã. Aqui está uma implementação de fork() que acumula resultados (geralmente passados ​​como argumentos para a function callback):

 function fork (async_calls, shared_callback) { var counter = async_calls.length; var all_results = []; function makeCallback (index) { return function () { counter --; var results = []; // we use the arguments object here because some callbacks // in Node pass in multiple arguments as result. for (var i=0;i 

Isso foi bastante fácil. Isso faz com que o fork() uma finalidade bastante geral e possa ser usado para sincronizar vários events não homogêneos.

Exemplo de uso no Node.js:

 // Read 3 files in parallel and process them together: function A (c){ fs.readFile('file1',c) }; function B (c){ fs.readFile('file2',c) }; function C (c){ fs.readFile('file3',c) }; function D (result) { file1data = result[0][1]; file2data = result[1][1]; file3data = result[2][1]; // process the files together here } fork([A,B,C],D); 

Atualizar

Este código foi escrito antes da existência de bibliotecas como async.js ou as várias bibliotecas baseadas em promises. Eu gostaria de acreditar que o async.js foi inspirado por isso, mas eu não tenho nenhuma prova disso. Enfim .. se você está pensando em fazer isso hoje, dê uma olhada no async.js ou promete. Apenas considere a resposta acima de uma boa explicação / ilustração de como coisas como async.parallel funcionam.

Para fins de integralidade, o seguinte é como você faria isso com async.parallel :

 var async = require('async'); async.parallel([A,B,C],D); 

Note que o async.parallel funciona exatamente da mesma forma que a function fork que implementamos acima. A principal diferença é que ele passa um erro como o primeiro argumento para D e o retorno de chamada como o segundo argumento conforme a convenção node.js.

Usando promises, escrevemos da seguinte forma:

 // Assuming A, B & C return a promise instead of accepting a callback Promise.all([A,B,C]).then(D); 

Acredito que agora o módulo “async” fornece essa funcionalidade paralela e é aproximadamente o mesmo que a function fork acima.

O módulo de futuros tem um submódulo chamado junit que eu gostei de usar:

Junta chamadas assíncronas juntas, como o pthread_join funciona para encadeamentos.

O readme mostra alguns bons exemplos de uso do freestyle ou usando o futuro submódulo usando o padrão Promise. Exemplo dos documentos:

 var Join = require('join') , join = Join() , callbackA = join.add() , callbackB = join.add() , callbackC = join.add(); function abcComplete(aArgs, bArgs, cArgs) { console.log(aArgs[1] + bArgs[1] + cArgs[1]); } setTimeout(function () { callbackA(null, 'Hello'); }, 300); setTimeout(function () { callbackB(null, 'World'); }, 500); setTimeout(function () { callbackC(null, '!'); }, 400); // this must be called after all join.when(abcComplete); 

Uma solução simples pode ser possível aqui: http://howtonode.org/control-flow-part-ii role para Ações paralelas. Outra maneira seria ter A, B e C compartilhando a mesma function de retorno de chamada, ter essa function com um incrementador global ou pelo menos fora da function, se todos os três tiverem chamado o retorno de chamada e, em seguida, executar D, É claro que você terá que armazenar os resultados de A, B e C em algum lugar também.

Outra opção poderia ser o módulo Step para o nó: https://github.com/creationix/step

Você pode querer experimentar esta minúscula biblioteca: https://www.npmjs.com/package/parallel-io

Além de promises populares e biblioteca assíncrona, existe uma terceira maneira elegante – usando “fiação”:

 var l = new Wire(); funcA(l.branch('post')); funcB(l.branch('comments')); funcC(l.branch('links')); l.success(function(results) { // result will be object with results: // { post: ..., comments: ..., links: ...} }); 

https://github.com/garmoshka-mo/mo-wire