O que é mais rápido? Menos trabalho em mais runnables ou mais trabalho em menos runnables? (ExecutorService)

Estou tentando descobrir como posso obter o máximo desempenho de um aplicativo multithread.
Eu tenho um pool de threads que eu criei assim:

ExecutorService executor = Executors.newFixedThreadPool(8); // I have 8 CPU colors. 

A minha pergunta é, devo dividir o trabalho em apenas 8 runnables / callables, que é o mesmo número que os threads no pool de threads, ou devo dividi-lo em, digamos, 1000000 runnables / callables?

 for (int i = 0; i < 1000000; i++) { Callable worker = new MyCallable(); // Each worker does little work. Future submit = executor.submit(worker); } long sum = 0; for (Future future : list) sum += future.get(); // Much more overhead from the for loops 

OU

 for (int i = 0; i < 8; i++) { Callable worker = new MyCallable(); // Each worker does much more work. Future submit = executor.submit(worker); } long sum = 0; for (Future future : list) sum += future.get(); // Negligible overhead from the for loops 

Dividir em callables de 1000000 parece mais lento para mim, já que há a sobrecarga de instanciar todos esses callables e coletar os resultados deles em loops. Por outro lado, se eu tenho 8 callables esta sobrecarga é insignificante. E como eu tenho apenas 8 threads, não posso executar os callables de 1000000 ao mesmo tempo, então não há ganho de desempenho a partir daí.

Estou certo ou errado?

BTW eu poderia testar esses casos, mas a operação é muito trivial e eu acho que o compilador percebe isso e faz algumas otimizações. Então o resultado pode ser enganoso. Eu quero saber qual abordagem é melhor para algo como um aplicativo de image processing.

Existem dois aspectos para essa questão.

Primeiro você tem o material técnico do Java. Como você tem algumas respostas sobre isso, vou resumir para estas noções básicas:

  • se você tiver N Núcleos, então o número N de encadeamentos lhe daria os melhores resultados, contanto que cada tarefa esteja apenas ligada à CPU (ou seja, sem I / O envolvido)
  • Cada Thread deve fazer mais trabalho do que o necessário para a tarefa, ou seja, ter N Threads contando até 10 seria muito mais lento, pois a sobrecarga de criar e gerenciar os Threads extras é maior do que o benefício de contar até 10 em paralelo
  • você precisa ter certeza de que qualquer sobrecarga de synchronization é menor do que o trabalho que está sendo feito, ou seja, ter N Threads chamando um método de incremento synchronized seria muito mais lento
  • Threads tomam resources, mais comumente memory. Quanto mais tópicos você tiver, mais difícil será estimar o uso de memory e afetar o tempo de GC (raro, mas eu já vi isso acontecer)

Em segundo lugar você tem a teoria do agendamento. Você precisa considerar o que seu programa está fazendo

  • Normalmente, use Threads para bloquear operações de E / S. Você não quer que você espere por rede ou HDD se você pudesse estar usando sua CPU para outras tarefas
  • Existem alguns bons livros sobre agendamento (não lembro os nomes) que podem ajudá-lo a projetar programas eficientes. No exemplo mencionado, pode haver casos em que encadeamentos extras façam sentido. Por exemplo, se suas tarefas não tiverem uma duração determinística, estão distorcidas e seu tempo médio de resposta é importante: suponha que você tenha 2 núcleos e 4 tarefas. A tarefa A e B levará 1 minuto cada, mas o C & D levará 10 minutos. Se você executar estes em 2 threads com C & D executando primeiro, seu tempo total será de 11 minutos, mas seu tempo médio de resposta será (10 + 10 + 11 + 11) /4 = 10,5 minutos. Se você executar contra 4 Threads, então o tempo de resposta será ((1 + a) + (1 + a) + (10 + a) + (10 + a)) / 4 = 5,5 + a, onde a é o agendamento aproximação do tempo de espera. Isso é muito teórico porque existem muitas variables ​​não explicadas, mas podem ajudar na criação de programas encadeados. (Também no exemplo acima, desde que você está esperando no Futures você provavelmente não se importa com os tempos médios de resposta)
  • É preciso ter cuidado ao usar vários pools de Thread . O uso de vários pools pode causar deadlocks (se as dependencies forem introduzidas entre os dois pools) e dificultar a otimização (a contenção pode ser criada entre os pools e a obtenção dos tamanhos corretos pode se tornar impossível)

–EDITAR–

Finalmente, se ajudar, a maneira como penso em desempenho é que eu tenho 4 resources principais: CPU, RAM, disco e rede. Eu tento descobrir qual é o meu gargalo e uso resources não saturados para otimizar. Por exemplo, se eu tiver muita CPU inativa e pouca memory, posso compactar meus dados na memory. Se eu tiver muita E / S de disco e memory grande, armazene mais dados em cache. Se os resources de rede (não a conexão de rede real) forem lentos, use muitos encadeamentos para paralelizar. Uma vez que você satura um tipo de recurso em seu caminho crítico e não pode usar outros resources para acelerar, você alcançou seu desempenho máximo e precisa atualizar seu P / P para obter resultados mais rápidos.

Não há uma resposta direta a esta pergunta, pois depende de muitas coisas, como o código, o aplicativo loigc, max, simultaneidade possível, hw etc.

Mas, ao considerar a concorrência, você deve considerar as coisas abaixo,

  1. Cada runnable precisa de uma pilha que é privada para esse segmento, portanto, se você criar não grande. de consumo de memory de threads no thread é mais do que o uso real do aplicativo
  2. O encadeamento deve executar tarefas independentes e paralelas.

    Descobrir o patch de código que pode ser realmente executado em paralelo sem qualquer dependência, caso contrário, a segmentação não ajudará muito

  3. O que é configuração de hardware?

    A execução simultânea máxima de encadeamentos que você pode alcançar é igual no total. de núcleos de cpu. Se você tem menos não. de núcleos e enorme não. de threads, em seguida, a tarefa de comutação é mais ativa (use cpu) do que o segmento real. Isso pode dificultar muito o desempenho

Em suma, sua segunda abordagem parece boa para mim, mas, se possível, descubra mais paralelismo e você pode estender até 20-30.

talvez esse código ajude. Ele calculará números de fibonacci usando um pool de junit fork. Com o fork-join podemos subdividir recursivamente um problema e combinar os resultados de cada nível de recursion. Teoricamente, poderíamos recorrer a fib (0) no pool fork-join, mas isso seria ineficiente. Portanto, introduzimos um limite de recursion onde paramos de subdividir a tarefa e calculamos o restante na tarefa atual. Este código registrará o tempo gasto por fib (x) e computará o tempo de thread único para cada fib (n) para n até x. Para cada limite de recursion, ele medirá quantas tarefas foram criadas e quanto tempo cada uma delas correu, em média.

Normalmente, o ponto ideal é um tamanho de tarefa acima de 1µs, mas então nossas tarefas simples de fibonacci aqui quase não precisam de memory / cache. Para tarefas mais intensivas de dados com maior poluição de cache, o switch é mais caro e tarefas simultâneas podem poluir os caches compartilhados.

 import java.util.concurrent.*; import java.util.concurrent.atomic.*; public class FibonacciFork extends RecursiveTask { private static final long serialVersionUID = 1L; public FibonacciFork( long n) { super(); this.n = n; } static ForkJoinPool fjp = new ForkJoinPool( Runtime.getRuntime().availableProcessors()); static long fibonacci0( long n) { if ( n < 2) { return n; } return fibonacci0( n - 1) + fibonacci0( n - 2); } static int rekLimit = 8; private static long stealCount; long n; private long forkCount; private static AtomicLong forks = new AtomicLong( 0); static class Result { long durMS; int rekLimit; } public static void main( String[] args) { int fiboArg = 49; BenchLogger.sysinfo( "Warmup"); long singleNS[] = getSingleThreadNanos( 20, 5e9); BenchLogger.sysinfo( "Warmup complete"); singleNS = getSingleThreadNanos( fiboArg, 1e9); BenchLogger.sysinfo( "Single Thread Times complete"); Result[] results = new Result[ fiboArg + 1]; for ( int rekLimit = 2; rekLimit <= fiboArg; rekLimit++) { results[ rekLimit] = new Result(); runWithRecursionLimit( rekLimit, fiboArg, singleNS[ rekLimit], results[ rekLimit]); } System.out.println( "CSV results for Fibo " + fiboArg + "\n" + "RekLimit\t" + "Jobs ns\t" + "time ms"); for ( int rekLimit = 2; rekLimit <= fiboArg; rekLimit++) { System.out.println( rekLimit + "\t" + singleNS[ rekLimit] + "\t" + results[ rekLimit].durMS); } } private static long[] getSingleThreadNanos( final int n, final double minRuntimeNS) { final long timesNS[] = new long[ n + 1]; ExecutorService es = Executors.newFixedThreadPool( Math.max( 1, Runtime.getRuntime().availableProcessors() / 8)); for ( int i = 2; i <= n; i++) { final int arg = i; Runnable runner = new Runnable() { @Override public void run() { long start = System.nanoTime(); long result = fibonacci0( arg); long end = System.nanoTime(); double durNS = end - start; long ntimes = 1; double fact = 1; while ( durNS < minRuntimeNS) { long oldNTimes = ntimes; if ( durNS > 0) { ntimes = Math.max( 1, ( long) ( oldNTimes * fact * minRuntimeNS / durNS)); } else { ntimes *= 2; } start = System.nanoTime(); for ( long i = 0; i < ntimes; i++) { result = fibonacci0( arg); } end = System.nanoTime(); durNS = end - start; fact *= 1.1; } timesNS[ arg] = ( long) ( durNS / ntimes); System.out.println( "Single Fib(" + arg + ")=" + result + " in " + ( timesNS[ arg] / 1e6) + "ms (" + ntimes + " loops in " + (durNS / 1e6) + " ms)"); } }; es.execute( runner); } es.shutdown(); try { es.awaitTermination( 1, TimeUnit.HOURS); } catch ( InterruptedException e) { BenchLogger.sysinfo( "Single Timeout"); } return timesNS; } private static void runWithRecursionLimit( int r, int arg, long singleThreadNanos, Result result) { rekLimit = r; long start = System.currentTimeMillis(); long fiboResult = fibonacci( arg); long end = System.currentTimeMillis(); // Steals zählen long currentSteals = fjp.getStealCount(); long newSteals = currentSteals - stealCount; stealCount = currentSteals; long forksCount = forks.getAndSet( 0); final long durMS = end-start; System.out.println( "Fib(" + arg + ")=" + fiboResult + " in " + durMS + "ms, recursion limit: " + r + " at " + ( singleThreadNanos / 1e6) + "ms, steals: " + newSteals + " forks " + forksCount); result.durMS = durMS; result.rekLimit = r; } static long fibonacci( final long arg) { FibonacciFork task = new FibonacciFork( arg); long result = fjp.invoke( task); forks.set( task.forkCount); return result; } @Override protected Long compute() { if ( n <= rekLimit) { return fibonacci0( n); } FibonacciFork ff1 = new FibonacciFork( n-1); FibonacciFork ff2 = new FibonacciFork( n-2); ff1.fork(); long r2 = ff2.compute(); long r1 = ff1.join(); forkCount = ff2.forkCount + ff1.forkCount + 1; return r1 + r2; } }