136

    class MyPage extends StatelessWidget {
      @override
      Widget build(BuildContext context) {
        return DefaultTabController(
          length: 2,
          child: new Scaffold(
            appBar: TabBar(
              tabs: [
                Tab(child: Text("MY INFORMATION",style: TextStyle(color: Colors.black54),)),
                Tab(child: Text("WEB CALENDER",style: TextStyle(color: Colors.black54),)),
              ],
            ),
            body:PersonalInformationBlocProvider(
              movieBloc: PersonalInformationBloc(),
              child: TabBarView(
                children: [
                  MyInformation(),
                  new SmallCalendarExample(),
                ],
              ),
            ),
          ),
        );
      }
    }
    
    class MyInformation extends StatelessWidget{
      // TODO: implement build
      var deviceSize;
    
      //Column1
      Widget profileColumn(PersonalInformation snapshot) => Container(
        height: deviceSize.height * 0.24,
        child: Column(
          mainAxisAlignment: MainAxisAlignment.center,
          children: <Widget>[
            Row(
              mainAxisAlignment: MainAxisAlignment.spaceEvenly,
              children: <Widget>[
                Container(
                  decoration: BoxDecoration(
                    borderRadius:
                    new BorderRadius.all(new Radius.circular(50.0)),
                    border: new Border.all(
                      color: Colors.black,
                      width: 4.0,
                    ),
                  ),
                  child: CircleAvatar(
                    backgroundImage: NetworkImage(
                        "http://www.binaythapa.com.np/img/me.jpg"),
                    foregroundColor: Colors.white,
                    backgroundColor: Colors.white,
                    radius: 40.0,
                  ),
                ),
                ProfileTile(
                  title: snapshot.firstName,
                  subtitle: "Developer",
                ),
                SizedBox(
                  height: 10.0,
                ),
              ],
            )
          ],
        ),
      );
      Widget bodyData(PersonalInformation snapshot) {
        return SingleChildScrollView(
            child: Column(
              children: <Widget>[
                profileColumn(snapshot)
              ],
            ),
        );
      }
    
    
      @override
      Widget build(BuildContext context) {
        final personalInformationBloc = PersonalInformationBlocProvider.of(context);
    
        deviceSize = MediaQuery.of(context).size;
        return StreamBuilder(
            stream: personalInformationBloc.results,
            builder: (context,snapshot){
              if (!snapshot.hasData)
                return Center(
                  child: CircularProgressIndicator(),
                );
              return bodyData(snapshot.data);
            }
        );
      }
    }
   

I am using the BLoC pattern to retrieve data from a REST API (I fetch the whole JSON object and parse only the user name). The page consists of two tabs, MyInformation and SmallCalendar. When the app runs, the data is fetched correctly and everything works. When I go to tab two and then return to tab one, the whole screen in tab one turns red and shows the error: Bad state: Stream has already been listened to.

Csaba Toth
BINAY THAPA MAGAR

20 Answers

146

You should use the following.

StreamController<...> _controller = StreamController<...>.broadcast();
hrsma2i
  • 7
    `broadcast` doesn't emit old events. `BehaviorSubject` from `rxdart` does. See this: https://stackoverflow.com/a/55893532/1321917 – Andrey Gordeev Oct 05 '19 at 05:08
75

The most common form of Stream can be listened to only once at a time. If you try to add multiple listeners, it throws:

Bad state: Stream has already been listened to

To prevent this error, expose a broadcast Stream. You can convert your stream to a broadcast stream using myStream.asBroadcastStream().

This needs to be done inside the class that exposes the Stream, not in the stream parameter of StreamBuilder. Since asBroadcastStream internally listens to the original stream to generate the broadcast one, you can't call this method twice on the same stream.
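
A minimal sketch of that advice, using only dart:async. The class and member names (`InfoBloc`, `results`, `add`, `dispose`) are hypothetical and do not come from the question. The broadcast stream is created once inside the class, and the same instance is handed to every listener:

```dart
import 'dart:async';

// Hypothetical bloc; the names are illustrative only.
class InfoBloc {
  final _controller = StreamController<String>();

  // Converted exactly once, so every caller gets the same broadcast stream.
  late final Stream<String> results = _controller.stream.asBroadcastStream();

  void add(String value) => _controller.add(value);

  Future<void> dispose() => _controller.close();
}

Future<List<String>> demo() async {
  final bloc = InfoBloc();
  final log = <String>[];

  // Two listeners on the same stream: no StateError is thrown.
  bloc.results.listen((v) => log.add('first: $v'));
  bloc.results.listen((v) => log.add('second: $v'));

  bloc.add('hello');
  await bloc.dispose(); // closes the underlying controller
  return log;
}

Future<void> main() async {
  print(await demo());
}
```

Calling `asBroadcastStream()` inside a getter instead would try to subscribe to the single-subscription source again on every access, which is why the result is stored in a field here.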

Rémi Rousselet
  • 1
    Thank you Rémi Rousselet for your response. I exposed the broadcast stream by simply calling myStream.asBroadcastStream. But I am getting snapshot.hasData as null and the progress bar keeps spinning, as the StreamBuilder might not be able to fetch the data. – BINAY THAPA MAGAR Jul 18 '18 at 08:53
  • 21
    Rémi, could you elaborate a little more on the difference between `StreamController.broadcast()` and `Stream.asBroadcastStream`? – Michel Feinstein Aug 06 '19 at 19:51
  • 1
    I get connectionstate.waiting if I am doing a broadcast – anoop4real Oct 17 '20 at 19:57
  • @anoop4real, broadcast stream doesn't emit past events so that your listener is hanging on. – Tran Chien Jun 20 '23 at 03:51
49

You could use broadcast, which allows a stream to be listened to more than once, but it also means that new listeners do not receive past events:

Broadcast streams do not buffer events when there is no listener.

A better option is to use the BehaviorSubject class from the rxdart package as the StreamController. BehaviorSubject is:

A special StreamController that captures the latest item that has been added to the controller, and emits that as the first item to any new listener.

The usage is as simple as:

StreamController<...> _controller = BehaviorSubject();
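
To show the behavior the quote describes without adding a dependency, here is a plain-Dart approximation built on `StreamController.broadcast()` and `Stream.multi`. `LatestValue` is a hypothetical helper written for this sketch. It is not rxdart's implementation and covers only the "replay the latest item to a new listener" part:

```dart
import 'dart:async';

// Plain-Dart approximation of "replay the latest value to new listeners".
// LatestValue is a hypothetical helper, not part of dart:async or rxdart.
class LatestValue<T> {
  final _controller = StreamController<T>.broadcast();
  T? _latest;
  bool _hasValue = false;

  void add(T value) {
    _latest = value;
    _hasValue = true;
    _controller.add(value);
  }

  // Every listen() call runs this callback with its own controller.
  Stream<T> get stream => Stream<T>.multi((listener) {
        if (_hasValue) listener.add(_latest as T); // replay the cached value
        final subscription = _controller.stream.listen(
          listener.add,
          onError: listener.addError,
          onDone: listener.close,
        );
        listener.onCancel = subscription.cancel;
      });

  Future<void> close() => _controller.close();
}

Future<List<int>> demo() async {
  final values = LatestValue<int>();
  values.add(1);
  values.add(2); // added before anyone listens

  final received = <int>[];
  values.stream.listen(received.add);
  values.add(3);

  await Future<void>.delayed(Duration.zero); // let queued events be delivered
  await values.close();
  return received;
}

Future<void> main() async {
  print(await demo());
}
```

With a bare broadcast controller the late listener would see only the event added after it subscribed, which matches the endless progress indicator reported in the comments above.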
Andrey Gordeev
31

In my case, I was getting this error because the same line of code myStream.listen() was being called twice in the same widget on the same stream. Apparently this is not allowed!

UPDATE: If you intend to subscribe to the same stream more than once, you should use a behavior subject instead:


// 1- Create a behavior subject
final _myController = BehaviorSubject<String>();

// 2- To emit/broadcast new events, we will use Sink of the behavior subject.
Sink<String> get myStreamInputSink => _myController.sink;

// 3- To listen/subscribe to those emitted events, we will use Stream (observable) of the behavior subject. 
Stream<String> get myStream => _myController.stream;

// 4- Firstly, Listen/subscribe to stream events.
myStream.listen((latestEvent) {
   // use latestEvent data here.
});

// 5- Emit new events by adding them to the BehaviorSubject's Sink. 
myStreamInputSink.add('new event');

That's it!

However, there is one final important step.

6- We must unsubscribe from all stream listeners before a widget is destroyed.

Why? (You might ask)

Because if a widget subscribes to a stream and is then destroyed, its stream subscription remains in app memory, causing memory leaks and unpredictable behavior:

_flush() {
  // Closing the controller releases its listeners. _myController is declared
  // final above, so it cannot be reassigned to a new controller here.
  _myController.close();
}
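
Step 6 talks about unsubscribing, which is done through the `StreamSubscription` returned by `listen`. The sketch below uses only dart:async. `Subscriber` is a hypothetical stand-in for a widget's State object, with `attach` and `detach` playing the roles of `initState` and `dispose`:

```dart
import 'dart:async';

// Hypothetical stand-in for a widget's State object.
class Subscriber {
  final received = <String>[];
  StreamSubscription<String>? _subscription;

  // Roughly what initState would do.
  void attach(Stream<String> stream) {
    _subscription = stream.listen(received.add);
  }

  // Roughly what dispose would do: cancel so the stream drops its reference.
  Future<void> detach() async {
    await _subscription?.cancel();
    _subscription = null;
  }
}

Future<List<String>> demo() async {
  final controller = StreamController<String>.broadcast();
  final subscriber = Subscriber()..attach(controller.stream);

  controller.add('before detach');
  await Future<void>.delayed(Duration.zero); // let the event be delivered

  await subscriber.detach();
  controller.add('after detach'); // nobody is listening any more
  await controller.close();
  return subscriber.received;
}

Future<void> main() async {
  print(await demo());
}
```

Cancelling the subscription is the listener's job, while closing the controller is the job of whoever owns it, so the two are kept separate here.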

############################### ###############################

Old Answer:

What fixed it for me was to both create my stream controller as a broadcast stream controller:

var myStreamController = StreamController<bool>.broadcast();

AND

use stream as a broadcast stream:

myStreamController.stream.asBroadcastStream().listen(onData);
Samer
19

The problem was due to not disposing the controllers in bloc.

  void dispose() {
    monthChangedController.close();
    dayPressedController.close();
    resultController.close();
  }
BINAY THAPA MAGAR
4

Just to sum up:

The main difference is that broadcast() creates a Stream that multiple listeners can subscribe to, but it needs at least one listener before it starts emitting items.

A Stream should be inert until a subscriber starts listening on it (using the [onListen] callback to start producing events).

asBroadcastStream turns an existing Stream into one that multiple listeners can subscribe to. It subscribes to the source stream itself (calling the source's onListen under the hood) once its first listener is added, and it stays subscribed until the source ends.
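
The role of onListen can be observed with a small dart:async experiment. This is only a sketch: the `log` list and the labels in it are made up for illustration, and the comments describe what the Dart SDK documentation for `asBroadcastStream` states about when the source is subscribed to:

```dart
import 'dart:async';

Future<List<String>> demo() async {
  final log = <String>[];

  // The source reports when something subscribes to it.
  final source = StreamController<int>(
    onListen: () => log.add('source onListen'),
  );

  final shared = source.stream.asBroadcastStream();
  log.add('asBroadcastStream called');

  // The source is subscribed to when the first listener is added.
  shared.listen((v) => log.add('a: $v'));
  shared.listen((v) => log.add('b: $v'));

  source.add(1);
  await source.close();
  return log;
}

Future<void> main() async {
  for (final line in await demo()) {
    print(line);
  }
}
```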

haroldolivieri
3

I had the same issue when I used the result of Observable.combineLatest2 for a StreamBuilder inside a Drawer:

flutter: Bad state: Stream has already been listened to.

For me, the best solution was to add the result of this combine to a new BehaviorSubject and listen to the new one.

Don't forget to listen to the old one!

class VisitsBloc extends Object {
    Map<Visit, Location> visitAndLocation;

    VisitsBloc() {
        visitAndLocations.listen((data) {
            visitAndLocation = data;
        });
    }

    final _newOne = new BehaviorSubject<Map<Visit, Location>>();

    Stream<Map<Visit, Location>> get visitAndLocations => Observable.combineLatest2(_visits.stream, _locations.stream, (List<vis.Visit> visits, Map<int, Location> locations) {
        Map<vis.Visit, Location> result = {};

        visits.forEach((visit) {
            if (locations.containsKey(visit.skuLocationId)) {
                result[visit] = locations[visit.skuLocationId];
            }
        });

        if (result.isNotEmpty) {
            _newOne.add(result);
        }

        // The combiner must return the combined value for the resulting stream.
        return result;
    });
}

I didn't use .broadcast because it slowed my UI.

Serhii
3

I think not all of the answers take into account the situation where you do not want to, or simply can't, use a broadcast stream.

More often than not, you have to rely on receiving past events, because the listener might be created later than the stream it listens to and it's important that it receives that information.

In Flutter, what often happens is that the widget listening to the stream (the "listener") gets destroyed and built again. If you attempt to attach a listener to the same stream as before, you will get this error.

To overcome this, you will have to manage your streams manually. I created this gist demonstrating how that can be done. You can also run this code on this dartpad to see how it behaves and play with it. I have used simple String ids to refer to specific StreamController instances but there might be better solutions too (perhaps symbols).

The code from the gist is:

/* NOTE: This approach demonstrates how to recreate streams when
         your listeners are being recreated.
         It is useful when you cannot or do not want to use broadcast
         streams. Downside to broadcast streams is that it is not
         guaranteed that your listener will receive values emitted
         by the stream before it was registered.
*/

import 'dart:async';
import 'dart:math';

// [StreamService] manages state of your streams. Each listener
// must have id which is used in [_streamControllers] map to
// look up relevant stream controller.
class StreamService {
  final Map<String, StreamController<int>?> _streamControllers = {};

  Stream<int> getNamedStream(String id) {
    final controller = _getController(id);
    return controller.stream;
  }

  // Will get existing stream controller by [id] or create a new
  // one if it does not exist
  StreamController<int> _getController(String id) {
    final controller = _streamControllers[id] ?? _createController();

    _streamControllers[id] = controller;

    return controller;
  }

  void push(String id) {
    final controller = _getController(id);

    final rand = Random();
    final value = rand.nextInt(1000);

    controller.add(value);
  }

  // This method can be called by listener so
  // memory leaks are avoided. This is a cleanup
  // method that will make sure the stream controller
  // is removed safely
  void disposeController(String id) {
    final controller = _streamControllers[id];

    if (controller == null) {
      throw Exception('Controller $id is not registered.');
    }

    controller.close();
    _streamControllers.remove(id);
    print('Removed controller $id');
  }

  // This method should be called when you want to remove
  // all controllers. It should be called before the instance
  // of this class is garbage collected / removed from memory.
  void dispose() {
    _streamControllers.forEach((id, controller) {
      controller?.close();
      print('Removed controller $id during dispose phase');
    });
    _streamControllers.clear();
  }

  StreamController<int> _createController() {
    return StreamController<int>();
  }
}

class ManagedListener {
  ManagedListener({
    required this.id,
    required StreamService streamService,
  }) {
    _streamService = streamService;
  }

  final String id;
  late StreamService _streamService;
  StreamSubscription<int>? _subscription;

  void register() {
    _subscription = _streamService.getNamedStream(id).listen(_handleStreamChange);
  }

  void dispose() {
    _subscription?.cancel();
    _streamService.disposeController(id);
  }

  void _handleStreamChange(int n) {
    print('[$id]: streamed $n');
  }
}

void main(List<String> arguments) async {
  final streamService = StreamService();

  final listener1Id = 'id_1';
  final listener2Id = 'id_2';

  final listener1 = ManagedListener(id: listener1Id, streamService: streamService);
  listener1.register();
  
  streamService.push(listener1Id);
  streamService.push(listener1Id);
  streamService.push(listener1Id);

  await Future.delayed(const Duration(seconds: 1));

  final listener2 = ManagedListener(id: listener2Id, streamService: streamService);
  listener2.register();

  streamService.push(listener2Id);
  streamService.push(listener2Id);

  await Future.delayed(const Duration(seconds: 1));
  
  listener1.dispose();
  listener2.dispose();

  streamService.dispose();
}
user3056783
2

Call .broadcast() on your stream controller

example:

StreamController<T> sampleController =
      StreamController<T>.broadcast();
Folorunso
1

For those of you running into this while using Future.asStream(), you'll need Future.asStream().shareReplay(maxSize: 1) (shareReplay comes from the rxdart package) to make it a broadcast/hot stream.
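
If adding rxdart is not an option, one dependency-free alternative (a different technique from shareReplay, shown here only as a sketch) relies on the fact that a Future can be consumed any number of times: keep the Future and create a fresh stream from it for each listener. `fetchName` and `NameRepository` are hypothetical names standing in for a real data source:

```dart
import 'dart:async';

// Hypothetical data source standing in for an HTTP call.
Future<String> fetchName() async => 'example';

class NameRepository {
  // The Future is created once, so the work behind it runs once.
  final Future<String> _name = fetchName();

  // Each access returns a new single-subscription stream over the same Future.
  Stream<String> get nameStream => _name.asStream();
}

Future<List<String>> demo() async {
  final repository = NameRepository();

  // Two independent listeners, one per stream instance: no StateError.
  final first = await repository.nameStream.first;
  final second = await repository.nameStream.first;
  return [first, second];
}

Future<void> main() async {
  print(await demo());
}
```

Each listener still receives the value, including ones that subscribe after the Future has completed.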

E. Sun
1

For me, defining my stream as a global variable worked.

Stream infostream was declared inside the ...State class of a stateful widget. I moved the declaration outside the widget and it worked.

(I'm not sure this is the best solution, but give it a try.)

Sanskar Tiwari
1

I experienced this because I was using a StreamBuilder to create a list for the tabs of a TabBarView, and any time I switched tabs and came back to the previous one I got this error. Wrapping the StreamBuilder with a Builder widget did the magic for me.

0

StreamSplitter.split() from the async package can be used for this use case:

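import 'dart:convert';
import 'dart:io';

import 'package:async/async.dart';
...

Future<void> main() async {
  // Process.start returns a Future<Process>, so it must be awaited.
  var process = await Process.start(...);
  // StreamSplitter lets several consumers read the same single-subscription stream.
  var stdout = StreamSplitter<List<int>>(process.stdout);
  readStdoutFoo(stdout.split());
  readStdoutBar(stdout.split());
}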

readStdoutFoo(Stream<List<int>> stdout) {
  stdout.transform(utf8.decoder)...
}

readStdoutBar(Stream<List<int>> stdout) {
  stdout.transform(utf8.decoder)...
}
Günter Zöchbauer
0

In my case I was using the Connectivity package on Flutter web. Commenting out all Connectivity calls solved the issue.

I'm now using Connectivity only on Android/iOS.

So if you are developing for the web, check whether any of the packages you use have issues on the web.

Hopefully this information helps someone.

nani
0

This is a problem with the provider. I solved it by changing the provider initialization.

E.g.

locator.registerSingleton<LoginProvider>(LoginProvider());

TO

 locator.registerFactory(() => TaskProvider());

Where locator is

GetIt locator = GetIt.instance;
abhijith k
0

This could help someone else. In my case I was using two StreamBuilders, one in each tab. When I swiped to the other tab and back, the stream had already been listened to, so I got the error.

What I did was remove the StreamBuilder from the tabs and put it above them. I call setState each time there is a change, and I return an empty Text('') to avoid showing anything. I hope this method helps.

b.john
0

For other scenarios: watch out if you are using a stream inside a stateless widget. This is one of the reasons you get the above error. Convert the stateless widget to a stateful one and call the init and dispose methods on the stream controller:

 @override
 void initState() {
   super.initState();
   YourStreamController.init();
 }

 @override
 void dispose() {
   YourStreamController.dispose();
   super.dispose();
 }
0

Make sure you dispose of your controllers!

@override
  void dispose() {
    scrollController.dispose();
    super.dispose();
  }
tensor
0

I was getting this error when navigating away and then back to the view listening to the stream because I was pushing a new instance of the same view into the Navigator stack, which effectively ended up creating a new listener even though it was the same place in code.

Specifically and in more detail, I had a ListItemsView widget which uses StreamBuilder to show all the items in a stream. User taps on the "Add Item" button which pushes the AddItemView in the Navigator stack, and after submitting the form, the user is brought back to the ListItemsView, where the "Bad state: Stream has already been listened to." error happens.

For me the fix was to replace Navigator.pushNamed(context, ListItemsView.routeName) with Navigator.pop(context). This effectively prevents the instantiation of a new ListItemsView (as the second subscriber to the same stream), and just takes the user back to the previous ListItemsView instance.

tonyslowdown
0

I have experienced this too. Always closing the StreamController worked for me.

  • I have var request = http.Request('GET', Uri.parse('https://reqres.in/api/users?page=2')); http.StreamedResponse response = await request.send(); now how would i close my stream? – Saad Mansoor Nov 28 '22 at 10:53