
I need to stream an endpoint from a server that returns JSON with Transfer-Encoding: chunked.

I have the following code, but I am not able to read the response. I tried responseBody.byteStream() and converting the input stream into a String, but I can't do that on the main thread. How can I read the response?

@Streaming
@GET("stream/status")
Observable<ResponseBody> streamStatus();

Observable<ResponseBody> observable = ApiClientHelper.getClient().streamStatus();
    observable
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<ResponseBody>() {
                @Override
                public void onSubscribe(Disposable d) {
                }

                @Override
                public void onNext(final ResponseBody responseBody) {
                   //DON'T KNOW HOW TO READ DATA
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onComplete() {
                }
            });

EDIT:

Server response using CURL:

*   Trying 192.168.1.3...
* TCP_NODELAY set
* Connected to 192.168.1.3 (192.168.1.3) port 80 (#0)
> GET /stream/meter HTTP/1.1
> Host: 192.168.1.3
> User-Agent: curl/7.54.0
> Accept: */*
> Cookie:sessionId=bf2533346190e1c72b532b9d6ec6a405
> 
< HTTP/1.1 200 OK 
< Content-Type: text/event-stream
< Cache-Control: no-cache, no-store
< Date: Tue, 17 Oct 2017 12:02:03 GMT
< Transfer-Encoding: chunked
< Connection: Keep-Alive
< 
data: {"production":{"ph-a":{"p":-0.817,"q":0.0,"s":47.302,"v":225.697,"i":0.21,"pf":0.0,"f":50.0},"ph-b":{"p":-0.066,"q":-0.0,"s":0.643,"v":3.091,"i":0.206,"pf":0.0,"f":50.0},"ph-c":{"p":-0.195,"q":-0.0,"s":0.943,"v":7.577,"i":0.123,"pf":0.0,"f":50.0}},"net-consumption":{"ph-a":{"p":-0.598,"q":0.0,"s":51.931,"v":225.606,"i":0.231,"pf":0.0,"f":50.0},"ph-b":{"p":-0.088,"q":0.0,"s":0.875,"v":4.585,"i":0.19,"pf":0.0,"f":50.0},"ph-c":{"p":-0.043,"q":0.0,"s":0.16,"v":1.23,"i":0.13,"pf":-1.0,"f":50.0}},"total-consumption":{"ph-a":{"p":-1.415,"q":-0.0,"s":-4.599,"v":225.652,"i":-0.02,"pf":-1.0,"f":50.0},"ph-b":{"p":-0.154,"q":0.0,"s":0.06,"v":3.838,"i":0.016,"pf":-1.0,"f":50.0},"ph-c":{"p":-0.237,"q":0.0,"s":-0.033,"v":4.404,"i":-0.008,"pf":-1.0,"f":50.0}}}
robinCTS
Yamila
  • This is a guess, I'm no expert in HTTP, but I think Retrofit isn't in and of itself an HTTP client. So as your response you might already get the whole downloaded object, and not the individual chunks. The handling of the chunks would probably happen somewhere at the level of OkHttp. – Lukasz Oct 24 '17 at 10:19
  • I don't understand. Please see my code; I only have the ResponseBody in the onNext method. – Yamila Oct 24 '17 at 13:31

1 Answer


Adding the @Streaming annotation causes Retrofit not to buffer the entire response body in memory, but rather to pass the incoming bytes on right away. This lets you process streams of data that might be larger than your total available memory. However, if you try to read the stream on the main thread you will get an android.os.NetworkOnMainThreadException, which is what I assume you got. So the problem lies in .observeOn(AndroidSchedulers.mainThread()).

Edit: Fair warning. I haven't run this.

    observable
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .subscribe(new Observer<ResponseBody>() {
                @Override
                public void onSubscribe(Disposable disposable) {

                }

                @Override
                public void onNext(ResponseBody responseBody) {
                    InputStream inputStream = responseBody.byteStream();
                    BufferedReader br = null;
                    StringBuilder sb = new StringBuilder();

                    String line;
                    try {

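                        br = new BufferedReader(new InputStreamReader(inputStream));
                        // ready() never blocks: it returns false whenever no data is
                        // currently available, not only at the end of the stream.
                        while (br.ready()) {
                            line = br.readLine();
                            sb.append(line);
                        }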

                    } catch (IOException e) {
                        e.printStackTrace();
                    } finally {
                        if (br != null) {
                            try {
                                br.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }

                    Log.d("streamed string", sb.toString()); // replace log with whatever you want to do with it.
                }

                @Override
                public void onError(Throwable throwable) {

                }

                @Override
                public void onComplete() {

                }
            });
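
The curl output in the question shows a text/event-stream body in which each event arrives as a single `data: {...}` line. Under that assumption, an alternative is a blocking readLine() loop that hands over each line as soon as it arrives, instead of collecting everything into one string. Below is a minimal sketch of that idea in plain Java; `SseLines` and `readEvents` are hypothetical names, and the `java.util.function.Consumer` callback assumes Java 8 APIs are available (on Android, API 24+ or desugaring). It does not handle multi-line `data:` fields or other event-stream fields.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

// Hypothetical helper, not part of Retrofit or OkHttp.
final class SseLines {

    // Reads the stream line by line and passes the payload of every
    // "data:" line to the callback. Blocks until the stream ends.
    static void readEvents(InputStream in, Consumer<String> onData) throws IOException {
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            // readLine() waits for the next complete line and returns null at end of stream.
            while ((line = reader.readLine()) != null) {
                if (line.startsWith("data:")) {
                    onData.accept(line.substring("data:".length()).trim());
                }
                // Blank separator lines and any other fields are skipped.
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for responseBody.byteStream(); the payloads are made-up samples.
        String sample = "data: {\"p\":-0.817}\n\ndata: {\"p\":-0.598}\n\n";
        InputStream in = new ByteArrayInputStream(sample.getBytes(StandardCharsets.UTF_8));
        readEvents(in, payload -> System.out.println("event: " + payload));
    }
}
```

Inside onNext, the call would presumably look like `SseLines.readEvents(responseBody.byteStream(), payload -> ...)`. Because the loop blocks for as long as the server keeps the connection open, it has to run off the main thread, and any UI update from the callback has to be posted back to the main thread.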
Lukasz
  • When I try to debug it, it gets stuck at: StringBuilder sb = new StringBuilder(); and when I don't debug it and just click Run, it doesn't log anything. – Yamila Oct 25 '17 at 13:15
  • @Yamila But there aren't any errors? How big is the content you are downloading? There is nothing which can go wrong at the line `StringBuilder sb = new StringBuilder();`. Maybe it's still loading and stays in the while loop? Add a log statement inside the loop. – Lukasz Oct 25 '17 at 14:21
  • Yes! It stays in the loop. I edited my question with the server response using curl. It returns a response every 5 seconds. – Yamila Oct 25 '17 at 14:30
  • OK. I changed the condition in the while loop, referring to [this](https://stackoverflow.com/questions/5987970/socket-bufferedreader-hangs-at-readline); it should work now. – Lukasz Oct 25 '17 at 14:50
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/157489/discussion-between-lukasz-and-yamila). – Lukasz Oct 25 '17 at 14:52
  • Well, each line contains the data I need! And because it's an endless stream, it's OK to have an infinite while loop. The problem is how to stop it and reach the finally block when the user closes the activity. – Yamila Oct 26 '17 at 17:00
  • @Yamila Stack Overflow isn't suited to be a discussion forum. If the answer satisfies your original question, you should accept it and ask a new question regarding your new problem. – Lukasz Oct 26 '17 at 20:36
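
On the follow-up about stopping an endless read loop when the activity closes, one possible approach is a cancellation flag that the loop checks before reading each line. The sketch below is plain Java under stated assumptions: `CancellableSseReader`, `cancel` and `read` are hypothetical names, and the flag only takes effect once the current readLine() call returns, which for a server that sends an event every 5 seconds means a delay of up to about that long.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical helper, not part of Retrofit, OkHttp or RxJava.
final class CancellableSseReader {

    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    // Safe to call from any thread, e.g. when the screen is closed.
    void cancel() {
        cancelled.set(true);
    }

    // Delivers "data:" payloads until the stream ends or cancel() was called.
    // Returns the number of payloads delivered.
    int read(InputStream in, Consumer<String> onData) throws IOException {
        int delivered = 0;
        // try-with-resources closes the reader (and the stream) on every exit path.
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while (!cancelled.get() && (line = reader.readLine()) != null) {
                if (line.startsWith("data:")) {
                    onData.accept(line.substring("data:".length()).trim());
                    delivered++;
                }
            }
        }
        return delivered;
    }
}
```

In the Android case, cancel() would presumably be called from onDestroy(), together with disposing the Disposable received in onSubscribe. As far as I know, disposing the subscription cancels the underlying call, which should make a blocked readLine() fail with an IOException rather than wait for the next event, but I have not verified that against this server.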