Tratamento de exceções de tarefas Java ExecutorService

Estou tentando usar a class ThreadPoolExecutor do Java para executar um grande número de tarefas pesadas com um número fixo de threads. Cada uma das tarefas tem muitos lugares durante os quais pode falhar devido a exceções.

Eu subclass ThreadPoolExecutor e substituí o método afterExecute , que deveria fornecer exceções não detectadas durante a execução de uma tarefa. No entanto, parece que não consigo dar certo.

Por exemplo:

 public class ThreadPoolErrors extends ThreadPoolExecutor { public ThreadPoolErrors() { super( 1, // core threads 1, // max threads 1, // timeout TimeUnit.MINUTES, // timeout units new LinkedBlockingQueue() // work queue ); } protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if(t != null) { System.out.println("Got an error: " + t); } else { System.out.println("Everything's fine--situation normal!"); } } public static void main( String [] args) { ThreadPoolErrors threadPool = new ThreadPoolErrors(); threadPool.submit( new Runnable() { public void run() { throw new RuntimeException("Ouch! Got an error."); } } ); threadPool.shutdown(); } } 

A saída deste programa é “Tudo está bem – situação normal!” mesmo que o único Runnable enviado ao pool de threads lança uma exceção. Alguma pista do que está acontecendo aqui?

Obrigado!

    Dos docs :

    Nota: Quando as ações são incluídas em tarefas (como FutureTask) explicitamente ou por meio de methods como submit, esses objects de tarefa capturam e mantêm exceções computacionais e, portanto, não causam interrupção abrupta, e as exceções internas não são passadas para esse método. .

    Quando você envia um Runnable, ele fica envolto em um futuro.

    Seu afterExecute deve ser algo assim:

     public final class ExtendedExecutor extends ThreadPoolExecutor { // ... protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t == null && r instanceof Future< ?>) { try { Future< ?> future = (Future< ?>) r; if (future.isDone()) { future.get(); } } catch (CancellationException ce) { t = ce; } catch (ExecutionException ee) { t = ee.getCause(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } } if (t != null) { System.out.println(t); } } } 

    AVISO : Deve-se notar que esta solução irá bloquear o segmento de chamada.


    Se você deseja processar exceções lançadas pela tarefa, geralmente é melhor usar o Callable vez do Runnable .

    Callable.call() tem permissão para lançar exceções verificadas e elas são propagadas de volta para o segmento de chamada:

     Callable task = ... Future future = executor.submit(task); try { future.get(); } catch (ExecutionException ex) { ex.getCause().printStackTrace(); } 

    Se Callable.call() lançar uma exceção, ela será empacotada em uma ExecutionException e lançada por Future.get() .

    É provável que isso seja muito mais adequado para a subclass de ThreadPoolExecutor . Também lhe dá a oportunidade de reenviar a tarefa se a exceção for recuperável.

    A explicação para esse comportamento está correta no javadoc para afterExecute :

    Nota: Quando as ações são incluídas em tarefas (como FutureTask) explicitamente ou por meio de methods como submit, esses objects de tarefa capturam e mantêm exceções computacionais e, portanto, não causam interrupção abrupta, e as exceções internas não são passadas para esse método. .

    Eu consegui contorná-lo envolvendo o executável fornecido enviado ao executor.

     CompletableFuture.runAsync( () -> { try { runnable.run(); } catch (Throwable e) { Log.info(Concurrency.class, "runAsync", e); } }, executorService ); 

    Estou usando a class VerboseRunnable do jcabi-log , que engole todas as exceções e as registra. Muito conveniente, por exemplo:

     import com.jcabi.log.VerboseRunnable; scheduler.scheduleWithFixedDelay( new VerboseRunnable( Runnable() { public void run() { // the code, which may throw } }, true // it means that all exceptions will be swallowed and logged ), 1, 1, TimeUnit.MILLISECONDS ); 

    Outra solução seria usar o ManagedTask e ManagedTaskListener .

    Você precisa de um Callable ou Runnable que implemente a interface ManagedTask .

    O método getManagedTaskListener retorna a instância desejada.

     public ManagedTaskListener getManagedTaskListener() { 

    E você implementa no ManagedTaskListener o método taskDone :

     @Override public void taskDone(Future< ?> future, ManagedExecutorService executor, Object task, Throwable exception) { if (exception != null) { LOGGER.log(Level.SEVERE, exception.getMessage()); } } 

    Mais detalhes sobre o ciclo de vida da tarefa gerenciada e o listener .

    Se você quiser monitorar a execução da tarefa, poderá girar 1 ou 2 threads (talvez mais dependendo da carga) e usá-los para executar tarefas de um wrapper ExecutionCompletionService.

    Se o seu ExecutorService vem de uma fonte externa (ou seja, não é possível subclass ThreadPoolExecutor e replace afterExecute() ), você pode usar um proxy dynamic para obter o comportamento desejado:

     public static ExecutorService errorAware(final ExecutorService executor) { return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[] {ExecutorService.class}, (proxy, method, args) -> { if (method.getName().equals("submit")) { final Object arg0 = args[0]; if (arg0 instanceof Runnable) { args[0] = new Runnable() { @Override public void run() { final Runnable task = (Runnable) arg0; try { task.run(); if (task instanceof Future< ?>) { final Future< ?> future = (Future< ?>) task; if (future.isDone()) { try { future.get(); } catch (final CancellationException ce) { // Your error-handling code here ce.printStackTrace(); } catch (final ExecutionException ee) { // Your error-handling code here ee.getCause().printStackTrace(); } catch (final InterruptedException ie) { Thread.currentThread().interrupt(); } } } } catch (final RuntimeException re) { // Your error-handling code here re.printStackTrace(); throw re; } catch (final Error e) { // Your error-handling code here e.printStackTrace(); throw e; } } }; } else if (arg0 instanceof Callable< ?>) { args[0] = new Callable() { @Override public Object call() throws Exception { final Callable< ?> task = (Callable< ?>) arg0; try { return task.call(); } catch (final Exception e) { // Your error-handling code here e.printStackTrace(); throw e; } catch (final Error e) { // Your error-handling code here e.printStackTrace(); throw e; } } }; } } return method.invoke(executor, args); }); } 

    Isso é por causa de AbstractExecutorService :: submit está envolvendo seu runnable em RunnableFuture (nada além de FutureTask ) como abaixo

     AbstractExecutorService.java public Future< ?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task, null); /////////HERE//////// execute(ftask); return ftask; } 

    Em seguida, execute irá passá-lo para Worker and Worker.run() irá chamar o abaixo.

     ThreadPoolExecutor.java final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); /////////HERE//////// } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } 

    Finalmente task.run(); na chamada de código acima chamará FutureTask.run() . Aqui está o código manipulador de exceção, por causa disso você NÃO está recebendo a exceção esperada.

     class FutureTask implements RunnableFuture public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { /////////HERE//////// result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } 

    Isso funciona

    • É derivado do SingleThreadExecutor, mas você pode adaptá-lo facilmente
    • Java 8 códigos lamdas, mas fáceis de corrigir

    Ele criará um Executor com um único thread, que pode ter muitas tarefas; e esperará que a atual finalize a execução para começar com a próxima

    Em caso de erro ou exceção de erro, o uncaughtExceptionHandler irá detectá -lo

     class final pública SingleThreadExecutorWithExceptions {
    
         public static ExecutorService newSingleThreadExecutorWithExceptions (final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
    
             Fábrica ThreadFactory = (Runnable runnable) -> {
                 Tópico final newThread = new Thread (runnable, "SingleThreadExecutorWithExceptions");
                 newThread.setUncaughtExceptionHandler ((final Thread caugthThread, final Throwable throwable) -> {
                     uncaughtExceptionHandler.uncaughtException (caugthThread, throwable);
                 });
                 return newThread;
             };
             devolver novo FinalizableDelegatedExecutorService
                     (novo ThreadPoolExecutor (1, 1,
                             0L, TimeUnit.MILLISECONDS,
                             novo LinkedBlockingQueue (),
                             fábrica){
    
    
                         protected void afterExecute (Runnable runnable, Throwable throwable) {
                             super.afterExecute (runnable, throwable);
                             if (throwable == null && runnable instanceof Future) {
                                 experimentar {
                                     Futuro futuro = (Futuro) executável;
                                     if (future.isDone ()) {
                                         future.get ();
                                     }
                                 } catch (CancellationException ce) {
                                     throwable = ce;
                                 } catch (ExecutionException ee) {
                                     throwable = ee.getCause ();
                                 } catch (InterruptedException ie) {
                                     Thread.currentThread (). Interrupt ();  // ignore / reset
                                 }
                             }
                             if (throwable! = null) {
                                 uncaughtExceptionHandler.uncaughtException (Thread.currentThread (), throwable);
                             }
                         }
                     });
         }
    
    
    
         class estática privada FinalizableDelegatedExecutorService
                 estende DelegatedExecutorService {
             FinalizableDelegatedExecutorService (Executor ExecutorService) {
                 super (executor);
             }
             void protegido finalize () {
                 super.shutdown ();
             }
         }
    
         / **
          * Uma class de wrapper que expõe apenas os methods ExecutorService
          * de uma implementação de ExecutorService.
          * /
         class estática privada DelegatedExecutorService estende AbstractExecutorService {
             final privado ExecutorService e;
             DelegatedExecutorService (executor ExecutorService) {e = executor;  }
             public void execute (comando Runnable) {e.execute (comando);  }
             shutdown public void () {e.shutdown ();  }
             public List shutdownNow () {return e.shutdownNow ();  }
             public booleano isShutdown () {return e.isShutdown ();  }
             public boolean isTerminated () {return e.isTerminated ();  }
             public boolean awaitTermination (tempo limite longo, unidade TimeUnit)
                     lança InterruptedException {
                 return e.awaitTermination (timeout, unit);
             }
             public Future submit (Tarefa executável) {
                 return e.submit (tarefa);
             }
             public Future submit (tarefa que pode ser chamada) {
                 return e.submit (tarefa);
             }
             public Future submit (Tarefa executável, resultado T) {
                 return e.submit (tarefa, resultado);
             }
             public List> invokeAll (coleção> tarefas)
                     lança InterruptedException {
                 return e.invokeAll (tarefas);
             }
             public List> invokeAll (coleção> tarefas,
                                                  longo tempo limite, unidade TimeUnit)
                     lança InterruptedException {
                 return e.invokeAll (tarefas, tempo limite, unidade);
             }
             public T invokeAny (coleção> tarefas)
                     lança InterruptedException, ExecutionException {
                 return e.invokeAny (tarefas);
             }
             public T invokeAny (coleção> tarefas,
                                    longo tempo limite, unidade TimeUnit)
                     lança InterruptedException, ExecutionException, TimeoutException {
                 return e.invokeAny (tarefas, tempo limite, unidade);
             }
         }
    
    
    
         private SingleThreadExecutorWithExceptions () {}
     }
    

    Em vez de criar uma subclass de ThreadPoolExecutor, eu fornecê-lo-ia com uma instância ThreadFactory que cria novos Threads e fornece a eles um UncaughtExceptionHandler