in order to understand how streams work I was trying to implement an infinite number generator that uses random.org. The first thing I did, was implementing a version where I would call an async function called get_number and it would fill a buffer and return the next possible number:
struct RandomGenerator {
buffer: Vec<u8>,
position: usize,
}
impl RandomGenerator {
pub fn new() -> RandomGenerator {
Self {
buffer: Vec::new(),
position: 0,
}
}
pub async fn get_number(&mut self) -> u8 {
self.fill_buffer().await;
let value = self.buffer[self.position];
self.position += 1;
value
}
async fn fill_buffer(&mut self) {
if self.buffer.is_empty() || self.is_buffer_depleted() {
let new_numbers = self.fetch_numbers().await;
drop(replace(&mut self.buffer, new_numbers));
self.position = 0;
}
}
fn is_buffer_depleted(&self) -> bool {
self.buffer.len() >= self.position
}
async fn fetch_numbers(&mut self) -> Vec<u8> {
let response = reqwest::get("https://www.random.org/integers/?num=10&min=1&max=100&col=1&base=10&format=plain&rnd=new").await.unwrap();
let numbers = response.text().await.unwrap();
numbers
.lines()
.map(|line| line.trim().parse::<u8>().unwrap())
.collect()
}
}
with this implementation, I can call the function get_number
on a loop and get as many numbers I want but the idea was to have iterators so I can call a bunch of composition functions like take
, take_while
, and others.
But the moment I try to implement a Stream, the problems start to rise: My first try was to have a struct that would hold a reference to the generator
struct RandomGeneratorStream<'a> {
generator: &'a mut RandomGenerator,
}
and then I've implemented the following Stream
impl<'a> Stream for RandomGeneratorStream<'a> {
type Item = u8;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let f = self.get_mut().generator.get_number();
pin_mut!(f);
f.poll_unpin(cx).map(Some)
}
}
but calling this would just hang the process
generator.into_stream().take(18).collect::<Vec<u8>>().await
On the next tries, I tried to hold a state of the future on the stream struct using pin_mut! but ended up having many errors with lifetimes without being able to solve them. What can be done in that case? Here is a working code without the streams:
use std::mem::replace;
struct RandomGenerator {
buffer: Vec<u8>,
position: usize,
}
impl RandomGenerator {
pub fn new() -> RandomGenerator {
Self {
buffer: Vec::new(),
position: 0,
}
}
pub async fn get_number(&mut self) -> u8 {
self.fill_buffer().await;
let value = self.buffer[self.position];
self.position += 1;
value
}
async fn fill_buffer(&mut self) {
if self.buffer.is_empty() || self.is_buffer_depleted() {
let new_numbers = self.fetch_numbers().await;
drop(replace(&mut self.buffer, new_numbers));
self.position = 0;
}
}
fn is_buffer_depleted(&self) -> bool {
self.buffer.len() >= self.position
}
async fn fetch_numbers(&mut self) -> Vec<u8> {
let response = reqwest::get("https://www.random.org/integers/?num=10&min=1&max=100&col=1&base=10&format=plain&rnd=new").await.unwrap();
let numbers = response.text().await.unwrap();
numbers
.lines()
.map(|line| line.trim().parse::<u8>().unwrap())
.collect()
}
}
#[tokio::main]
async fn main() {
let mut generator = RandomGenerator::new();
dbg!(generator.get_number().await);
}
Here you can find a link to the first working sample (instead of calling random.org I've used a Cursor because dns resolution was not working on the playground) https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=730eaf1f7db842877d3f3e7ca1c6d2a5
And my last try with streams you can find here https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=de0b212ee70865f6ac6c19430cd952cd