3

I have created a fixed thread-pool to process an event emit per 300 milliseconds and assume the process need 1000 millisecond. Suppose the multi-thread will work but only one thread reused.

If i set the sleepTime smaller than 300ms, the processing thread change, but that is useless.

Questions: What can I do to make it concurrent? Why the program reuse the thread?

Thank you in advance

public static void main(String[] args) throws InterruptedException {
    long sleepTime = 1000;
    ExecutorService e = Executors.newFixedThreadPool(3);

    Observable.interval(300, TimeUnit.MILLISECONDS)
    .subscribeOn(Schedulers.computation())
    .flatMap(new Func1<Long, Observable<Long>>() {
        @Override
        public Observable<Long> call(Long pT) {
            return Observable.just(pT).subscribeOn(Schedulers.from(e));
        }
    })
    .doOnNext(new Action1<Long>() {

        @Override
        public void call(Long pT) {
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    })
    .subscribe(new Action1<Long>() {

        @Override
        public void call(Long pT) {
            System.out.println("i am " + pT + "in thread:" + Thread.currentThread().getName());

        }
    });


    Thread.sleep(50000);
    e.shutdownNow();

}

Logs

i am 0in thread:pool-1-thread-1
i am 1in thread:pool-1-thread-1
i am 2in thread:pool-1-thread-1
i am 3in thread:pool-1-thread-1
i am 4in thread:pool-1-thread-1
i am 5in thread:pool-1-thread-1
i am 6in thread:pool-1-thread-1
i am 7in thread:pool-1-thread-1
i am 8in thread:pool-1-thread-1
i am 9in thread:pool-1-thread-1
i am 10in thread:pool-1-thread-1
i am 11in thread:pool-1-thread-1
Wins
  • 3,420
  • 4
  • 36
  • 70
Rockman12352
  • 101
  • 10
  • Just a note: you can use jvisualvm to more reliably figure out what is going on in terms of scheduling and what threads are used: http://docs.oracle.com/javase/6/docs/technotes/tools/share/jvisualvm.html – Reut Sharabani Dec 30 '15 at 07:55
  • @ReutSharabani In eclipse Debug view, I can see threads are generated, but the program only reuse one thread. – Rockman12352 Dec 30 '15 at 09:56

3 Answers3

0

From what I understand in your code, the producer is producing at faster rate than the subscriber. However the Observable<Long> interval(long interval, TimeUnit unit) actually doesn't support Backpressure. The documentation states that

This operator does not support backpressure as it uses time. If the downstream needs a slower it should slow the timer or use something like {@link #onBackpressureDrop}.

If your processing is really slower than the producer, what you can do at your subscriber code is something like this

.subscribe(new Action1<Long>() {

    @Override
    public void call(Long pT) {
        e.submit(new Runnable() {
            System.out.println("i am " + pT + "in thread:" + Thread.currentThread().getName());

        }
    }
});
Wins
  • 3,420
  • 4
  • 36
  • 70
  • Sure, I can submit the task to different thread like you said. But I want to do it naturally with Scheduler. – Rockman12352 Dec 30 '15 at 09:54
  • @Rockman12352 I agree, however as far as I remember, Observable will execute the whole execution (from producer to subscriber) in single thread per emission. Meaning for every `Long` data in your producer, it will call all the subscribers in single thread. I might be wrong here, but that's what I got so far – Wins Dec 31 '15 at 06:36
0

Instead

 .subscribeOn(Schedulers.computation())

try

 .observeOn(Schedulers.computation())

This example that I did time ago to play with concurrency with Rx works pretty good as example

   public class ObservableZip {

private Scheduler scheduler;
private Scheduler scheduler1;
private Scheduler scheduler2;

@Test
public void testAsyncZip() {
    scheduler = Schedulers.newThread();
    scheduler1 = Schedulers.newThread();
    scheduler2 = Schedulers.newThread();
    long start = System.currentTimeMillis();
    Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2)
                                                                                        .concat(s3))
              .subscribe(result -> showResult("Async:", start, result));
}




public void showResult(String transactionType, long start, String result) {
    System.out.println(result + " " +
                               transactionType + String.valueOf(System.currentTimeMillis() - start));
}


public Observable<String> obAsyncString() {
    return Observable.just("")
                     .observeOn(scheduler)
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> "Hello");
}

public Observable<String> obAsyncString1() {
    return Observable.just("")
                     .observeOn(scheduler1)
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> " World");
}

public Observable<String> obAsyncString2() {
    return Observable.just("")
                     .observeOn(scheduler2)
                     .doOnNext(val -> {
                         System.out.println("Thread " +  Thread.currentThread()
                                                               .getName());
                     })
                     .map(val -> "!");
  }

 }
paul
  • 12,873
  • 23
  • 91
  • 153
0

I found the answer in GitHub!

The inner observable did emit on multi-thread, but the follow do on next is not. If I want it parallel, I should do it in the inner observable .

Rockman12352
  • 101
  • 10