Configuração de ThreadPoolExecutor

Eu tenho uma situação em que espero que a adição ao conjunto de encadeamentos seja mais rápida que o processamento. Não acho que uma fila ilimitada seja uma boa ideia, pois há dados suficientes para que a fila cresça para consumir toda a memory, se não for marcada. Dado isso, estou tentando determinar a configuração correta para um ThreadPoolExecutor.

Meu primeiro pensamento é um pool de thread fixo com handoff direto e chamador executa política de falha. Mas eu me pergunto se isso prejudicará a taxa de transferência (porque toda vez que o chamador executa a política é chamado, as tarefas do pool de threads provavelmente serão concluídas e permanecerão ociosas por algum tempo).

Outra ideia é o pool de threads fixo com ArrayBlockingQueue, mas na verdade não tenho certeza do comportamento disso. Espero que isso signifique que o Executor prefira criar encadeamentos se for menor que o tamanho do coreThread, depois as filas e, se a fila estiver cheia, bloqueará a espera pela fila para obter espaço. Mas lendo os documentos aqui:

http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.html

Parece que ele preferirá criar encadeamentos até o corePoolSize, depois adicionar à fila e, se a fila estiver cheia, ele tentará criar encadeamentos até maxThreads (o mesmo que coreThreads neste caso) e, caso falhe, ele executará a diretiva de falha .

Alguém pode esclarecer o comportamento do caso acima? E também sugerir qual seria a melhor configuração para esse caso específico (uma de minhas ideias sugeridas ou outra que poderia funcionar melhor)?

Eu acho que faço outra resposta porque é uma solução diferente para o mesmo problema.

Você pode usar apenas um ThreadPoolExecutor e Semaphore. O semáforo seria criado com o número máximo que você deseja permitir na fila e depois que cada thread conclui a execução, você invocaria release (beforeExecute, que é quando o item é retirado da fila)

Semaphore semaphore = new Semaphore(1000); ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,60,TimeUnit.SECONDS,new LinkedBlockingQueue()){ protected void beforeExecute(Runnable r, Throwable t) { semaphore.release(); } } public void doSubmit(Runnable r){ sempahore.acquire(); executor.submit(r); } 

Então, aqui todos os tópicos serão suspensos até que haja uma permissão disponível (input na fila).

O ThreadPoolExecutor criará mais encadeamentos quando todos os encadeamentos estiverem sendo usados ​​no momento. Isso significa que a fila pode estar vazia, mas se todos os encadeamentos estiverem executando tarefas anteriores, uma nova tarefa criará um novo encadeamento até que o máximo seja atingido.

Se a fila estiver cheia e os encadeamentos estiverem todos saturados, o ThreadPoolExecutor rejeitará a tarefa e lançará uma RejectedExecutionException . Portanto, usar um BlockingQueue não terá os resultados esperados.

Se você quiser limitar o número de tarefas que estão atualmente na fila, você pode usar um ExecutorCompletionService e a fila de apoio.

 //core 5 max 10 with 60 second idle time ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,60,TimeUnit.SECONDS,new LinkedBlockingQueue()); ExecutorCompletionService completionService = new ExecutorCompletionService(executor); private final static int MAX_IN_QUEUE = 1000; public void doSubmit(Runnable r){ while(executor.getQueue().size() >= MAX_IN_QUEUE) completionService.poll(100,TimeUnit.MILLISECONDS); completionService.submit(r); } 

Isso tem o efeito colateral perceptível de ter que esperar continuamente por um elemento para ser concluído. Eu loop na condição por causa da possível condição de corrida do enquanto realmente sendo verdadeira imaterialidade depois de entrar no bloco.

E, claro, a condição de corrida de vários envios, mas deve diminuir o suficiente para evitar a superlotação da fila. Isso pode ser resolvido simplesmente sincronizando o método doSubmit .

Se um único encadeamento estiver fazendo todas as solicitações, uma fila ilimitada será bloqueada sem o efeito colateral negativo do crescimento da fila.

Se vários encadeamentos fizerem as solicitações, a política de conjunto fixo com execuções de chamador deverá funcionar conforme desejado. Os threads restantes no pool serão mantidos ativos pelos outros threads solicitantes.

Você pode fazer com que o produtor pause sempre que o comprimento da fila ficar muito longo.

Algo como isso limita o tamanho da fila de tarefas em espera para um MAX_LEN.

 ExecutorService service = Queue workQ = // queue of service. BufferedReader br = String line; while((line = br.readline()) != null) { service.submit(new ProcessLineRunnable(line)); while(workQ.size() > MAX_LEN) Thread.sleep(1); }