I have a sequential data source represented as a plain Iterator (or Stream). The data is big and doesn't fit in memory. The source can be traversed only once and is expensive to fetch. It is consumed by a heavy black-box procedure that takes an Iterator (or Stream) as its argument and consumes the data linearly. OK, that's simple. But what can I do if I have two such consuming procedures? As I said, I don't want to pull the input data into a collection like List. I could also re-read the source twice from the very beginning, but I don't like that because it's inefficient. In fact I need to "tee" (a kind of clone of) the Iterator (or Stream), so that two parallel processes each consume the single source once, without caching it in an in-memory collection. Such an approach should apply back-pressure, i.e. throttle a sibling that consumes the stream too fast. An effective solution probably needs some thread-safe bounded queue as a buffer. Does anyone know how to do this in Scala (or with any external streaming library/framework)?
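For illustration, here is roughly the shape I have in mind: a hand-rolled sketch using bounded blocking queues from `java.util.concurrent` (the names `teeIterator` and `Done` are mine, not from any library). The bounded capacity is what gives back-pressure: when one consumer lags, the reader thread blocks on `put`, which in turn throttles the faster sibling.

```scala
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}

object TeeDemo {
  // sentinel marking the end of the source
  private case object Done

  // Split a single-pass Iterator into two Iterators backed by bounded queues.
  // One daemon thread reads the source once and pushes each element into both
  // queues; each returned Iterator drains its own queue.
  def teeIterator[A](it: Iterator[A], capacity: Int = 16): (Iterator[A], Iterator[A]) = {
    val q1 = new ArrayBlockingQueue[AnyRef](capacity)
    val q2 = new ArrayBlockingQueue[AnyRef](capacity)

    val reader = new Thread(() => {
      it.foreach { a =>
        q1.put(a.asInstanceOf[AnyRef]) // blocks when consumer 1 lags behind
        q2.put(a.asInstanceOf[AnyRef]) // blocks when consumer 2 lags behind
      }
      q1.put(Done); q2.put(Done)
    })
    reader.setDaemon(true)
    reader.start()

    def drain(q: BlockingQueue[AnyRef]): Iterator[A] = new Iterator[A] {
      private var nextElem: AnyRef = _
      private var done = false
      def hasNext: Boolean = {
        if (!done && nextElem == null) {
          nextElem = q.take()
          if (nextElem == Done) { done = true; nextElem = null }
        }
        !done
      }
      def next(): A = { val a = nextElem; nextElem = null; a.asInstanceOf[A] }
    }

    (drain(q1), drain(q2))
  }
}
```

Note the two consumers must run on separate threads: if one thread drained the first Iterator completely before touching the second, the reader would block on a full queue and the whole thing would deadlock.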

PS I found a similar four-year-old question: One upstream stream feeding multiple downstream streams. The difference is that I'm asking how to do it with standard Scala Iterators (or Streams), or better, with some existing library.

1 Answer

You should check out fs2 streams. Their example reads from a file and writes to another file incrementally, in constant memory. Here is how you could modify that example to write to two files:

...

io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blockingEC, 4096)
  .through(text.utf8Decode)
  .through(text.lines)
  .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
  .map(line => fahrenheitToCelsius(line.toDouble).toString)
  .intersperse("\n")
  .through(text.utf8Encode)
  .observe(io.file.writeAll(Paths.get("testdata/celsius.txt"), blockingEC))  // tee: write here and pass elements downstream
  .through(io.file.writeAll(Paths.get("testdata/celsius2.txt"), blockingEC)) // second consumer on the same elements

...
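To address the follow-up in the comments (feeding two in-memory consumers from an `Iterator` instead of writing two files), a sketch along these lines should work on fs2 1.x. Note that `Stream.fromIterator` needs its type parameters spelled out (which is likely the cause of the "diverging implicit expansion" error), and that the exact signature varies between fs2 versions. The object and sink names here are illustrative, not part of fs2:

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import cats.effect.{ContextShift, IO}
import fs2.{Sink, Stream}
import scala.concurrent.ExecutionContext

object TwoConsumers {
  // needed for the concurrency behind `observe`
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // two illustrative consumers that just record what they see
  val seen1 = new ConcurrentLinkedQueue[Int]()
  val seen2 = new ConcurrentLinkedQueue[Int]()
  val sink1: Sink[IO, Int] = _.evalMap(n => IO { seen1.add(n); () })
  val sink2: Sink[IO, Int] = _.evalMap(n => IO { seen2.add(n); () })

  def run(it: Iterator[Int]): Unit =
    Stream.fromIterator[IO, Int](it)   // explicit type parameters avoid the implicit-resolution error
      .observe(sink1)                  // tee: feed sink1 concurrently, with back-pressure
      .through(sink2)                  // main path: feed sink2
      .compile.drain
      .unsafeRunSync()
}
```

As with the file example, `observe` runs the side sink concurrently while back-pressuring the main path, so neither consumer can run unboundedly ahead of the other.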
codenoodle
  • Nice, this looks like what I need! :-) Note that I already have an `Iterator[MyStructure]`. Can you give me an example of how to feed two functions that take an Iterator or Stream as arguments, rather than two files (I hope it isn't too complicated)? Is it possible to convert plain Scala Streams into fs2 streams and back? – Michael Shestero Dec 20 '18 at 19:58
  • There are a lot of ways to create an fs2 stream, many of which are outlined in the [guide](https://fs2.io/guide.html). You can always build one via [unfold](https://oss.sonatype.org/service/local/repositories/releases/archive/co/fs2/fs2-core_2.12/1.0.0/fs2-core_2.12-1.0.0-javadoc.jar/!/fs2/Stream.html) if all else fails. – codenoodle Dec 20 '18 at 21:20
  • Trying: `import fs2._; import cats.effect.{IO, Sync}; val st2 = Stream.fromIterator(it0) // creating an fs2 stream from my source Iterator[NginxRec]`. Temporary consumers to try first (just print the first 2 records): `def sink1: Sink[IO, NginxRec] = _.take(2).evalMap(nr => IO.delay(println(nr.datetime)))`, `def sink2: Sink[IO, NginxRec] = _.take(2).evalMap(nr => IO.delay(println(nr.datetime)))`, then `st2.observe(sink1).observe(sink2)` — am I right here? At `Stream.fromIterator` I got: `Error: diverging implicit expansion for type cats.effect.Sync[F]` ... – Michael Shestero Dec 21 '18 at 14:28
  • I found something similar to what I want here: https://fs2.io/concurrency-primitives.html#single-publisher--multiple-subscriber (the "Single Publisher / Multiple Subscriber" part). But I still can hardly see how it solves my task. – Michael Shestero Dec 21 '18 at 14:36
  • Can you post another question with a code example of what you're working with? – codenoodle Dec 21 '18 at 16:25
  • Hi everybody. I reinvented the wheel and want to share my solution. But Stack Overflow has banned me from posting answers for "numerous low-quality answers in the past". I honestly don't remember posting any answers here before... Since "automatic bans never expire or time out", sorry my friends! :-( – Michael Shestero Dec 23 '18 at 20:17