Mick's answer is right. For scaling, server shoud outsource the caching.
But if you need to benchmark single server caching, check below code that uses only core modules of nodejs. Caching is important but only if network bandwidth is on the order of gigabytes per second and if hdd is too slow. Following code streams nearly 2GB/s from cache hit and still overlaps all cache misses as asynchronous loads on event/message queue.
const cache = require("./simplefastvideostreamcache.js").generateVideoCache;
const chunkSize = 1024*1024; // size (in bytes) of each video stream chunk
const numCachedChunks = 100; // total chunks cached (shared for all video files accessed)
const chunkExpireSeconds = 100; // when a chunk not accessed for 100 seconds, it is marked as removable
const perfCountObj={}; // just to see performance of cache (total hits and misses where each miss resolves into a hit later so hits = miss + cache hit)
setInterval(function(){console.log(perfCountObj);},1000);
const video = cache(chunkSize,numCachedChunks,chunkExpireSeconds, perfCountObj)
const http = require('http');
const options = {};
options.agent = new http.Agent({ keepAlive: true });
const server = http.createServer(options,async (req, res) => {
video.stream(req,res);
});
server.listen(8000, "0.0.0.0", () => {
console.log("Server running");
});
simplefastvideostreamcache.js:
const urlParse = require("url");
const fs = require("fs");
const path = require("path");
const Lru = require("./lrucache").Lru;
const stream = require('stream').Readable;
function generateVideoCache(chunkSize,numCachedChunks,chunkExpireSeconds, perfCountObj)
{
perfCountObj.videoCacheMiss=0;
perfCountObj.videoCacheHit=0;
let videoCache={chunkSize:chunkSize};
videoCache.cache= new Lru(numCachedChunks, function(key,callbackPrm){
perfCountObj.videoCacheMiss++;
let callback = callbackPrm;
let data=[];
let keyArr = key.split("##@@##");
let url2 = keyArr[0];
let startByte = parseInt(keyArr[1],10);
let stopByte = startByte+videoCache.chunkSize;
fs.stat(path.join(__dirname,url2),async function(err,stat){
if(err)
{
callback({data:[], maxSize:-1, startByte:-1, stopByte:-1});
return;
}
if(stopByte > stat.size)
{
stopByte = parseInt(stat.size,10);
}
if(startByte >= stopByte)
{
callback({data:[], maxSize:-1, startByte:-1, stopByte:-1});
return;
}
let readStream=fs.createReadStream(path.join(__dirname,url2),{start:startByte, end:stopByte});
readStream.on("readable",function(){
let dataChunk ="";
while(data.length<(stopByte-startByte))
{
let dataChunk = readStream.read((stopByte-startByte) - data.length);
if(dataChunk !== null)
{
data.push(dataChunk);
}
else
{
break;
}
}
});
readStream.on("error",function(err){
callback({data:[], maxSize:-1, startByte:-1, stopByte:-1});
return;
});
readStream.on("end",function(){
callback({data:Buffer.concat(data), maxSize:stat.size, startByte:startByte, stopByte:stopByte});
});
});
},chunkExpireSeconds*1000);
videoCache.get = function(filePath, offsetByte,callback){
filePath = decodeURI(urlParse.parse(filePath).pathname);
let rangeStart = offsetByte;
let rangeStop = videoCache.chunkSize;
if(rangeStart)
{
}
else
{
rangeStart=0;
}
if(rangeStop)
{
}
else
{
rangeStop = rangeStart + videoCache.chunkSize;
}
let dataVideo = [];
let cacheStart = rangeStart - (rangeStart%videoCache.chunkSize);
videoCache.cache.get(filePath+"##@@##"+cacheStart,function(video){
perfCountObj.videoCacheHit++;
if(video.startByte>=0)
{
let offs = rangeStart%videoCache.chunkSize;
let remain = videoCache.chunkSize - offs;
if(remain>video.maxSize)
remain = video.maxSize;
if(remain>video.data.length)
remain=video.data.length;
let vidChunk = video.data.slice(offs,offs+remain);
if(remain>vidChunk.length)
remain=vidChunk.length;
let result={ data:vidChunk, offs:rangeStart, remain:remain, maxSize:video.maxSize};
callback(result);
return;
}
else
{
callback(false);
return;
}
});
};
videoCache.stream = function(req,res){
let url2 = decodeURI(urlParse.parse(req.url).pathname);
let rangeStart = 0;
let rangeStop = videoCache.chunkSize;
if(req.headers.range)
{
let spRange = req.headers.range.split("=");
if(spRange.length>1)
{
let spRange2 = spRange[1].split("-");
if(spRange2.length>1)
{
rangeStart = parseInt(spRange2[0],10);
rangeStop = parseInt(spRange2[1],10);
}
else if(spRange2.length==1)
{
rangeStart = parseInt(spRange2[0],10);
rangeStop = rangeStart + videoCache.chunkSize;
}
}
}
if(rangeStart)
{
}
else
{
rangeStart=0;
}
if(rangeStop)
{
}
else
{
rangeStop = rangeStart + videoCache.chunkSize;
}
let dataVideo = [];
let cacheStart = rangeStart - (rangeStart%videoCache.chunkSize);
/* {data:[], maxSize:stat.size, startByte:-1, stopByte:-1} */
videoCache.cache.get(url2+"##@@##"+cacheStart,function(video){
if(video.startByte>=0)
{
let offs = rangeStart%videoCache.chunkSize;
let remain = videoCache.chunkSize - offs;
if(remain>video.maxSize)
remain = video.maxSize;
if(remain>video.data.length)
remain=video.data.length;
let vidChunk = video.data.slice(offs,offs+remain);
if(remain>vidChunk.length)
remain=vidChunk.length;
res.writeHead(206,{
"Content-Range": "bytes " + rangeStart + "-" + (rangeStart+remain-1) + "/" + video.maxSize,
"Accept-Ranges": "bytes",
"Content-Length": remain,
"Content-Type": ("video/"+(url2.indexOf(".mp4")!== -1 ? "mp4" : "ogg"))
});
perfCountObj.videoCacheHit++;
stream.from(vidChunk).pipe(res);
return;
}
else
{
res.writeHead(404);
perfCountObj.videoCacheHit++;
res.end("404: mp4/ogg video file not found.");
return;
}
});
}
return videoCache;
}
exports.generateVideoCache = generateVideoCache;
lrucache.js:
'use strict';
/*
cacheSize: number of elements in cache, constant, must be greater than or equal to number of asynchronous accessors / cache misses
callbackBackingStoreLoad: user-given cache-miss function to load data from datastore
elementLifeTimeMs: maximum miliseconds before an element is invalidated, only invalidated at next get() call with its key
*/
let Lru = function(cacheSize,callbackBackingStoreLoad,elementLifeTimeMs=1000){
const me = this;
const maxWait = elementLifeTimeMs;
const size = parseInt(cacheSize,10);
const mapping = {};
const mappingInFlightMiss = {};
const bufData = new Array(size);
const bufVisited = new Uint8Array(size);
const bufKey = new Array(size);
const bufTime = new Float64Array(size);
const bufLocked = new Uint8Array(size);
for(let i=0;i<size;i++)
{
let rnd = Math.random();
mapping[rnd] = i;
bufData[i]="";
bufVisited[i]=0;
bufKey[i]=rnd;
bufTime[i]=0;
bufLocked[i]=0;
}
let ctr = 0;
let ctrEvict = parseInt(cacheSize/2,10);
const loadData = callbackBackingStoreLoad;
let inFlightMissCtr = 0;
this.reload=function(){
for(let i=0;i<size;i++)
{
bufTime[i]=0;
}
};
this.get = function(keyPrm,callbackPrm){
const key = keyPrm;
const callback = callbackPrm;
// stop dead-lock when many async get calls are made
if(inFlightMissCtr>=size)
{
setTimeout(function(){
me.get(key,function(newData){
callback(newData);
});
},0);
return;
}
// delay the request towards end of the cache-miss completion
if(key in mappingInFlightMiss)
{
setTimeout(function(){
me.get(key,function(newData){
callback(newData);
});
},0);
return;
}
if(key in mapping)
{
let slot = mapping[key];
// RAM speed data
if((Date.now() - bufTime[slot]) > maxWait)
{
if(bufLocked[slot])
{
setTimeout(function(){
me.get(key,function(newData){
callback(newData);
});
},0);
}
else
{
delete mapping[key];
me.get(key,function(newData){
callback(newData);
});
}
}
else
{
bufVisited[slot]=1;
bufTime[slot] = Date.now();
callback(bufData[slot]);
}
}
else
{
// datastore loading + cache eviction
let ctrFound = -1;
while(ctrFound===-1)
{
// give slot a second chance before eviction
if(!bufLocked[ctr] && bufVisited[ctr])
{
bufVisited[ctr]=0;
}
ctr++;
if(ctr >= size)
{
ctr=0;
}
// eviction conditions
if(!bufLocked[ctrEvict] && !bufVisited[ctrEvict])
{
// evict
bufLocked[ctrEvict] = 1;
inFlightMissCtr++;
ctrFound = ctrEvict;
}
ctrEvict++;
if(ctrEvict >= size)
{
ctrEvict=0;
}
}
mappingInFlightMiss[key]=1;
let f = function(res){
delete mapping[bufKey[ctrFound]];
bufData[ctrFound]=res;
bufVisited[ctrFound]=0;
bufKey[ctrFound]=key;
bufTime[ctrFound]=Date.now();
bufLocked[ctrFound]=0;
mapping[key] = ctrFound;
callback(bufData[ctrFound]);
inFlightMissCtr--;
delete mappingInFlightMiss[key];
};
loadData(key,f);
}
};
};
exports.Lru = Lru;