Thread pooling em C ++ 11

Questões relevantes :

Sobre o C ++ 11:

  • C ++ 11: std :: thread agrupado?
  • O asynchronous (launch :: async) em C ++ 11 torna os pools de threads obsoletos para evitar a criação de threads caras?

Sobre o Boost:

  • C ++ impulsionam threads de reutilização de threads
  • boost :: thread e criando um pool deles!

Como obtenho um pool de threads para enviar tarefas , sem criá-las e excluí-las repetidas vezes? Isso significa encadeamentos persistentes para ressincronizar sem ingressar.


Eu tenho código que se parece com isso:

namespace { std::vector workers; int total = 4; int arr[4] = {0}; void each_thread_does(int i) { arr[i] += 2; } } int main(int argc, char *argv[]) { for (int i = 0; i < 8; ++i) { // for 8 iterations, for (int j = 0; j < 4; ++j) { workers.push_back(std::thread(each_thread_does, j)); } for (std::thread &t: workers) { if (t.joinable()) { t.join(); } } arr[4] = std::min_element(arr, arr+4); } return 0; } 

Em vez de criar e unir threads a cada iteração, prefiro enviar tarefas para meus threads de trabalho a cada iteração e criá-las apenas uma vez.

Você pode usar a biblioteca C ++ Thread Pool, https://github.com/vit-vit/ctpl .

Então o código que você escreveu pode ser substituído pelo seguinte

 #include  // or  if ou do not have Boost library int main (int argc, char *argv[]) { ctpl::thread_pool p(2 /* two threads in the pool */); int arr[4] = {0}; std::vector> results(4); for (int i = 0; i < 8; ++i) { // for 8 iterations, for (int j = 0; j < 4; ++j) { results[j] = p.push([&arr, j](int){ arr[j] +=2; }); } for (int j = 0; j < 4; ++j) { results[j].get(); } arr[4] = std::min_element(arr, arr + 4); } } 

Você obterá o número desejado de encadeamentos e não os criará e os excluirá repetidamente nas iterações.

Um conjunto de encadeamentos significa que todos os seus encadeamentos estão em execução, o tempo todo – em outras palavras, a function de encadeamento nunca retorna. Para dar aos encadeamentos algo significativo a ser feito, é necessário projetar um sistema de comunicação entre encadeamentos, com o objective de informar ao encadeamento que há algo a ser feito, bem como para comunicar os dados reais do trabalho.

Normalmente, isso envolverá algum tipo de estrutura de dados simultânea, e cada thread presumivelmente dormiria em algum tipo de variável de condição, que seria notificada quando houvesse trabalho a fazer. Ao receber a notificação, um ou vários dos segmentos são ativados, recuperam uma tarefa da estrutura de dados simultânea, a processam e armazenam o resultado de maneira análoga.

A discussão então iria verificar se há ainda mais trabalho a fazer, e se não voltar a dormir.

O resultado é que você tem que projetar tudo isso sozinho, já que não há uma noção natural de “trabalho” que seja universalmente aplicável. É um pouco de trabalho, e há alguns problemas sutis que você precisa acertar. (Você pode programar no Go se você gosta de um sistema que cuida do gerenciamento de threads para você nos bastidores.)

Isto é copiado da minha resposta para outro post muito semelhante, espero que possa ajudar:

1) Comece com o número máximo de threads que um sistema pode suportar:

 int Num_Threads = thread::hardware_concurrency(); 

2) Para uma implementação eficiente do conjunto de encadeamentos, uma vez que os encadeamentos sejam criados de acordo com Num_Threads, é melhor não criar novos ou destruir antigos (unindo-se). Haverá penalidade de desempenho, pode até fazer com que seu aplicativo seja mais lento que a versão serial.

Cada encadeamento C ++ 11 deve estar rodando em sua function com um loop infinito, aguardando constantemente que novas tarefas sejam executadas e executadas.

Aqui está como append essa function ao pool de threads:

 int Num_Threads = thread::hardware_concurrency(); vector Pool; for(int ii = 0; ii < Num_Threads; ii++) { Pool.push_back(thread(Infinite_loop_function));} 

3) O Infinite_loop_function

