Como obter o ThreadPoolExecutor para aumentar os segmentos para o máximo antes de enfileirar?

Eu tenho sido frustrado por algum tempo com o comportamento padrão de ThreadPoolExecutor que faz o backup dos pools de segmentos do ExecutorService que muitos de nós usam. Para citar os Javadocs:

Se houver mais de corePoolSize, mas menos de maximumPoolSize threads em execução, um novo thread será criado apenas se a fila estiver cheia .

O que isso significa é que, se você definir um conjunto de encadeamentos com o código a seguir, ele nunca iniciará o segundo encadeamento porque o LinkedBlockingQueue é ilimitado.

 ExecutorService threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/, TimeUnit.SECONDS, new LinkedBlockingQueue(/* unlimited queue */)); 

Somente se você tiver uma fila limitada e a fila estiver cheia, todos os tópicos acima do número principal foram iniciados. Eu suspeito que um grande número de programadores Java multithreaded júnior não estão cientes desse comportamento do ThreadPoolExecutor .

Agora tenho um caso de uso específico em que isso não é ideal. Eu estou procurando maneiras, sem escrever minha própria class TPE, para contornar isso.

Minhas exigências são para um serviço da Web que está fazendo chamadas de retorno para uma terceira parte possivelmente não confiável.

  • Eu não quero fazer a chamada de volta em sincronia com a solicitação da web, então eu quero usar um pool de threads.
  • Eu normalmente recebo alguns desses por um minuto, então eu não quero ter um newFixedThreadPool(...) com um grande número de threads que estão inativas.
  • De vez em quando recebo uma explosão deste tráfego e quero aumentar o número de threads para algum valor máximo (digamos 50).
  • Eu preciso fazer uma melhor tentativa de fazer todos os retornos de chamada, então eu quero colocar na fila quaisquer adicionais acima de 50. Eu não quero sobrecarregar o resto do meu servidor web usando um newCachedThreadPool() .

Como posso trabalhar com essa limitação em ThreadPoolExecutor onde a fila precisa ser limitada e completa antes que mais threads sejam iniciados? Como posso obtê-lo para iniciar mais threads antes das tarefas de enfileiramento?

Editar:

