
I have two streams, Stream<A> and Stream<B>. I have a constructor for a type C that takes an A and a B. How do I merge the two Streams into a Stream<C>?

Günter Zöchbauer
Brett

6 Answers

import 'dart:async' show Stream;
import 'package:async/async.dart' show StreamGroup;

main() async {
  var s1 = stream(10);
  var s2 = stream(20);
  var s3 = StreamGroup.merge([s1, s2]);
  await for(int val in s3) {
    print(val);
  }
}

Stream<int> stream(int min) async* {
  int i = min;
  while(i < min + 10) {
    yield i++;
  }
}

See also http://news.dartlang.org/2016/03/unboxing-packages-async-part-2.html

prints

10
20
11
21
12
22
13
23
14
24
15
25
16
26
17
27
18
28
19
29
Günter Zöchbauer

You can use StreamZip in package:async to combine two streams into one stream of pairs, then create the C objects from that.

import "package:async/async.dart" show StreamZip;
...
Stream<C> createCs(Stream<A> as, Stream<B> bs) =>
  StreamZip<Object?>([as, bs]).map((ab) => C(ab[0] as A, ab[1] as B));
lrn
  • Note, however, that `StreamZip` only emits pairs. If the A and B streams emit at different rates, the output waits until both have emitted a value before emitting a merged pair. This may or may not be what people want or expect. – WSBT Oct 30 '19 at 18:23
  • I don't see any alternative, though. You can't create a new `C` object until you have both an `A` and a `B` available, unless you create a new `C` every time either an `A` or a `B` arrives, using the old value for the other one. That still causes problems if you get two `A`s before the first `B`. – lrn Oct 30 '19 at 23:31
  • @lrn `combineLatest` solves this issue by waiting until each stream emits at least one value: https://pub.dev/documentation/stream_transform/latest/stream_transform/CombineLatest/combineLatest.html – masterwok Jan 07 '22 at 22:52
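
To make the pairing behaviour discussed in these comments concrete, here is a minimal sketch of a typed two-stream zip that uses only dart:async. It is not how StreamZip is implemented. The name zip2 is hypothetical, and the record type (A, B) assumes Dart 3 or later. It also assumes single-subscription inputs: the second stream is only listened to once the first has produced an event, so a broadcast stream could drop early events.

```dart
import 'dart:async';

/// Pairs the n-th event of [first] with the n-th event of [second].
/// The output ends as soon as either input ends.
/// `zip2` is a hypothetical helper name, not part of package:async.
Stream<(A, B)> zip2<A, B>(Stream<A> first, Stream<B> second) async* {
  final itA = StreamIterator<A>(first);
  final itB = StreamIterator<B>(second);
  try {
    // Wait for one event from each side before emitting a pair.
    while (await itA.moveNext() && await itB.moveNext()) {
      yield (itA.current, itB.current);
    }
  } finally {
    // Release both subscriptions, including when the listener cancels early.
    await itA.cancel();
    await itB.cancel();
  }
}
```

With this helper, the question's stream could be built as zip2(as, bs).map((p) => C(p.$1, p.$2)), with no casts needed.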

If you need to react whenever either Stream<A> or Stream<B> emits an event and use the latest value from both streams, use combineLatest from package:stream_transform.

import 'package:stream_transform/stream_transform.dart';

Stream<C> merge(Stream<A> streamA, Stream<B> streamB) {
  return streamA
    .combineLatest(streamB, (a, b) => C(a, b));
}
cambunctious
  • This only emits events once both streams have emitted at least one event, so if only one stream ever emits, you won't get notified. One could argue that this is the natural interpretation, but I expected to get the result of Stream A and null for Stream B, for example, and that is not the case. – scrimau Sep 06 '19 at 09:05
  • @scrimau Good point. If that is an issue, you might want to use [startWith](https://pub.dev/documentation/stream_transform/latest/stream_transform/Concatenate/startWith.html) – cambunctious Jun 18 '20 at 21:28
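
The "waits until both have emitted" behaviour raised in these comments is easier to see in a hand-rolled version. The following is only a sketch of the technique using dart:async, not the stream_transform implementation. The name combineLatest2 is hypothetical, and pause/resume forwarding is left out for brevity.

```dart
import 'dart:async';

/// Emits combine(latestA, latestB) whenever either input emits,
/// but only after both inputs have produced at least one value.
/// `combineLatest2` is a hypothetical name used for illustration.
Stream<R> combineLatest2<A, B, R>(
  Stream<A> streamA,
  Stream<B> streamB,
  R Function(A a, B b) combine,
) {
  late final StreamController<R> controller;
  StreamSubscription<A>? subA;
  StreamSubscription<B>? subB;
  late A latestA;
  late B latestB;
  var hasA = false;
  var hasB = false;
  var openInputs = 2;

  void emitIfReady() {
    // Nothing is emitted until both sides have a value.
    if (hasA && hasB) controller.add(combine(latestA, latestB));
  }

  void inputDone() {
    // Close the output once both inputs are done.
    if (--openInputs == 0) controller.close();
  }

  controller = StreamController<R>(
    onListen: () {
      subA = streamA.listen((a) {
        latestA = a;
        hasA = true;
        emitIfReady();
      }, onError: controller.addError, onDone: inputDone);
      subB = streamB.listen((b) {
        latestB = b;
        hasB = true;
        emitIfReady();
      }, onError: controller.addError, onDone: inputDone);
    },
    onCancel: () async {
      await subA?.cancel();
      await subB?.cancel();
    },
  );
  return controller.stream;
}
```

Seeding each input with an initial value, which is what startWith does, makes hasA and hasB true from the start, so the first real event on either side produces output.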

If you need to combine more than two streams of different types and get the latest value of each whenever any of them emits, use combineLatestAll from package:stream_transform:

import 'package:stream_transform/stream_transform.dart';

Stream<List> combineLatest(Iterable<Stream> streams) {
  // Stream<dynamic> is assignable to Stream<Object?>, so no cast is needed
  // and null events are allowed.
  final Stream<Object?> first = streams.first;
  final List<Stream<Object?>> others = streams.skip(1).toList();
  return first.combineLatestAll(others);
}

The combined stream will produce:

streamA:  a----b------------------c--------d---|
streamB:  --1---------2-----------------|
streamC:  -------&----------%---|
combined: -------b1&--b2&---b2%---c2%------d2%-|

Why not StreamZip? Because StreamZip would produce:

streamA:  a----b------------------c--------d---|
streamB:  --1---------2-----------------|
streamC:  -------&----------%---|
combined: -------a1&-------b2%--|

Usage:

Stream<T> sA;
Stream<K> sB;
Stream<Y> sC;
combineLatest([sA, sB, sC]).map((data) {
  T resA = data[0];
  K resB = data[1];
  Y resC = data[2];
  return D(resA, resB, resC);
});
Alex.F

Using rxdart, you can use CombineLatestStream to achieve what you want. Note that the new stream doesn't emit any value until every input stream has emitted at least one event.

[Figure: combineLatest marble diagram]

You can create the combined stream using CombineLatestStream.list():

import 'package:rxdart/rxdart.dart';

Stream<A> s1 = Stream.fromIterable([A()]);
Stream<B> s2 = Stream.fromIterable([B()]);
Stream<dynamic> s3 = CombineLatestStream.list<dynamic>([s1, s2])
  ..listen(
    (value) {
      A a = value[0];
      B b = value[1];
    },
  );

Since Dart doesn't support union types, a downside of CombineLatestStream.list() is that events from streams of different types must be cast afterwards (because the result is a List<dynamic>). Another approach is to use CombineLatestStream.combine2() (e.g. with a combiner that creates a Tuple2) to keep the types.
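
As a rough illustration of the combine2() approach, the sketch below builds C directly in the combiner instead of going through a Tuple2. The classes A, B and C are hypothetical stand-ins for the question's types, and combineToC is an illustrative name.

```dart
import 'package:rxdart/rxdart.dart';

// Hypothetical stand-ins for the types in the question.
class A {
  final int id;
  A(this.id);
}

class B {
  final String name;
  B(this.name);
}

class C {
  final A a;
  final B b;
  C(this.a, this.b);
}

/// Emits a new C whenever either input emits, once both have emitted.
/// The combiner receives typed arguments, so no casts are needed.
Stream<C> combineToC(Stream<A> streamA, Stream<B> streamB) =>
    CombineLatestStream.combine2(streamA, streamB, (A a, B b) => C(a, b));
```

The trade-off against list() is that combine2() fixes the number of inputs at two, with separate combineN variants for other counts.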

DurandA

To combine two streams where the second depends on a value from the first, use asyncExpand:

  Stream<UserModel?> getCurrentUserModelStream() {
    return FirebaseAuth.instance.authStateChanges().asyncExpand<UserModel?>(
      (currentUser) {
        if (currentUser == null) {
          return Stream.value(null);
        }
        return FirebaseFirestore.instance
            .collection('users')
            .doc(currentUser.uid)
            .snapshots()
            .map((doc) {
          final userData = doc.data();
          if (userData == null) {
            return null;
          }
          return UserModel.fromJson(userData);
        });
      },
    );
  }
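
The same pattern can be tried without Firebase. This is a minimal sketch using only dart:async; profileUpdates and currentProfile are hypothetical names, and the inner stream is finite here. One point worth checking for your own case: asyncExpand only moves on to the next outer event after the current inner stream has completed, so an inner stream that never closes would hold back later outer events.

```dart
import 'dart:async';

/// Hypothetical inner stream: a finite series of updates for one user id.
Stream<String> profileUpdates(int userId) =>
    Stream.fromIterable(['user$userId:v1', 'user$userId:v2']);

/// For each id from the outer stream, forwards every event of the
/// corresponding inner stream, one inner stream at a time.
Stream<String> currentProfile(Stream<int> userIds) =>
    userIds.asyncExpand(profileUpdates);
```

Calling currentProfile(Stream.fromIterable([1, 2])) yields both updates for user 1 followed by both updates for user 2.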
Adam Smaka