
I can't work out how to properly stop an infinite stream of data from being added to a subject, as in the example below:

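import 'package:rxdart/rxdart.dart';

void main() async {
  // An endless source: emits 0, 1, 2, ... every 500 ms and never completes.
  var infinite = Stream.periodic(Duration(milliseconds: 500), (it) => it);

  var subject = BehaviorSubject<int>();
  subject.addStream(infinite);

  subject.listen(print);
  await Future.delayed(Duration(milliseconds: 1000));
  await subject.close(); // Throws: the subject cannot be closed while addStream is active
}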

The infinite stream is, well, an infinite source of data. It never emits a done event. So how do I properly close the subject, given that it will never finish receiving data?

Edit: the solution proposed in "Bad state: You cannot close the subject while items are being added from addStream in flutter" doesn't work, simply because drain never returns.

Daniel Oliveira

1 Answer


Don't use addStream.

You need a way to stop receiving events, and the addStream method does not provide any way to stop before the stream is done, which is never.

If you know ahead of time how many elements you want, you can do subject.addStream(infinite.take(1000)) to only take the first 1000 events.
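To illustrate the take approach without pulling in a package, here is a minimal sketch that uses a plain StreamController from dart:async as a stand-in for the subject (an assumption made for self-containment; the addStream contract is the same). The helper name forwardFirst and the 10 ms period are made up for the example.

```dart
import 'dart:async';

/// Hypothetical helper: forwards only the first [count] events of an
/// endless source into a controller, then closes the controller.
/// Returns the events a listener received.
Future<List<int>> forwardFirst(int count) async {
  // Endless source, as in the question (the period is an arbitrary choice).
  final infinite =
      Stream<int>.periodic(const Duration(milliseconds: 10), (i) => i);

  // Stand-in for the subject.
  final controller = StreamController<int>();
  final received = <int>[];
  final done = controller.stream.listen(received.add).asFuture<void>();

  // take(count) makes the source finite, so the future returned by
  // addStream completes and close() is allowed afterwards.
  await controller.addStream(infinite.take(count));
  await controller.close();
  await done;
  return received;
}

Future<void> main() async {
  print(await forwardFirst(3)); // prints [0, 1, 2]
}
```

Awaiting addStream before calling close is the important part: closing is only rejected while the forwarded stream is still active.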

If not, and it's like your example where you stop based on time, then I'd go with something like:

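// Forward events manually so that we hold the subscription ourselves.
var subscription = infinite.listen(subject.add);
subject.listen(print);
await Future.delayed(Duration(milliseconds: 100));
// Stop forwarding first; after that nothing is being added, so close() works.
await subscription.cancel();
await subject.close();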

The StreamSubscription returned by Stream.listen gives you a way to stop listening again. Or to pause and later resume the events, if that's all you need.
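For the pause/resume case, a rough sketch could look like the following. It again uses a broadcast StreamController from dart:async as a stand-in for the subject, and the function name pauseThenResume and all the delays are arbitrary choices for illustration. The exact numbers received depend on timing, so no output is shown.

```dart
import 'dart:async';

/// Hypothetical demo: forwards events, pauses forwarding for a while,
/// resumes it, and finally cancels and closes.
Future<List<int>> pauseThenResume() async {
  final infinite =
      Stream<int>.periodic(const Duration(milliseconds: 20), (i) => i);

  // Broadcast controller as a stand-in for the subject.
  final controller = StreamController<int>.broadcast();
  final received = <int>[];
  controller.stream.listen(received.add);

  // Manual forwarding gives us a subscription we can control.
  final subscription = infinite.listen(controller.add);
  await Future<void>.delayed(const Duration(milliseconds: 70));

  // While paused, the source does not deliver events to controller.add.
  subscription.pause();
  await Future<void>.delayed(const Duration(milliseconds: 100));

  subscription.resume();
  await Future<void>.delayed(const Duration(milliseconds: 70));

  // Cancel the forwarding before closing, as in the snippet above.
  await subscription.cancel();
  await controller.close();
  return received;
}

Future<void> main() async {
  print(await pauseThenResume());
}
```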

If something else tells you when to stop, like the user clicking a button somewhere else, you have two options. Either keep the subscription accessible, so that the code which knows it's time to stop can call cancel on it at that later point, or cancel it when the subject's own listener cancels:

var subscription = infinite.listen(subject.add);
subject.onCancel = subscription.cancel;
subject.listen(print);
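Here is a small self-contained sketch of the onCancel wiring, with a plain single-subscription StreamController from dart:async standing in for the subject (an assumption; with a broadcast stream such as a subject, onCancel should fire once the last listener has cancelled). The name upstreamStopsWithListener and the flag variable are made up for the example.

```dart
import 'dart:async';

/// Hypothetical demo: returns true if the upstream subscription was
/// cancelled as a consequence of the downstream listener cancelling.
Future<bool> upstreamStopsWithListener() async {
  var upstreamCancelled = false;
  final infinite =
      Stream<int>.periodic(const Duration(milliseconds: 10), (i) => i);

  // Stand-in for the subject.
  final controller = StreamController<int>();
  final subscription = infinite.listen(controller.add);

  // When the controller's listener cancels, stop the forwarding too.
  controller.onCancel = () {
    upstreamCancelled = true;
    return subscription.cancel();
  };

  // take(3) cancels its subscription on the controller's stream after
  // three events, which triggers the onCancel callback above.
  final received = await controller.stream.take(3).toList();
  print(received); // prints [0, 1, 2]
  return upstreamCancelled;
}

Future<void> main() async {
  print(await upstreamStopsWithListener()); // prints true
}
```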
lrn