
I'm trying to construct an object that manages a feed from a websocket and can switch between multiple feeds.

There is a Feed trait:

trait Feed {
    async fn start(&mut self);
    async fn stop(&mut self);
}

There are three structs that implement Feed: A, B, and C.

When start is called, it starts an infinite loop of listening for messages from a websocket and processing each one as it comes in.

I want to implement a FeedManager that maintains a single active feed but can receive commands to switch what feed source it is using.

enum FeedCommand {
    Start(String),
    Stop,
}

struct FeedManager {
    active_feed_handle: tokio::task::JoinHandle<()>,
    controller: mpsc::Receiver<FeedCommand>,
}

impl FeedManager {
    async fn start(&mut self) {
        while let Some(command) = self.controller.recv().await {
            match command {
                FeedCommand::Start(feed_type) => {
                    // somehow tell the active feed to stop (need channel probably) or kill the task?

                    if feed_type == "A" {
                        // replace active feed task with a new tokio task for consuming feed A
                    } else if feed_type == "B" {
                        // replace active feed task with a new tokio task for consuming feed B
                    } else {
                        // replace active feed task with a new tokio task for consuming feed C
                    }
                }
                FeedCommand::Stop => {
                    // stop the active feed without starting a new one
                }
            }
        }
    }
}

I'm struggling to understand how to manage all of the Tokio tasks properly. FeedManager's core loop listens forever for new commands, but it also needs to spawn another long-lived task without blocking on it (so it can keep listening for commands).

My first attempt was:

if feed_type == "A" {
    self.active_feed_handle = tokio::spawn(async {
        A::new().start().await;
    });

    self.active_feed_handle.await
}
  • the .await on the handle would cause the core loop to no longer accept commands, right?
  • can I omit that last .await and have the task still run?
  • do I need to clean up the currently active task some way?
Shepmaster
John Cantrell
  • What do you want to happen to the messages from inactive feeds? Should they queue forever, or be dropped if nobody is currently listening, or something else? – GManNickG Mar 31 '21 at 21:46
  • When a feed is stopped it will close the websocket so it should just stop existing. – John Cantrell Mar 31 '21 at 21:49
  • It's hard to answer your question because it doesn't include a [MRE]. We can't tell what crates (and their versions), types, traits, fields, etc. are present in the code. It would make it easier for us to help you if you try to reproduce your error on the [Rust Playground](https://play.rust-lang.org) if possible, otherwise in a brand new Cargo project, then [edit] your question to include the additional info. There are [Rust-specific MRE tips](//stackoverflow.com/tags/rust/info) you can use to reduce your original code for posting here. Thanks! – Shepmaster Apr 01 '21 at 00:43
  • Notably, your code doesn't use valid Rust syntax — [`async fn` cannot be used in traits in today's Rust](https://stackoverflow.com/q/65921581/155423). – Shepmaster Apr 01 '21 at 00:43

2 Answers


You can spawn a long-running Tokio task without blocking the parent task simply by spawning it; that's a primary reason tasks exist. If you don't .await the returned handle, the parent won't wait for the task:

use std::time::Duration;
use tokio::{task, time}; // 1.3.0

#[tokio::main]
async fn main() {
    task::spawn(async {
        time::sleep(Duration::from_secs(100)).await;
        eprintln!(
            "You'll likely never see this printed \
            out because the parent task has exited \
            and so has the entire program"
        );
    });
}


Shepmaster

One way to do this would be to use Tokio's join!() macro, which takes multiple futures and awaits all of them concurrently. You could then create multiple futures and join!() them together to await them collectively.

Shepmaster
transistor