
I want to do something like this:

```scala
import fs2.Stream

def splitStream[F[_], A, B](stream: Stream[F, A], split: A => B): (Stream[F, A], Stream[F, B]) =
  (stream, stream.map(split)) // both halves re-run `stream` from scratch when compiled
```

But this does not work, because it "pulls" from the source twice: once when I drain `stream` and again when I drain `stream.map(split)`. How do I prevent this? Should I somehow switch to a "push"-based model by maintaining my own internal buffer, so I don't pull twice?
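To make the double pull visible, here is a minimal sketch, assuming fs2 2.x with cats-effect 2's `IO`. The `DoublePull` object, the `pulls` log and the `* 10` split are hypothetical stand-ins; the point is only that compiling each half separately re-runs the source's effects.

```scala
import cats.effect.IO
import fs2.Stream
import scala.collection.mutable.ListBuffer

object DoublePull {
  // Log of every element the source actually produces (one entry per pull)
  val pulls: ListBuffer[Int] = ListBuffer.empty

  // Hypothetical effectful source: each emitted element is appended to `pulls`
  val source: Stream[IO, Int] =
    Stream.emits(List(1, 2, 3)).covary[IO].evalTap(i => IO(pulls += i))

  // The naive split: both values share the source's description, not its elements
  val (original, derived) = (source, source.map(_ * 10))

  // Draining each half separately runs the source's effects once per half
  def run(): Unit = {
    original.compile.drain.unsafeRunSync()
    derived.compile.drain.unsafeRunSync()
  }
}
```

After `DoublePull.run()`, `pulls` holds `1, 2, 3, 1, 2, 3`: every element was produced twice.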

pathikrit

1 Answer


> Somehow switch to a "push" based model by maintaining my own internal buffer so I don't pull twice?

Yes. For example, you can use a `Queue` from fs2 (`fs2.concurrent.Queue` in fs2 2.x):

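```scala
import cats.effect.Concurrent, cats.implicits._
import fs2.Stream, fs2.concurrent.Queue

// Elements pulled by the first stream are also pushed into `q`; the second stream reads `q` instead of the source
def splitStream[F[_]: Concurrent, A, B](stream: Stream[F, A], split: A => B): F[(Stream[F, A], Stream[F, B])] =
  for {
    q <- Queue.noneTerminated[F, A]
  } yield (stream.evalTap(a => q.enqueue1(Some(a))).onFinalize(q.enqueue1(None)), q.dequeue.map(split))
```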
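One way to consume the pair, sketched under the same fs2 2.x / cats-effect 2 assumptions: run the original stream in the background with `concurrently` and collect the derived one in the foreground. `splitStream` is restated (with `map` instead of a one-generator `for`) so the block compiles on its own; the `SplitWithQueue` object, the input and the `* 10` split are illustrative only.

```scala
import cats.effect.{Concurrent, ContextShift, IO}
import cats.implicits._
import fs2.Stream
import fs2.concurrent.Queue
import scala.concurrent.ExecutionContext

object SplitWithQueue {
  // cats-effect 2 derives Concurrent[IO] from an implicit ContextShift
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // The queue-based splitStream, restated so this block compiles on its own
  def splitStream[F[_]: Concurrent, A, B](stream: Stream[F, A], split: A => B): F[(Stream[F, A], Stream[F, B])] =
    Queue.noneTerminated[F, A].map { q =>
      (stream.evalTap(a => q.enqueue1(Some(a))).onFinalize(q.enqueue1(None)), q.dequeue.map(split))
    }

  // Run the producing half in the background and collect the derived half in the foreground
  def demo(): List[Int] =
    splitStream(Stream.emits(List(1, 2, 3)).covary[IO], (i: Int) => i * 10)
      .flatMap { case (original, derived) => derived.concurrently(original).compile.toList }
      .unsafeRunSync()
}
```

`derived` only ends once the `None` pushed by `onFinalize` arrives, so it sees every element, and `demo()` returns `List(10, 20, 30)`.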

Of course, the problem here is that the two streams are now coupled. If a caller never runs the first stream, the second one never emits anything; if it never runs the second one, nothing ever dequeues and the unbounded queue keeps growing. This is the general issue you run into when you try to turn one stream into several and guarantee that every value appears in each substream, regardless of when that substream is subscribed to.

The solution I usually go for is to structure the code as one larger action that runs all the consumers, using operators like `broadcast` or `parJoin`:

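```scala
import cats.effect.Concurrent
import fs2.Stream

// Every function in `runSeveralThings` sees every element of `base`, and all of them run concurrently
def splitAndRun[F[_]: Concurrent, A](
  base: Stream[F, A],
  runSeveralThings: List[Stream[F, A] => Stream[F, Unit]]
): F[Unit] =
  base.broadcastTo(runSeveralThings: _*).compile.drain
```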

Here, you know how many consumers you are going to have, so there will not be an ignored stream in the first place.
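A small usage sketch of `splitAndRun`, again assuming fs2 2.x and cats-effect 2. The `BroadcastDemo` object and its two consumers are hypothetical: one records the raw elements and one records `a * 10` as a stand-in for `split`. Both are handed to `broadcastTo`, so the source is pulled once while each consumer still sees every element.

```scala
import cats.effect.{Concurrent, ContextShift, IO}
import cats.effect.concurrent.Ref
import fs2.Stream
import scala.concurrent.ExecutionContext

object BroadcastDemo {
  // cats-effect 2 derives Concurrent[IO] from an implicit ContextShift
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // splitAndRun, restated so this block compiles on its own
  def splitAndRun[F[_]: Concurrent, A](base: Stream[F, A], runSeveralThings: List[Stream[F, A] => Stream[F, Unit]]): F[Unit] =
    base.broadcastTo(runSeveralThings: _*).compile.drain

  // Two hypothetical consumers: one records raw elements, one records split(a) = a * 10
  def demo(): (Vector[Int], Vector[Int]) = {
    val program = for {
      raw     <- Ref.of[IO, Vector[Int]](Vector.empty)
      derived <- Ref.of[IO, Vector[Int]](Vector.empty)
      _ <- splitAndRun(
             Stream.emits(List(1, 2, 3)).covary[IO],
             List(
               (s: Stream[IO, Int]) => s.evalMap(a => raw.update(_ :+ a)),
               (s: Stream[IO, Int]) => s.map(_ * 10).evalMap(b => derived.update(_ :+ b))
             )
           )
      r <- raw.get
      d <- derived.get
    } yield (r, d)
    program.unsafeRunSync()
  }
}
```

Because the consumers are passed in up front, neither can be forgotten, and `demo()` returns `(Vector(1, 2, 3), Vector(10, 20, 30))`.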

Oleg Pyzhcov
  • Can we then somehow "duplicate" each `A` in `stream` and send one copy to `stream` and the other copy to `stream.map(split)`? Wouldn't that avoid the need for an intermediary buffer? – pathikrit Dec 10 '19 at 20:11
  • @pathikrit the mental model is different: you can indeed "duplicate" things (`stream.map(a => (a, a))` is the dumbest example, but broadcast "duplicates" too), but you cannot _send_ things into a stream. – Oleg Pyzhcov Dec 12 '19 at 13:24
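To illustrate the "duplicate" idea from the comments, here is a minimal sketch (fs2 2.x / cats-effect 2 assumed; the `DuplicateInOnePass` object, `split` and the pull log are hypothetical). Instead of producing two streams, it keeps one stream and carries each element together with its derived value, so the source is pulled only once.

```scala
import cats.effect.IO
import fs2.Stream
import scala.collection.mutable.ListBuffer

object DuplicateInOnePass {
  // Log of every element the source actually produces
  val pulls: ListBuffer[Int] = ListBuffer.empty

  // Hypothetical effectful source, logging each pull
  val source: Stream[IO, Int] =
    Stream.emits(List(1, 2, 3)).covary[IO].evalTap(i => IO(pulls += i))

  // Hypothetical split function
  val split: Int => String = i => s"#$i"

  // Each element is "duplicated" into a pair inside the same stream, not sent to a second stream
  val paired: Stream[IO, (Int, String)] = source.map(a => (a, split(a)))

  def run(): List[(Int, String)] = paired.compile.toList.unsafeRunSync()
}
```

`run()` returns `List((1, "#1"), (2, "#2"), (3, "#3"))` and `pulls` records each element exactly once; whatever needs both values then works on the pairs within that single stream.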