I'm brand spakin' new to rxjs, and would like to use it to build a video downloader. The intention is to run it 24/7 and automatically record an occasional livestream for later watching. Here is what I have so far.
import { BehaviorSubject, from, defer, of } from "rxjs";
import { delay, mergeMap, repeat, tap } from "rxjs/operators";
const downloader = url => {
const defaultDelay = 1000;
const maxDelay = 10000;
const delayTime = new BehaviorSubject(defaultDelay);
/*
* Simulated download output.
*
* @return {String|Number} potentialOutput
* A {Number} 1 means "FAILURE, stream is offline."
* A {String} means "SUCCESS, video was downloaded."
* 1 is the most likely value returned
*
* greets https://stackoverflow.com/a/8877271/1004931
*/
function randomWithProbability() {
var potentialOutput = [1, 1, 1, 1, 1, "/tmp/video.mp4"];
var idx = Math.floor(Math.random() * potentialOutput.length);
return potentialOutput[idx];
}
/**
* Simulated download. Returns a promise which resolves after 1 second.
*/
const download = url => {
let downloadP = new Promise((resolve, reject) => {
setTimeout(() => {
resolve(randomWithProbability());
}, 1000);
});
return from(downloadP);
};
/**
* Conditionally adjust the delay inbetween download attempts.
* - If the video downloaded successfuly, reset the timer to it's default.
* (in case the stream went down by error, we want to record again ASAP.)
* - If the video stream was offline, increase the delay until our next download attempt.
* (we don't want to be rude and flood the server)
*/
const adjustTimer = (ytdlOutput) => {
if (typeof ytdlOutput === 'string') {
delayTime.next(defaultDelay); // video stream exited successfully, so reset in case the stream starts again
} else {
let adjustedTime = (delayTime.getValue() * 2 > maxDelay) ? maxDelay : delayTime.getValue() * 2;
delayTime.next(adjustedTime); // video stream exited abnormally, likely due to being offline. wait longer until next attempt
}
};
/**
* The Observable.
* 1. Start with the URL of the video stream
* 2. delay by the time defined in delayTime
* 3. download, merging the download observable with the parent observable.
* 4. adjust the delayTime based on download output.
* 5. repeat the process indefinitely.
*/
const stream = of(url)
.pipe(
delay(delayTime.getValue()),
mergeMap(download),
tap(res => {
adjustTimer(res);
}),
repeat()
)
stream.subscribe(val => {
console.log(
`download result:${val}, delayTime:${delayTime.getValue()}`
);
});
};
downloader("https://example.com/files/video.mp4");
The problem I'm having is that the {BehaviorSubject} delayTime is not getting updated on every iteration of my loop. delayTime is getting updated, as indicated by delayTime.getValue() being called in the subscriber's callback, but the changes aren't having an effect in the memory(?) of the observable/subscriber(?).
Instead, I'm seeing that delayTime in the scope(?) of the observable is staying the same, as it was when it was first subscribed to. In the observable's world, there is no update to the BehaviorSubject's value, as I want there to be.
And this is where I'm stuck. How can I refactor my code to have a delay timer which changes over time, and effects the delay until the next download attempt?