I have two streams, Stream<A> and Stream<B>. I have a constructor for a type C that takes an A and a B. How do I merge the two Streams into a Stream<C>?

6 Answers
import 'dart:async' show Stream;
import 'package:async/async.dart' show StreamGroup;

Future<void> main() async {
  var s1 = stream(10);
  var s2 = stream(20);
  // Merge both streams into one that forwards events as they arrive.
  var s3 = StreamGroup.merge([s1, s2]);
  await for (int val in s3) {
    print(val);
  }
}

// Emits the ten integers starting at min.
Stream<int> stream(int min) async* {
  int i = min;
  while (i < min + 10) {
    yield i++;
  }
}
See also http://news.dartlang.org/2016/03/unboxing-packages-async-part-2.html
prints
10
20
11
21
12
22
13
23
14
24
15
25
16
26
17
27
18
28
19
29

-
I think this has gotten me closer to an answer, but it isn't the answer. The thing is, the n input Streams will fire in a random order. I want to hold off generating any result on the output Stream until I have seen at least one entry from each input Stream, and thereafter generate a new result on the output Stream each time one of the input Streams fires. Think of this as a reactive stream fusion, if you like. – Brett Apr 12 '16 at 13:01
-
Looks like `Combine.all(List)` https://pub.dartlang.org/packages/stream_transformers#combine does what you want. – Günter Zöchbauer Apr 12 '16 at 13:10
-
I have not had a chance to look at that library, but one thing I would be cautious about is single-subscription streams and their ability to buffer events. If many events are delivered to one stream while the other is starved, the first will have to buffer while you wait for events on the second. – Argenti Apparatus Apr 12 '16 at 16:48
-
Or they pause the subscription. If the `StreamController` is set up correctly it shouldn't produce events while paused (might not be possible, depending on the use case). – Günter Zöchbauer Apr 12 '16 at 16:51
-
Presumably if one needs to do this kind of processing one can get some guarantees about the arrival of events on the A and B streams, and define the conditions under which an error occurs. – Argenti Apparatus Apr 12 '16 at 19:37
-
@Brett, does lrn's solution not work for you? It looks like it does what you want based on your first comment on this answer. – Argenti Apparatus Apr 12 '16 at 22:46
-
@ArgentiApparatus By the looks of things, @lrn's use of StreamZip locks me into having the number of changes on `Stream<A>` and `Stream<B>` be exactly equal and matching in timing. The sources of data for the streams are different points in a Firebase database, so the updates arrive at random points in time and are most definitely not equal in number. So, no, not really. I'll accept it anyway, because it answers the question as asked. – Brett Apr 14 '16 at 04:17
-
How should it work then? Exactly one event of each stream but the streams don't provide the same number of events, doesn't sound like compatible requirements. – Günter Zöchbauer Apr 14 '16 at 04:21
-
@Brett StreamZip does not require the events on A and B to arrive at the same time. When it receives an event from one stream it waits for an event from the other. I'm not sure what StreamZip does when one stream closes, but I assume it quits listening to the other streams. However, if your A and B events are not really 'paired', I don't think the zipping streams approach is going to work for you. – Argenti Apparatus Apr 14 '16 at 11:21
-
*Note:* `StreamGroup.merge()` merges into a single single-subscription stream. If you need a broadcast stream, use `StreamGroup.mergeBroadcast()`. – Kirill Karmazin Apr 09 '21 at 13:10
You can use StreamZip in package:async to combine two streams into one stream of pairs, then create the C objects from that.
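import "package:async/async.dart" show StreamZip;
...
// StreamZip emits each pair as a two-element list, so cast the elements back.
Stream<C> createCs(Stream<A> as, Stream<B> bs) =>
    StreamZip<Object?>([as, bs]).map((ab) => C(ab[0] as A, ab[1] as B));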
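To make the pairing behaviour concrete, here is a minimal sketch of the same idea using only dart:async. The name `zip2` and its `combine` parameter are hypothetical, not part of package:async, and the sketch assumes single-subscription inputs (it subscribes to the second stream only after the first event of the first stream arrives, so events on a broadcast stream could be missed).

```dart
import 'dart:async';

/// Pairs the n-th event of [streamA] with the n-th event of [streamB].
/// `zip2` is a hypothetical helper name, not a library API.
Stream<R> zip2<A, B, R>(
  Stream<A> streamA,
  Stream<B> streamB,
  R Function(A, B) combine,
) async* {
  final iterA = StreamIterator<A>(streamA);
  final iterB = StreamIterator<B>(streamB);
  try {
    // Wait for one event from each side before emitting a combined value.
    while (await iterA.moveNext() && await iterB.moveNext()) {
      yield combine(iterA.current, iterB.current);
    }
  } finally {
    // Stop listening to both inputs once either one is exhausted
    // or the listener cancels.
    await iterA.cancel();
    await iterB.cancel();
  }
}
```

With this sketch, `zip2(as, bs, (a, b) => C(a, b))` would give a Stream<C>. The output ends as soon as either input ends, so surplus events on the longer stream are dropped.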

-
Note however, `StreamZip` only emits pairs. If As and Bs emit at different rates, C will wait until they've both emitted a value before emitting a merged pair. This may or may not be what people want/expect. – WSBT Oct 30 '19 at 18:23
-
I don't see any alternative, though. You can't create a new `C` object until you have both an `A` and a `B` available. Unless you create a new `C` every time either an `A` or a `B` arrives, using the old value for the other one. That'll still give issues if you get two `A`s before the first `B`. – lrn Oct 30 '19 at 23:31
-
@lrn `combineLatest` solves this issue by waiting until each stream emits at least one value: https://pub.dev/documentation/stream_transform/latest/stream_transform/CombineLatest/combineLatest.html – masterwok Jan 07 '22 at 22:52
If you need to react when either Stream<A> or Stream<B> emits an event and use the latest value from both streams, use combineLatest.
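// Assumes the combineLatest extension from package:stream_transform.
import 'package:stream_transform/stream_transform.dart';

Stream<C> merge(Stream<A> streamA, Stream<B> streamB) {
  // Emits a new C whenever either stream fires, once both have emitted.
  return streamA.combineLatest(streamB, (a, b) => C(a, b));
}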
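For readers who want to see what combine-latest does without adding a package, the following is a rough sketch built only on dart:async. `combineLatest2` is a hypothetical helper name, and its closing and error behaviour are assumptions of this sketch rather than a description of any library: it closes the output once both inputs are done and forwards errors unchanged.

```dart
import 'dart:async';

/// Hand-rolled combine-latest for two streams, using only dart:async.
/// `combineLatest2` is a hypothetical helper, not a library API.
Stream<R> combineLatest2<A, B, R>(
  Stream<A> streamA,
  Stream<B> streamB,
  R Function(A, B) combine,
) {
  late final StreamController<R> controller;
  StreamSubscription<A>? subA;
  StreamSubscription<B>? subB;
  late A latestA;
  late B latestB;
  var hasA = false;
  var hasB = false;
  var openInputs = 2;

  // Emit only once both inputs have produced at least one value.
  void emit() {
    if (hasA && hasB) controller.add(combine(latestA, latestB));
  }

  // Close the output when both inputs are done.
  void inputDone() {
    if (--openInputs == 0) controller.close();
  }

  controller = StreamController<R>(
    onListen: () {
      subA = streamA.listen((a) {
        latestA = a;
        hasA = true;
        emit();
      }, onError: controller.addError, onDone: inputDone);
      subB = streamB.listen((b) {
        latestB = b;
        hasB = true;
        emit();
      }, onError: controller.addError, onDone: inputDone);
    },
    onPause: () {
      subA?.pause();
      subB?.pause();
    },
    onResume: () {
      subA?.resume();
      subB?.resume();
    },
    onCancel: () async {
      await subA?.cancel();
      await subB?.cancel();
    },
  );
  return controller.stream;
}
```

The two `has` flags are what delay the first output until each input has fired at least once. After that, every event on either side produces a new combined value using the most recent value from the other side.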

-
This only emits events once both streams have emitted at least one event, so if only one stream ever emits events, you won't get notified. One could argue that this is the natural interpretation. I thought I would just get the result of Stream A and null for Stream B, for example, but this is not the case. – scrimau Sep 06 '19 at 09:05
-
@scrimau Good point. If that is an issue, you might want to use [startWith](https://pub.dev/documentation/stream_transform/latest/stream_transform/Concatenate/startWith.html) – cambunctious Jun 18 '20 at 21:28
If you need to combine more than two streams of different types and get the latest values from all of them whenever any stream updates:
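import 'package:stream_transform/stream_transform.dart';

// Elements stay dynamic because the input streams have different types.
Stream<List<dynamic>> combineLatest(Iterable<Stream<dynamic>> streams) {
  final Stream<dynamic> first = streams.first;
  final List<Stream<dynamic>> others = streams.skip(1).toList();
  // Emits a list of the latest values once every stream has emitted.
  return first.combineLatestAll(others);
}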
The combined stream will produce:
streamA: a----b------------------c--------d---|
streamB: --1---------2-----------------|
streamC: -------&----------%---|
combined: -------b1&--b2&---b2%---c2%------d2%-|
Why not StreamZip? Because StreamZip would produce:
streamA: a----b------------------c--------d---|
streamB: --1---------2-----------------|
streamC: -------&----------%---|
combined: -------a1&-------b2%--|
Usage:
Stream<T> sA;
Stream<K> sB;
Stream<Y> sC;
combineLatest([sA, sB, sC]).map((data) {
  T resA = data[0];
  K resB = data[1];
  Y resC = data[2];
  return D(resA, resB, resC);
});

Using rxdart, you can use CombineLatestStream to achieve what you want. Note that the new stream doesn't emit any value until every source stream has emitted at least one event.
You can create the combined stream using CombineLatestStream.list():
import 'package:rxdart/rxdart.dart';

Stream<A> s1 = Stream.fromIterable([A()]);
Stream<B> s2 = Stream.fromIterable([B()]);
Stream<dynamic> s3 = CombineLatestStream.list<dynamic>([s1, s2])
  ..listen(
    (value) {
      A a = value[0];
      B b = value[1];
    },
  );
Since Dart doesn't support union types, a downside of CombineLatestStream.list() is that events from streams of different types must be cast afterwards (because the result is a List<dynamic>). Another approach is to use CombineLatestStream.combine2() (e.g. with a combiner that creates a Tuple2) to keep the types.
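As a sketch of the typed approach, the combiner can build the question's C directly instead of a Tuple2. The bodies of A, B and C below and the name `combineToC` are placeholders made up for illustration; only `CombineLatestStream.combine2` comes from rxdart.

```dart
import 'package:rxdart/rxdart.dart';

// A, B and C stand in for the question's types; their bodies are assumptions.
class A {}

class B {}

class C {
  C(this.a, this.b);
  final A a;
  final B b;
}

// The typed combiner receives an A and a B, so no casts are needed.
Stream<C> combineToC(Stream<A> streamA, Stream<B> streamB) =>
    CombineLatestStream.combine2(streamA, streamB, (A a, B b) => C(a, b));
```

Because the combiner's parameter types are declared, the result is a Stream<C> rather than a stream of List<dynamic>.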

To combine two streams when the second depends on a result from the first, use asyncExpand:
Stream<UserModel?> getCurrentUserModelStream() {
  return FirebaseAuth.instance.authStateChanges().asyncExpand<UserModel?>(
    (currentUser) {
      if (currentUser == null) {
        return Stream.value(null);
      }
      return FirebaseFirestore.instance
          .collection('users')
          .doc(currentUser.uid)
          .snapshots()
          .map((doc) {
        final userData = doc.data();
        if (userData == null) {
          return null;
        }
        return UserModel.fromJson(userData);
      });
    },
  );
}
