21

In this blog post, the author gives the following example (copied verbatim below) to illustrate callback hell. However, the post does not explain how Reactive Extensions (Rx) would eliminate the problem.

Here, F3 depends on the completion of F1, while F4 and F5 depend on the completion of F2.

  1. What would the functional equivalent be in Rx?
  2. How can I express in Rx that F1, F2, F3, F4 and F5 should all be fetched asynchronously?

NOTE: I am currently trying to wrap my head around Rx so I didn't try solving this example before asking this question.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CallbackB {

    /**
     * Demonstration of nested callbacks which then need to compose their responses together.
     * <p>
     * Various different approaches for composition can be done but eventually they end up relying upon
     * synchronization techniques such as the CountDownLatch used here or converge on callback design
     * changes similar to <a href="https://github.com/Netflix/RxJava">Rx</a>.
     */
    public static void run() throws Exception {
        final ExecutorService executor = new ThreadPoolExecutor(4, 4, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
        /* the following are used to synchronize and compose the asynchronous callbacks */
        final CountDownLatch latch = new CountDownLatch(3);
        final AtomicReference<String> f3Value = new AtomicReference<String>();
        final AtomicReference<Integer> f4Value = new AtomicReference<Integer>();
        final AtomicReference<Integer> f5Value = new AtomicReference<Integer>();

        try {
            // get f3 with dependent result from f1
            executor.execute(new CallToRemoteServiceA(new Callback<String>() {

                @Override
                public void call(String f1) {
                    executor.execute(new CallToRemoteServiceC(new Callback<String>() {

                        @Override
                        public void call(String f3) {
                            // we have f1 and f3 now need to compose with others
                            System.out.println("intermediate callback: " + f3 + " => " + ("f4 * f5"));
                            // set to thread-safe variable accessible by external scope 
                            f3Value.set(f3);
                            latch.countDown();
                        }

                    }, f1));
                }

            }));

            // get f4/f5 after dependency f2 completes 
            executor.execute(new CallToRemoteServiceB(new Callback<Integer>() {

                @Override
                public void call(Integer f2) {
                    executor.execute(new CallToRemoteServiceD(new Callback<Integer>() {

                        @Override
                        public void call(Integer f4) {
                            // we have f2 and f4 now need to compose with others
                            System.out.println("intermediate callback: f3" + " => " + (f4 + " * f5"));
                            // set to thread-safe variable accessible by external scope 
                            f4Value.set(f4);
                            latch.countDown();
                        }

                    }, f2));
                    executor.execute(new CallToRemoteServiceE(new Callback<Integer>() {

                        @Override
                        public void call(Integer f5) {
                            // we have f2 and f5 now need to compose with others
                            System.out.println("intermediate callback: f3" + " => " + ("f4 * " + f5));
                            // set to thread-safe variable accessible by external scope 
                            f5Value.set(f5);
                            latch.countDown();
                        }

                    }, f2));
                }

            }));

            /* we must wait for all callbacks to complete */
            latch.await();
            System.out.println(f3Value.get() + " => " + (f4Value.get() * f5Value.get()));
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        try {
            run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static final class CallToRemoteServiceA implements Runnable {

        private final Callback<String> callback;

        private CallToRemoteServiceA(Callback<String> callback) {
            this.callback = callback;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call("responseA");
        }
    }

    private static final class CallToRemoteServiceB implements Runnable {

        private final Callback<Integer> callback;

        private CallToRemoteServiceB(Callback<Integer> callback) {
            this.callback = callback;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(40);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call(100);
        }
    }

    private static final class CallToRemoteServiceC implements Runnable {

        private final Callback<String> callback;
        private final String dependencyFromA;

        private CallToRemoteServiceC(Callback<String> callback, String dependencyFromA) {
            this.callback = callback;
            this.dependencyFromA = dependencyFromA;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(60);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call("responseB_" + dependencyFromA);
        }
    }

    private static final class CallToRemoteServiceD implements Runnable {

        private final Callback<Integer> callback;
        private final Integer dependencyFromB;

        private CallToRemoteServiceD(Callback<Integer> callback, Integer dependencyFromB) {
            this.callback = callback;
            this.dependencyFromB = dependencyFromB;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(140);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call(40 + dependencyFromB);
        }
    }

    private static final class CallToRemoteServiceE implements Runnable {

        private final Callback<Integer> callback;
        private final Integer dependencyFromB;

        private CallToRemoteServiceE(Callback<Integer> callback, Integer dependencyFromB) {
            this.callback = callback;
            this.dependencyFromB = dependencyFromB;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(55);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call(5000 + dependencyFromB);
        }
    }

    private static interface Callback<T> {
        public void call(T value);
    }
}
Teocci
Aravind Yarram
  • I'm not an Rx master, but from a session that he gave: you can avoid the callback hell by composing Observables. If @benjchristensen is around - he might be able to provide more details. – Nir Alfasi Feb 09 '15 at 03:35
  • @alfasin I learnt enough of RxJava to know that it should be solved by composing observables. The question here is HOW this can be composed :-) – Aravind Yarram Feb 09 '15 at 03:36
  • In that case - I would change the question from "how to avoid callback hell?" to "how to compose observables?" ;) – Nir Alfasi Feb 09 '15 at 03:39
  • @alfasin Re-worded the question...do you have an answer? – Aravind Yarram Feb 09 '15 at 03:43
  • @Pangea Shouldn't it be how to compose observables? – pt2121 Feb 09 '15 at 03:52
  • Sorry that I had to bail out, but I see that you got your answer from the master himself - and I could definitely NOT top that! :) – Nir Alfasi Feb 11 '15 at 06:32

2 Answers

32

I'm the original author of the referenced blog post about callbacks and Java Futures. Here is an example of using flatMap, zip and merge to do service composition asynchronously.

It fetches a User object, then concurrently fetches Social and PersonalizedCatalog data, then for each Video from the PersonalizedCatalog concurrently fetches a Bookmark, Rating and Metadata, zips those together, and merges all of the responses into a progressive stream output as Server-Sent Events.

return getUser(userId).flatMap(user -> {
    Observable<Map<String, Object>> catalog = getPersonalizedCatalog(user)
            .flatMap(catalogList -> catalogList.videos().<Map<String, Object>> flatMap(
                    video -> {
                        Observable<Bookmark> bookmark = getBookmark(video);
                        Observable<Rating> rating = getRatings(video);
                        Observable<VideoMetadata> metadata = getVideoMetadata(video);
                        return Observable.zip(bookmark, rating, metadata, (b, r, m) -> combineVideoData(video, b, r, m));
                    }));

    Observable<Map<String, Object>> social = getSocial(user).map(s -> {
        return s.getDataAsMap();
    });

    return Observable.merge(catalog, social);
}).flatMap(data -> {
    String json = SimpleJson.mapToJson(data);
    return response.writeStringAndFlush("data: " + json + "\n");
});

This example can be seen in context of a functioning application at https://github.com/Netflix/ReactiveLab/blob/952362b89a4d4115ae0eecf0e73f273ecb27ba98/reactive-lab-gateway/src/main/java/io/reactivex/lab/gateway/routes/RouteForDeviceHome.java#L33

Since I can't possibly provide all of the information here, you can also find an explanation in presentation form (with a link to the video) at https://speakerdeck.com/benjchristensen/reactive-streams-with-rx-at-javaone-2014?slide=32.

benjchristensen
  • Thank you. I presume the getBookmark(...) etc. implementations internally use subscribeOn(Schedulers.io()), right? And also, the zip operation runs on the calling thread. – Aravind Yarram Feb 09 '15 at 23:29
  • Yes they can use subscribeOn with the IO scheduler to make blocking IO async. At Netflix we typically use Hystrix to take care of that for us and provide bulkheading, timeouts, fallbacks, metrics, etc. If the IO is nonblocking such as via Netty then subscribeOn is unnecessary. – benjchristensen Feb 10 '15 at 05:20
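
As a rough JDK-only analogue of what this comment describes (moving a blocking call onto a separate pool so that the caller is not blocked), here is a minimal sketch. It does not use RxJava's subscribeOn or Hystrix; CompletableFuture.supplyAsync with a dedicated executor is swapped in so the example runs without extra dependencies. The names BlockingToAsyncSketch, blockingFetch and fetchAsync, and the choice of a cached thread pool, are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BlockingToAsyncSketch {

    // Hypothetical blocking call: sleeps to simulate IO, then reports which thread ran it.
    static String blockingFetch() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return Thread.currentThread().getName();
    }

    // Wraps the blocking call so it runs on the given pool instead of the caller's thread.
    static CompletableFuture<String> fetchAsync(ExecutorService ioPool) {
        return CompletableFuture.supplyAsync(BlockingToAsyncSketch::blockingFetch, ioPool);
    }

    // Returns true when the blocking call was executed by a thread other than the caller.
    static boolean ranOffCallerThread() {
        ExecutorService ioPool = Executors.newCachedThreadPool();
        try {
            String caller = Thread.currentThread().getName();
            String worker = fetchAsync(ioPool).join();
            return !caller.equals(worker);
        } finally {
            ioPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran off caller thread: " + ranOffCallerThread());
    }
}
```

If the underlying IO were already non-blocking, the wrapping step in fetchAsync would be unnecessary, which matches the point made about Netty above.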
5

Going by your code, suppose that each remote call is exposed as an Observable.

Observable<Integer> callRemoteServiceA() { /* async call */ }

/* .... */

Observable<Integer> callRemoteServiceE(Integer f2) { /* async call */ }

What you want:

  • call serviceA, then call serviceB with the result of serviceA
  • call serviceC, then call serviceD and serviceE with the result of serviceC
  • with the results of serviceD and serviceE, build a new value
  • display the new value together with the result of serviceB

With RxJava, you can achieve this with the following code:

Observable<Integer> f3 = callRemoteServiceA() // call serviceA
        // call serviceB with the result of serviceA
        .flatMap((f1) -> callRemoteServiceB(f1));

Observable<Integer> f4Andf5 = callRemoteServiceC() // call serviceC
        // call serviceD and serviceE, then build a new value
        .flatMap((f2) -> callRemoteServiceD(f2).zipWith(callRemoteServiceE(f2), (f4, f5) -> f4 * f5));

// compute the string to display from f3 and the combined f4/f5 value
f3.zipWith(f4Andf5, (childF3, childF4Andf5) -> childF3 + " => " + childF4Andf5)
        // display the value
        .subscribe(System.out::println);

The important part here is the use of flatMap and zip (or zipWith).

You can get more info on flatMap here: When do you use map vs flatMap in RxJava?
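
For the dependency graph in the question itself (f3 needs f1; f4 and f5 need f2), the same flatMap/zip shape can be sketched with only the JDK. RxJava is not part of the standard library, so this sketch swaps in CompletableFuture as a stand-in: thenCompose plays the role of flatMap and thenCombine the role of zip. The names ComposeSketch, remote and compose are hypothetical; the delays and return values mirror the simulated services in the question.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

class ComposeSketch {

    // Hypothetical helper: simulate a remote call on the pool that answers after a delay.
    static <T> CompletableFuture<T> remote(ExecutorService pool, long millis, Supplier<T> value) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return value.get();
        }, pool);
    }

    static String compose() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // f1 -> f3: thenCompose chains a dependent async call (the role flatMap plays in Rx)
            CompletableFuture<String> f3 = remote(pool, 100, () -> "responseA")
                    .thenCompose(f1 -> remote(pool, 60, () -> "responseB_" + f1));

            // f2 -> (f4, f5): both start once f2 arrives; thenCombine pairs them (the role of zip)
            CompletableFuture<Integer> f4Andf5 = remote(pool, 40, () -> 100)
                    .thenCompose(f2 -> remote(pool, 140, () -> 40 + f2)
                            .thenCombine(remote(pool, 55, () -> 5000 + f2), (f4, f5) -> f4 * f5));

            // Join the two independent branches, replacing the CountDownLatch of the callback version
            return f3.thenCombine(f4Andf5, (a, b) -> a + " => " + b).join();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(compose()); // prints "responseB_responseA => 714000"
    }
}
```

No AtomicReference or latch is needed here because each stage hands its value to the next one, which is the same property the flatMap/zip version relies on.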

Community
dwursteisen
  • With this, can I assume that, once subscribed, f4Andf5 can execute sooner than f3, provided services A and B take longer? I ask because with futures, both service A and B should return before the others can run. – Aravind Yarram Feb 09 '15 at 12:18
  • Yes, if each call to a remote service is async. – dwursteisen Feb 09 '15 at 13:08
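
The point of this exchange, that one branch need not wait for the other when the calls are asynchronous, can be checked with a small JDK-only sketch. It uses CompletableFuture in place of RxJava (an assumption made so that the example runs without extra dependencies), and the names BranchOrderSketch and completionOrder are hypothetical. The branch standing in for f3 is held back by a latch that only opens once the f4Andf5 branch has finished, so the program can complete only if the two branches really overlap.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class BranchOrderSketch {

    static List<String> completionOrder() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<String> order = new CopyOnWriteArrayList<>();
        CountDownLatch otherBranchDone = new CountDownLatch(1);
        try {
            // Slow branch (stands in for f3): refuses to finish until the other branch has.
            CompletableFuture<Void> f3 = CompletableFuture.runAsync(() -> {
                try {
                    if (!otherBranchDone.await(5, TimeUnit.SECONDS)) {
                        throw new IllegalStateException("other branch never completed");
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                }
                order.add("f3");
            }, pool);

            // Other branch (stands in for f4Andf5): two async stages, then opens the latch.
            CompletableFuture<Void> f4Andf5 = CompletableFuture.supplyAsync(() -> 100, pool)
                    .thenApplyAsync(f2 -> (40 + f2) * (5000 + f2), pool)
                    .thenAccept(product -> {
                        order.add("f4Andf5");
                        otherBranchDone.countDown();
                    });

            // Wait for both branches before reading the recorded order.
            CompletableFuture.allOf(f3, f4Andf5).join();
            return order;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(completionOrder()); // prints "[f4Andf5, f3]"
    }
}
```

If the first branch had to complete before the second could start, the latch would never open and the f3 stage would fail after its five-second timeout instead of returning.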