
In my Android app I need to use a Socket to send and receive arrays of bytes. For convenience, I want to work with an Observable connected to the Socket.

Looking on the internet I've found this code:

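import java.net.Socket
import scala.io.Source
import scala.util.Try
import rx.lang.scala.Observable
import rx.lang.scala.schedulers.IOScheduler

// Opens the socket on subscription and closes it when the subscription ends.
val s = Observable.using[Char, Socket](new Socket("10.0.2.2", 9002))(
  socket => Observable.from[Char](Source.fromInputStream(socket.getInputStream).toIterable),
  socket => Try(socket.close()))
  .subscribeOn(IOScheduler())

val a = s.subscribe(println, println)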

It works, but it outputs one character at a time. For example, when the string "hello there" is sent, the output is:

I/System.out: h
I/System.out: e
I/System.out: l
I/System.out: l
I/System.out: o
I/System.out:  
I/System.out: t
I/System.out: h
I/System.out: e
I/System.out: r
I/System.out: e

But I want to receive buffered arrays of bytes in my subscription. How can I achieve that?

  • When do you want to emit a `Bytes[]`? After `N` characters are received, after a `\n` or `\r\n` is received, or after the socket closes? (Or something else entirely?) – Sean Vieira Jan 07 '17 at 17:07
  • @SeanVieira I don't want it to emit too often (on each received byte), was thinking of some fixed size chunks which could be emitted once filled up. – src091 Jan 07 '17 at 17:48
  • @SeanVieira or perhaps a whole message sent could be emitted as a single byte array (I'm not sure if it's possible to do a websocket-style separation of whole messages when using a plain TCP socket, either way will work for me). – src091 Jan 07 '17 at 18:10

1 Answer


As @SeanVieira already said, you first have to decide how to aggregate the stream elements (the characters). If you know the stream will be closed after each message, you can wait for the whole message to be received and then emit the whole sequence when onCompleted() is called.
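
If the connection stays open across messages, the receiver needs some framing convention to know where one message ends. The sketch below assumes a hypothetical convention that is not part of the question: each message is sent as a 4-byte length followed by the payload. The names Framing, writeMessage and messages are made up for illustration, and both ends of the connection would have to agree on the format.

```scala
import java.io.{DataInputStream, DataOutputStream, EOFException, InputStream, OutputStream}

object Framing {
  /** Writes one message as a 4-byte big-endian length followed by the payload. */
  def writeMessage(out: OutputStream, payload: Array[Byte]): Unit = {
    val data = new DataOutputStream(out)
    data.writeInt(payload.length)
    data.write(payload)
    data.flush()
  }

  /** Lazily reads length-prefixed messages until the stream ends.
    * A message cut off by the end of the stream is silently dropped,
    * and the length is not validated (a real protocol would cap it). */
  def messages(in: InputStream): Iterator[Array[Byte]] = {
    val data = new DataInputStream(in)
    def next(): Option[Array[Byte]] =
      try {
        val length = data.readInt()          // blocks until 4 bytes are available
        val payload = new Array[Byte](length)
        data.readFully(payload)              // blocks until the whole payload has arrived
        Some(payload)
      } catch {
        case _: EOFException => None         // end of stream reached
      }
    Iterator.continually(next()).takeWhile(_.isDefined).map(_.get)
  }
}
```

Each element of the iterator is then one complete message as an Array[Byte], independent of how TCP happened to split the data into packets.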

I think what you have implemented so far is quite good, since it is up to the observer to decide what to process and how.

You can then, depending on your needs, add a stream transformation, e.g.:

  • a buffer; in particular, have a look at tumblingBuffer(boundary)
  • debounce with a buffer, as is done in this SO answer
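
If fixed-size byte chunks are what you are after (as mentioned in the comments), another option is to chunk the bytes before they ever reach the Observable. The following is a minimal sketch under that assumption; ByteChunks and chunks are hypothetical names, and it reads the raw InputStream directly instead of going through Source, which decodes bytes into characters.

```scala
import java.io.InputStream

object ByteChunks {
  /** Reads up to `size` bytes, blocking until the chunk is full or the stream ends.
    * Returns None once the stream has ended and no bytes are left. */
  private def readChunk(in: InputStream, size: Int): Option[Array[Byte]] = {
    val buf = new Array[Byte](size)
    var filled = 0
    var eof = false
    while (filled < size && !eof) {
      val n = in.read(buf, filled, size - filled)
      if (n < 0) eof = true else filled += n
    }
    if (filled == 0) None else Some(buf.take(filled))
  }

  /** Lazily yields chunks of exactly `size` bytes; only the last chunk may be shorter. */
  def chunks(in: InputStream, size: Int): Iterator[Array[Byte]] = {
    require(size > 0, "size must be positive")
    Iterator.continually(readChunk(in, size)).takeWhile(_.isDefined).map(_.get)
  }
}
```

In the code from the question, ByteChunks.chunks(socket.getInputStream, 1024) could presumably take the place of Source.fromInputStream(socket.getInputStream), with the Observable's element type changed from Char to Array[Byte]. Keep in mind that a chunk is only emitted once it is full or the stream ends, so a short message can sit in the buffer until more data arrives.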

A solution using tumblingBuffer on your already created Observable (called source here) could look like this (not tested):

 source.tumblingBuffer(source.filter(_ == '\n'))

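Here you buffer everything coming in from the source and emit the whole buffer once the boundary Observable source.filter(...) emits an element. Be aware that source is subscribed to twice this way (once for the data and once for the boundary), so a cold Observable like the one in the question would open a second socket unless it is shared first, for example with publish. You can then turn the sequence of characters into a string using mkString and subscribe to that Observable: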

source.tumblingBuffer(source.filter(_ == '\n')).map(_.mkString)