RxJava e execução paralela do código do observador

Eu estou tendo o seguinte código usando RxJava Observable api:

Observable observable = fileProcessor.processFileObservable(processedFile.getAbsolutePath()); observable .buffer(10000) .observeOn(Schedulers.computation()) .subscribe(recordInfo -> { _logger.info("Running stage2 on thread with id : " + Thread.currentThread().getId()); for(Info info : recordInfo) { // some I/O operation logic } }, exception -> { }, () -> { }); 

Minha expectativa era que o código de observação, ou seja, o código dentro do método subscribe (), fosse executado em paralelo após eu ter especificado o planejador de computação. Em vez disso, o código ainda está sendo executado sequencialmente em um único encadeamento. Como pode fazer o código rodar em paralelo usando a API RxJava.

RxJava é frequentemente mal entendido quando se trata dos aspectos asynchronouss / multithread. A codificação de operações multithread é simples, mas entender a abstração é outra coisa.

Uma pergunta comum sobre o RxJava é como alcançar a paralelização ou emitir vários itens simultaneamente de um Observable. Naturalmente, essa definição quebra o contrato observável que afirma que onNext () deve ser chamado seqüencialmente e nunca simultaneamente por mais de um thread por vez.

Para alcançar o paralelismo, você precisa de múltiplos observáveis.

Isso é executado em um único segmento:

 Observable vals = Observable.range(1,10); vals.subscribeOn(Schedulers.computation()) .map(i -> intenseCalculation(i)) .subscribe(val -> System.out.println("Subscriber received " + val + " on " + Thread.currentThread().getName())); 

Isso é executado em vários segmentos:

 Observable vals = Observable.range(1,10); vals.flatMap(val -> Observable.just(val) .subscribeOn(Schedulers.computation()) .map(i -> intenseCalculation(i)) ).subscribe(val -> System.out.println(val)); 

Código e texto vem desta postagem do blog.

Você precisa especificar subscribeOn(Schedulers.computation()) vez de observeOn(Schedulers.computation()) para essa finalidade. Em subscribeOn você declara em qual thread você vai emitir seus valores. observeOn você declara em qual segmento você vai lidar e observá-los.

Usando flatMap e especifique para se inscrever no Schedulers.computation() irá obter a simultaneidade.

Aqui está um exemplo mais prático usando Callable , da saída, podemos ver que levará cerca de 2000 milissegundos para concluir todas as tarefas.

 static class MyCallable implements Callable { private static final Object CALLABLE_COUNT_LOCK = new Object(); private static int callableCount; @Override public Integer call() throws Exception { Thread.sleep(2000); synchronized (CALLABLE_COUNT_LOCK) { return callableCount++; } } public static int getCallableCount() { synchronized (CALLABLE_COUNT_LOCK) { return callableCount; } } } private static void runMyCallableConcurrentlyWithRxJava() { long startTimeMillis = System.currentTimeMillis(); final Semaphore semaphore = new Semaphore(1); try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } Observable.just(new MyCallable(), new MyCallable(), new MyCallable(), new MyCallable()) .flatMap(new Function>() { @Override public ObservableSource apply(@NonNull MyCallable myCallable) throws Exception { return Observable.fromCallable(myCallable).subscribeOn(Schedulers.computation()); } }) .subscribeOn(Schedulers.computation()) .subscribe(new Observer() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Object o) { System.out.println("onNext " + o); } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { if (MyCallable.getCallableCount() >= 4) { semaphore.release(); } } }); try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } System.out.println("durationMillis " + (System.currentTimeMillis()-startTimeMillis)); } 

O RxJava 2.0.5 introduziu streams paralelos e ParallelFlowable , o que torna a execução paralela mais simples e mais declarativa.

Você não precisa mais criar Observable / flatMap dentro do flatMap , você pode simplesmente chamar parallel() no Flowable e ele retorna ParallelFlowable .

Não é tão rico em resources como um Flowable regular, porque a simultaneidade gera muitos problemas com contratos de Rx, mas você tem map() , filter() e muito mais básicos, que devem ser suficientes na maioria dos casos.

Então, ao invés deste stream da resposta @LordRaydenMK

 Observable vals = Observable.range(1,10); vals.flatMap(val -> Observable.just(val) .subscribeOn(Schedulers.computation()) .map(i -> intenseCalculation(i)) ).subscribe(val -> System.out.println(val)); 

agora você pode fazer:

 Flowable vals = Flowable.range(1, 10); vals.parallel() .runOn(Schedulers.computation()) .map(i -> intenseCalculation(i)) .sequential() .subscribe(val -> System.out.println(val)); 

Isso ainda vem na mesma sequência. Mesmo em novos tópicos

Ob3 observável = intervalo observável (1, 5);

  ob3.flatMap(new Func1>() { @Override public Observable call(Integer pArg0) { return Observable.just(pArg0); } }).subscribeOn(Schedulers.newThread()).map(new Func1() { @Override public Integer call(Integer pArg0) { try { Thread.sleep(1000 - (pArg0 * 100)); System.out.println(pArg0 + " ccc " + Thread.currentThread().getName()); } catch (Exception e) { e.printStackTrace(); } return pArg0; } }).subscribe(); 

RxNewThreadScheduler-1 2 ccc RxNewThreadScheduler-1 3 ccc