Este é um loop "while (true)" esperando pela fila de tarefas

 void The_Pool:: Infinite_loop_function() { while(true) { { unique_lock lock(Queue_Mutex); condition.wait(lock, []{return !Queue.empty()}); Job = Queue.front(); Queue.pop(); } Job(); // function type } }; 

4) Faça uma function para adicionar trabalho à sua fila

 void The_Pool:: Add_Job(function New_Job) { { unique_lock lock(Queue_Mutex); Queue.push(New_Job); } condition.notify_one(); } 

5) Vincule uma function arbitrária à sua fila

 Pool_Obj.Add_Job(std::bind(&Some_Class::Some_Method, &Some_object)); 

Depois de integrar esses ingredientes, você tem seu próprio pool dynamic de threads. Esses segmentos sempre são executados, aguardando que o trabalho seja feito.

Peço desculpas se houver alguns erros de syntax, digitei esses códigos e tenho uma memory ruim. Lamentamos não poder fornecer o código completo do conjunto de encadeamentos, o que violaria a integridade do meu trabalho.

Um conjunto de encadeamentos está no núcleo de um conjunto de encadeamentos todos vinculados a uma function que funciona como um loop de events. Esses threads aguardarão indefinidamente uma tarefa ou sua própria finalização.

A tarefa do conjunto de encadeamentos é fornecer uma interface para enviar tarefas, definir (e talvez modificar) a política de execução dessas tarefas (regras de planejamento, instanciação de encadeamento, tamanho do conjunto) e monitorar o status dos encadeamentos e resources relacionados.

Assim, para um pool versátil, é necessário começar definindo o que é uma tarefa, como ela é iniciada, interrompida, qual é o resultado (veja a noção de promise e futuro para essa pergunta), que tipo de events os threads terão para responder para como eles vão lidar com eles, como esses events devem ser discriminados daqueles tratados pelas tarefas. Isso pode se tornar bastante complicado, como você pode ver, e impor restrições sobre como os threads funcionarão, à medida que a solução se torna mais e mais envolvida.

O ferramental atual para manipular events é razoavelmente barebones (*): primitivos como mutexes, variables ​​de condição e algumas abstrações em cima disso (bloqueios, barreiras). Mas, em alguns casos, essas abstrações podem se mostrar inadequadas (veja essa questão relacionada), e é preciso voltar a usar os primitivos.

Outros problemas também precisam ser gerenciados:

  • sinal
  • eu / o
  • hardware (afinidade do processador, configuração heterogênea)

Como isso ocorreria em seu ambiente?

Esta resposta a uma pergunta semelhante aponta para uma implementação existente para reforço e stl.

Eu ofereci uma implementação muito bruta de um conjunto de encadeamentos para outra pergunta, que não resolve muitos problemas descritos acima. Você pode querer construir sobre isso. Você também pode querer dar uma olhada em estruturas existentes em outras linguagens, para encontrar inspiração.


(*) Eu não vejo isso como um problema, muito pelo contrário. Eu acho que é o espírito de C ++ herdado de C.

Algo como isso pode ajudar (tirado de um aplicativo de trabalho).

 #include  #include  #include  struct thread_pool { typedef std::unique_ptr asio_worker; thread_pool(int threads) :service(), service_worker(new asio_worker::element_type(service)) { for (int i = 0; i < threads; ++i) { auto worker = [this] { return service.run(); }; grp.add_thread(new boost::thread(worker)); } } template void enqueue(F f) { service.post(f); } ~thread_pool() { service_worker.reset(); grp.join_all(); service.stop(); } private: boost::asio::io_service service; asio_worker service_worker; boost::thread_group grp; }; 

Você pode usá-lo assim:

 thread_pool pool(2); pool.enqueue([] { std::cout < < "Hello from Task 1\n"; }); pool.enqueue([] { std::cout << "Hello from Task 2\n"; }); 

Tenha em mente que reinventar um mecanismo de enfileiramento asynchronous eficiente não é trivial.

Boost :: asio :: io_service é uma implementação muito eficiente, ou na verdade é uma coleção de wrappers específicos da plataforma (por exemplo, envolve as portas de conclusão de E / S no Windows).

Esta é outra implementação do conjunto de encadeamentos que é muito simples, fácil de entender e usar, usa somente a biblioteca padrão do C ++ 11 e pode ser consultada ou modificada para seus usos, deve ser um bom começo se você quiser entrar no encadeamento piscinas:

https://github.com/progschj/ThreadPool

Edit: Isso requer agora C + + 17 e conceitos. (A partir de 9/12/16, apenas g ++ 6.0+ é suficiente.)