@Flavio faz questão de usar o ThreadPoolExecutor.allowCoreThreadTimeOut(true) para ter o tempo limite e a saída dos threads principais. Eu considerei isso, mas eu ainda queria o recurso de threads principais. Eu não queria que o número de threads no pool caísse abaixo do tamanho do núcleo, se possível.

    Como posso contornar essa limitação em ThreadPoolExecutor onde a fila precisa ser limitada e completa antes que mais threads sejam iniciados.

    Acredito que finalmente encontrei uma solução um tanto elegante (talvez um pouco hacky) para essa limitação com o ThreadPoolExecutor . Envolve a extensão de LinkedBlockingQueue para que ele retorne false para queue.offer(...) quando já houver algumas tarefas enfileiradas. Se os encadeamentos atuais não estiverem acompanhando as tarefas enfileiradas, o TPE adicionará encadeamentos adicionais. Se o pool já estiver em encadeamentos máximos, o RejectedExecutionHandler será chamado. É o manipulador que faz o put(...) na fila.

    Certamente é estranho escrever uma fila onde a offer(...) pode retornar false e put() nunca bloqueia, então essa é a parte do hack. Mas isso funciona bem com o uso da fila pelo TPE, então não vejo nenhum problema em fazer isso.

    Aqui está o código:

     // extend LinkedBlockingQueue to force offer() to return false conditionally BlockingQueue queue = new LinkedBlockingQueue() { private static final long serialVersionUID = -6903933921423432194L; @Override public boolean offer(Runnable e) { /* * Offer it to the queue if there is 0 items already queued, else * return false so the TPE will add another thread. If we return false * and max threads have been reached then the RejectedExecutionHandler * will be called which will do the put into the queue. */ if (size() == 0) { return super.offer(e); } else { return false; } } }; ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*secs*/, TimeUnit.SECONDS, queue); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { /* * This does the actual put into the queue. Once the max threads * have been reached, the tasks will then queue up. */ executor.getQueue().put(r); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; } } }); 

    Com este mecanismo, quando eu envio tarefas para a fila, o ThreadPoolExecutor irá:

    1. Escale o número de threads até o tamanho do núcleo inicialmente (aqui 1).
    2. Ofereça-o à fila. Se a fila estiver vazia, ela será enfileirada para ser manipulada pelos encadeamentos existentes.
    3. Se a fila já tiver 1 ou mais elementos, a offer(...) retornará false.
    4. Se falso for retornado, aumente o número de encadeamentos no conjunto até que atinjam o número máximo (aqui 50).
    5. Se no máximo, ele chama o RejectedExecutionHandler
    6. O RejectedExecutionHandler seguida, coloca a tarefa na fila para ser processada pelo primeiro thread disponível na ordem FIFO.

    Embora no meu código de exemplo acima, a fila seja ilimitada, você também pode defini-la como uma fila limitada. Por exemplo, se você adicionar uma capacidade de 1000 ao LinkedBlockingQueue , ele irá:

    1. dimensionar os tópicos até o máximo
    2. então enfileirar até que esteja cheio com 1000 tarefas
    3. em seguida, bloqueie o chamador até que o espaço se torne disponível para a fila.

    Além disso, se você realmente precisasse usar a offer(...) no RejectedExecutionHandler , poderia usar o método offer(E, long, TimeUnit) , em vez disso, com Long.MAX_VALUE como o tempo limite.

    Editar:

    Eu ajustei minha offer(...) replace o método por feedback de @ Ralf. Isso só aumentará o número de encadeamentos no conjunto se eles não estiverem acompanhando a carga.

    Editar:

    Outro ajuste para essa resposta poderia ser perguntar ao TPE se há threads ociosos e apenas enfileirar o item se houver. Você teria que fazer uma class verdadeira para isso e adicionar um ourQueue.setThreadPoolExecutor(tpe); método sobre ele.

    Então o seu método de offer(...) pode ser algo como:

    1. Verifique se o tpe.getPoolSize() == tpe.getMaximumPoolSize() , nesse caso, basta chamar super.offer(...) .
    2. Senão, se tpe.getPoolSize() > tpe.getActiveCount() seguida, chame super.offer(...) pois parece haver encadeamentos ociosos.
    3. Caso contrário, retorne false para bifurcar outro thread.

    Talvez isto:

     int poolSize = tpe.getPoolSize(); int maximumPoolSize = tpe.getMaximumPoolSize(); if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) { return super.offer(e); } else { return false; } 

    Observe que os methods get no TPE são caros, pois acessam campos volatile ou (no caso de getActiveCount() ) bloqueiam o TPE e percorrem a lista de threads. Além disso, há condições de corrida aqui que podem fazer com que uma tarefa seja enfileirada incorretamente ou outro thread seja bifurcada quando houver um thread ocioso.

    Defina o tamanho do núcleo e o tamanho máximo para o mesmo valor e permita que os threads principais sejam removidos do pool com allowCoreThreadTimeOut(true) .

    Eu já tenho duas outras respostas sobre esta questão, mas suspeito que esta seja a melhor.

    É baseado na técnica da resposta aceita atualmente , a saber:

    1. Substituir o método offer() da fila para (às vezes) retornar falso,
    2. que faz com que o ThreadPoolExecutor para gerar um novo segmento ou rejeitar a tarefa e
    3. defina o RejectedExecutionHandler para enfileirar a tarefa na rejeição.

    O problema é quando offer() deve retornar false. A resposta aceita atualmente retorna false quando a fila tem algumas tarefas, mas como eu indiquei no meu comentário, isso causa efeitos indesejáveis. Como alternativa, se você sempre retornar falso, continuará gerando novos segmentos mesmo quando tiver segmentos aguardando na fila.

    A solução é usar o Java 7 LinkedTransferQueue e ter o offer() chamada tryTransfer() . Quando há um thread de consumidor em espera, a tarefa será passada para esse encadeamento. Caso contrário, offer() retornará false e o ThreadPoolExecutor gerará um novo thread.

      BlockingQueue queue = new LinkedTransferQueue() { @Override public boolean offer(Runnable e) { return tryTransfer(e); } }; ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { executor.getQueue().put(r); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }); 

    Nota: agora eu prefiro e recomendo minha outra resposta .

    Aqui está uma versão que me parece muito mais direta: Aumente o corePoolSize (até o limite de maximumPoolSize) sempre que uma nova tarefa for executada e diminua o corePoolSize (até o limite do “tamanho do pool principal” especificado pelo usuário) sempre que tarefa completa.

    Em outras palavras, acompanhe o número de tarefas em execução ou enfileiradas e garanta que o corePoolSize seja igual ao número de tarefas, desde que seja entre o “tamanho do conjunto principal” especificado pelo usuário e o maximumPoolSize.

     public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor { private int userSpecifiedCorePoolSize; private int taskCount; public GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); userSpecifiedCorePoolSize = corePoolSize; } @Override public void execute(Runnable runnable) { synchronized (this) { taskCount++; setCorePoolSizeToTaskCountWithinBounds(); } super.execute(runnable); } @Override protected void afterExecute(Runnable runnable, Throwable throwable) { super.afterExecute(runnable, throwable); synchronized (this) { taskCount--; setCorePoolSizeToTaskCountWithinBounds(); } } private void setCorePoolSizeToTaskCountWithinBounds() { int threads = taskCount; if (threads < userSpecifiedCorePoolSize) threads = userSpecifiedCorePoolSize; if (threads > getMaximumPoolSize()) threads = getMaximumPoolSize(); setCorePoolSize(threads); } } 

    Conforme escrito, a class não suporta a alteração do corePoolSize especificado pelo usuário ou maximumPoolSize após a construção e não suporta a manipulação da fila de trabalho diretamente ou via remove() ou purge() .

    Temos uma subclass de ThreadPoolExecutor que recebe um creationThreshold adicional e substitui a execute .

     public void execute(Runnable command) { super.execute(command); final int poolSize = getPoolSize(); if (poolSize < getMaximumPoolSize()) { if (getQueue().size() > creationThreshold) { synchronized (this) { setCorePoolSize(poolSize + 1); setCorePoolSize(poolSize); } } } } 

    talvez isso ajude também, mas o seu parece mais artístico, claro …

    A resposta recomendada resolve apenas um (1) do problema com o pool de threads do JDK:

    1. Os conjuntos de encadeamentos JDK são direcionados para enfileiramento. Então, ao invés de gerar um novo thread, eles irão enfileirar a tarefa. Somente se a fila atingir seu limite, o conjunto de encadeamentos gerará um novo encadeamento.

    2. A retirada de linha não acontece quando a carga clareia. Por exemplo, se tivermos um burst de jobs atingindo o pool que faz com que o pool chegue ao máximo, seguido por carga leve de no máximo 2 tarefas por vez, o pool usará todos os threads para atender a carga leve evitando a desativação do thread. (apenas 2 tópicos seriam necessários …)

    Infeliz com o comportamento acima, fui em frente e implementei um pool para superar as deficiências acima.

    Para resolver 2) Usando o planejamento Lifo resolve o problema. Esta ideia foi apresentada por Ben Maurer na conferência do aplicativo ACM 2015: Systems @ Facebook scale

    Então, uma nova implementação nasceu:

    LifoThreadPoolExecutorSQP

    Até agora, esta implementação melhora o desempenho da execução assíncrona para o ZEL .

    A implementação é capaz de girar para reduzir a sobrecarga do switch de contexto, gerando desempenho superior para certos casos de uso.

    Espero que ajude…

    PS: O JDK Fork Join Pool implementa o ExecutorService e funciona como um conjunto de threads “normal”. A implementação é de alto desempenho. Ele usa o agendamento do LIFO Thread, mas não há controle sobre o tamanho da fila interna, o tempo limite de aposentadoria …

    Nota: agora eu prefiro e recomendo minha outra resposta .

    Eu tenho outra proposta, seguindo a ideia original de alterar a fila para retornar false. Neste, todas as tarefas podem entrar na fila, mas sempre que uma tarefa é enfileirada após execute() , nós a seguimos com uma tarefa sentinel no-op que a fila rejeita, fazendo com que um novo thread seja gerado, que executará o não operacional imediatamente seguido por algo da fila.

    Como os threads de trabalho podem estar pesquisando o LinkedBlockingQueue para uma nova tarefa, é possível que uma tarefa seja enfileirada mesmo quando há um thread disponível. Para evitar gerar novos encadeamentos mesmo quando há encadeamentos disponíveis, precisamos acompanhar quantos encadeamentos estão aguardando novas tarefas na fila e gerar apenas um novo encadeamento quando houver mais tarefas na fila do que encadeamentos em espera.

     final Runnable SENTINEL_NO_OP = new Runnable() { public void run() { } }; final AtomicInteger waitingThreads = new AtomicInteger(0); BlockingQueue queue = new LinkedBlockingQueue() { @Override public boolean offer(Runnable e) { // offer returning false will cause the executor to spawn a new thread if (e == SENTINEL_NO_OP) return size() < = waitingThreads.get(); else return super.offer(e); } @Override public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { try { waitingThreads.incrementAndGet(); return super.poll(timeout, unit); } finally { waitingThreads.decrementAndGet(); } } @Override public Runnable take() throws InterruptedException { try { waitingThreads.incrementAndGet(); return super.take(); } finally { waitingThreads.decrementAndGet(); } } }; ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue) { @Override public void execute(Runnable command) { super.execute(command); if (getQueue().size() > waitingThreads.get()) super.execute(SENTINEL_NO_OP); } }; threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (r == SENTINEL_NO_OP) return; else throw new RejectedExecutionException(); } }); 

    A melhor solução que posso pensar é estender.

    ThreadPoolExecutor oferece alguns methods de gancho: beforeExecute e afterExecute . Em sua extensão, você poderia manter o uso de uma fila limitada para alimentar tarefas e uma segunda fila ilimitada para lidar com estouro. Quando alguém chama o submit , você pode tentar colocar a solicitação na fila limitada. Se você tiver uma exceção, basta colocar a tarefa na sua fila de estouro. Em seguida, você poderia utilizar o gancho afterExecute para ver se há algo na fila de estouro após concluir uma tarefa. Desta forma, o executor irá cuidar das coisas na sua primeira fila limitada, e puxar automaticamente desta fila ilimitada conforme o tempo permitir.

    Parece mais trabalho do que sua solução, mas pelo menos não envolve dar comportamentos inesperados às filas. Eu também imagino que há uma maneira melhor de verificar o status da fila e dos threads em vez de confiar em exceções, que são relativamente lentas para serem lançadas.