
I am running into a problem that I do not really understand, and I hope somebody can see what I have misunderstood.

The problem is quite straightforward: I have a global state (shared between several tasks) and want to have an infinite cycle over a vector in the global state. I will then zip that with an interval stream and hence get a regular emission of the next value in the stream.

If the vector in the state changes, the infinite stream should reload the vector, start reading from the new one, and discard the old one.

Here is the code I have so far; the questions are at the end of the post.

use futures::stream::Stream;
use futures::{Async, Poll};
use std::iter::Cycle;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::timer::Interval;

We define a global state that holds an array that can be updated. Whenever the array is updated, we increment the version and replace the array.

struct State<T> {
    version: u32,
    array: Vec<T>,
}

impl<T> State<T> {
    fn new(array: Vec<T>) -> Self {
        Self {
            version: 0,
            array, // keep the vector passed in by the caller
        }
    }

    fn update(&mut self, array: Vec<T>) {
        self.version += 1;
        self.array = array;
    }
}
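To illustrate how the version is meant to signal a change, here is a minimal std-only sketch of the state being shared through Arc<Mutex<..>> and updated from another thread. The helper name update_from_other_thread is hypothetical, and this sketch assumes that new stores the vector it is given.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

struct State<T> {
    version: u32,
    array: Vec<T>,
}

impl<T> State<T> {
    // Assumption for this sketch: `new` keeps the vector it receives.
    fn new(array: Vec<T>) -> Self {
        Self { version: 0, array }
    }

    fn update(&mut self, array: Vec<T>) {
        self.version += 1;
        self.array = array;
    }
}

// Hypothetical helper: replace the array from a second thread and
// return the version and contents that a reader would then observe.
fn update_from_other_thread() -> (u32, Vec<u32>) {
    let state = Arc::new(Mutex::new(State::new(vec![2, 3, 5])));
    let writer = Arc::clone(&state);

    thread::spawn(move || {
        writer.lock().unwrap().update(vec![7, 11]);
    })
    .join()
    .unwrap();

    let locked = state.lock().unwrap();
    (locked.version, locked.array.clone())
}
```

A reader that remembers the last version it saw only has to compare that number with the current one to know whether to reload.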

Now we create a stream over the state. When initialized, it reads the array and version from the state and stores them, and it keeps an instance of std::iter::Cycle internally that cycles over the array.

struct StateStream<I> {
    state: Arc<Mutex<State<I::Item>>>,
    version: u32,
    iter: Cycle<I>,
}

impl<I> StateStream<I>
where
    I: Iterator,
{
    fn new(state: Arc<Mutex<State<I::Item>>>) -> Self {
        let (version, array) = {
            let locked_state = state.lock().unwrap();
            (locked_state.version, locked_state.array)
        };

        Self {
            state: state,
            version: version,
            iter: array.iter().cycle(),
        }
    }
}

We now implement Stream for StateStream. On each poll, it checks whether the version of the state has changed and, if it has, reloads the array and version.

We will then take the next item from the iterator and return that.

impl<I> Stream for StateStream<I>
where
    I: Iterator + Clone,
{
    type Item = I::Item;
    type Error = ();

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        let locked_state = self.state.lock().unwrap();
        if locked_state.version > self.version {
            self.iter = locked_state.array.clone().iter().cycle();
            self.version = locked_state.version;
        }
        Ok(Async::Ready(self.iter.next()))
    }
}
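For comparison, the same reload-on-version-change logic can be sketched with the standard library alone, as a plain Iterator instead of a Stream. This is only a sketch under assumptions: the names ReloadingCycle and demo are hypothetical, the struct is generic over the element type T rather than over an iterator type, and it stores the concrete type Cycle<std::vec::IntoIter<T>> over a cloned vector, so it needs T: Clone.

```rust
use std::iter::Cycle;
use std::sync::{Arc, Mutex};
use std::vec::IntoIter;

struct State<T> {
    version: u32,
    array: Vec<T>,
}

// Generic over the element type; the iterator type is spelled out.
struct ReloadingCycle<T: Clone> {
    state: Arc<Mutex<State<T>>>,
    version: u32,
    iter: Cycle<IntoIter<T>>,
}

impl<T: Clone> ReloadingCycle<T> {
    fn new(state: Arc<Mutex<State<T>>>) -> Self {
        // Copy the data out while holding the lock, then release it.
        let (version, array) = {
            let locked = state.lock().unwrap();
            (locked.version, locked.array.clone())
        };
        Self { state, version, iter: array.into_iter().cycle() }
    }
}

impl<T: Clone> Iterator for ReloadingCycle<T> {
    type Item = T;

    fn next(&mut self) -> Option<T> {
        {
            let locked = self.state.lock().unwrap();
            if locked.version != self.version {
                // The state changed: restart the cycle on the new array.
                self.iter = locked.array.clone().into_iter().cycle();
                self.version = locked.version;
            }
        }
        // Yields None if the current array is empty.
        self.iter.next()
    }
}

// Hypothetical helper: take two items, swap the array, take three more.
fn demo() -> Vec<u32> {
    let state = Arc::new(Mutex::new(State { version: 0, array: vec![2, 3, 5] }));
    let mut cycle = ReloadingCycle::new(Arc::clone(&state));

    let mut seen: Vec<u32> = cycle.by_ref().take(2).collect();
    {
        let mut locked = state.lock().unwrap();
        locked.version += 1;
        locked.array = vec![7, 11];
    }
    seen.extend(cycle.by_ref().take(3));
    seen
}
```

Owning a clone of the vector avoids borrowing from the mutex guard, which would not outlive the call in which the lock is taken.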

The main program looks like this. I do not update the vector here, but that is not important for the case at hand.

fn main() {
    let state = Arc::new(Mutex::new(State::new(vec![2, 3, 5, 7, 11, 13])));
    let primes = StateStream::new(state)
        .take(20)
        .zip(
            Interval::new(Instant::now(), Duration::from_millis(500))
                .map_err(|err| println!("Error: {}", err)),
        )
        .for_each(|(number, instant)| {
            println!("fire; number={}, instant={:?}", number, instant);
            Ok(())
        });
    tokio::run(primes);
}

When compiling this, I get the following errors:

cargo run --example cycle_stream_shared
   Compiling tokio-testing v0.1.0 (/home/mats/crates/tokio-examples)
error[E0308]: mismatched types
  --> examples/cycle_stream_shared.rs:66:19
   |
66 |             iter: array.iter().cycle(),
   |                   ^^^^^^^^^^^^^^^^^^^^ expected type parameter, found struct `std::slice::Iter`
   |
   = note: expected type `std::iter::Cycle<I>`
              found type `std::iter::Cycle<std::slice::Iter<'_, <I as std::iter::Iterator>::Item>>`

error[E0308]: mismatched types
  --> examples/cycle_stream_shared.rs:81:25
   |
81 |             self.iter = locked_state.array.clone().iter().cycle();
   |                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected type parameter, found struct `std::slice::Iter`
   |
   = note: expected type `std::iter::Cycle<I>`
              found type `std::iter::Cycle<std::slice::Iter<'_, <I as std::iter::Iterator>::Item>>`

error: aborting due to 2 previous errors

For more information about this error, try `rustc --explain E0308`.
error: Could not compile `tokio-testing`.

To learn more, run the command again with --verbose.

The error and its explanation say that the concrete type cannot be derived. In this case, however, I am using the generic struct Cycle<I> and expect I to be instantiated to std::slice::Iter<'_, I::Item>. Since std::slice::Iter implements Iterator, the type should satisfy all the traits needed to match.
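One reading of the E0308 message is that a type parameter on an impl block is chosen by the caller, so the body has to produce Cycle<I> for every possible I and cannot pick std::slice::Iter itself. The sketch below, with the hypothetical names Cycler and first_five, shows a constructor that compiles because the caller hands in a value of type I, next to a commented-out one that fails in the same way as the code above.

```rust
use std::iter::Cycle;

struct Cycler<I> {
    iter: Cycle<I>,
}

impl<I: Iterator + Clone> Cycler<I> {
    // Compiles: the value already has the caller-chosen type `I`.
    fn new(iter: I) -> Self {
        Self { iter: iter.cycle() }
    }

    // Does not compile (E0308): the body picks `std::slice::Iter`,
    // but the caller may have chosen any other `I`.
    // fn from_slice(slice: &[I::Item]) -> Self {
    //     Self { iter: slice.iter().cycle() }
    // }
}

// Hypothetical helper: here the caller fixes `I = std::vec::IntoIter<u32>`.
fn first_five() -> Vec<u32> {
    Cycler::new(vec![2, 3, 5].into_iter()).iter.take(5).collect()
}
```

Note also that std::slice::Iter yields references (&T), so its Item type would not equal I::Item even if the iterator type itself could be chosen inside the body.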

Some answers to similar questions exist, but nothing that seems to match this case:

The other questions are mostly referring to these two, or variations of these.

Update: Changed it to be a generic type to demonstrate that it still does not work.

Mats Kindahl