A dedução de modelo é muito mais precisa por causa disso, por isso vale a pena o esforço de obter um compilador mais novo. Ainda não encontrei uma function que requer argumentos de modelo explícitos.

Ele agora também pega qualquer object que possa ser chamado ( e ainda é estaticamente seguro !!! ).

Agora também inclui um pool de threads de prioridade de encadeamento verde opcional usando a mesma API. Esta class é apenas POSIX, no entanto. Ele usa a API ucontext_t para troca de tarefas do espaço do usuário.


Eu criei uma biblioteca simples para isso. Um exemplo de uso é dado abaixo. (Estou respondendo isso porque foi uma das coisas que encontrei antes de decidir que era necessário escrevê-lo eu mesmo.)

 bool is_prime(int n){ // Determine if n is prime. } int main(){ thread_pool pool(8); // 8 threads list> results; for(int n = 2;n < 10000;n++){ // Submit a job to the pool. results.emplace_back(pool.async(is_prime, n)); } int n = 2; for(auto i = results.begin();i != results.end();i++, n++){ // i is an iterator pointing to a future representing the result of is_prime(n) cout << n << " "; bool prime = i->get(); // Wait for the task is_prime(n) to finish and get the result. if(prime) cout < < "is prime"; else cout << "is not prime"; cout << endl; } } 

Você pode passar qualquer function async com qualquer valor de retorno (ou vazio) e qualquer (ou nenhum) argumento e ele retornará um std::future correspondente. Para obter o resultado (ou apenas esperar até que uma tarefa seja concluída), você chama get() no futuro.

Aqui está o github: https://github.com/Tyler-Hardin/thread_pool .

 Follwoing [PhD EcE](https://stackoverflow.com/users/3818417/phd-ece) suggestion, I implemented the thread pool: 

function_pool.h

 #pragma once #include  #include  #include  #include  #include  #include  class Function_pool { private: std::queue> m_function_queue; std::mutex m_lock; std::condition_variable m_data_condition; std::atomic m_accept_functions; public: Function_pool(); ~Function_pool(); void push(std::function func); void done(); void infinite_loop_func(); }; 

function_pool.cpp

 #include "function_pool.h" Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition(), m_accept_functions(true) { } Function_pool::~Function_pool() { } void Function_pool::push(std::function func) { std::unique_lock lock(m_lock); m_function_queue.push(func); // when we send the notification immediately, the consumer will try to get the lock , so unlock asap lock.unlock(); m_data_condition.notify_one(); } void Function_pool::done() { std::unique_lock lock(m_lock); m_accept_functions = false; lock.unlock(); // when we send the notification immediately, the consumer will try to get the lock , so unlock asap m_data_condition.notify_all(); //notify all waiting threads. } void Function_pool::infinite_loop_func() { std::function func; while (true) { { std::unique_lock lock(m_lock); m_data_condition.wait(lock, [this]() {return !m_function_queue.empty() || !m_accept_functions; }); if (!m_accept_functions && m_function_queue.empty()) { //lock will be release automatically. //finish the thread loop and let it join in the main thread. return; } func = m_function_queue.front(); m_function_queue.pop(); //release the lock } func(); } } 

main.cpp

 #include "function_pool.h" #include  #include  #include  #include  #include  #include  Function_pool func_pool; class quit_worker_exception : public std::exception {}; void example_function() { std::cout < < "bla" << std::endl; } int main() { std::cout << "stating operation" << std::endl; int num_threads = std::thread::hardware_concurrency(); std::cout << "number of threads = " << num_threads << std::endl; std::vector thread_pool; for (int i = 0; i < num_threads; i++) { thread_pool.push_back(std::thread(&Function_pool::infinite_loop_func, &func_pool)); } //here we should send our functions for (int i = 0; i < 50; i++) { func_pool.push(example_function); } func_pool.done(); for (unsigned int i = 0; i < thread_pool.size(); i++) { thread_pool.at(i).join(); } } 

Um conjunto de encadeamentos sem dependencies fora do STL é totalmente possível. Recentemente, escrevi uma pequena biblioteca de encadeamentos de encadeamentos para resolver exatamente o mesmo problema. Ele suporta o redimensionamento dynamic do pool (alterando o número de trabalhadores em tempo de execução), aguardando, parando, pausando, retomando e assim por diante. Espero que você ache útil.