1

I am trying to implement a reservoir sampling algorithm using java. I have N streams of data ( readings from sensors arriving at a sink node ) of unknown size. For the sake of simplicity lets assume i have one stream of unknown size.

So what one of the reservoir sampling algorithms suggests is to create a reservoir of size reservoirSize. Lets say it is 5. The first five readings you get, store them in your reservoir. Ok. Now as you get more and more readings, for each reading generate a random number from 0 to reading number and if that random number is smaller than the reservoirSize then store the reading in the reservoir[randomNumber].

So lets say i have reservoirSize = 5 and i just got my 10th reading. I will generate a random number from 0 to 10 and if that number is smaller than 5 i wll store the reading there where the random number points. Lets say random number is 3 so i store reading number 10 in reservoir[3].

public void sample (Vector pool, double Measurement, int streamIndex) {

    if (streamIndex < ReservoirSize){
        pool.addElement(Double.toString(Measurement));
    }
    else if ((randomIndex=(int)ranNum.nextInt((streamIndex+1)))<ReservoirSize) {
        pool.setElementAt(Double.toString(Measurement), randomIndex);
    }    
}  

The problem with this code is that once the streamIndex gets big enough ( above 4.000 for example ) i rarely sample any readings. And it does make sense because the propability of generating a random number from 0 to 4000 that is smaller than 5 is significanly smaller than the propability to generate a random number from 0 to lets say 100, that is smaller than 5.

I also implemented AlgorthmR from Vitters paper and another way described here:
Gregable ReservoirSampling

but all implementations have the same problem. The larger the stream gets the smaller the sampling frequency becomes. So for a sampling rate of 0.5s, one hour after i start sampling (which means that about 7000 readings have been forwarded to the sink node ), a change in the measured quantity will not be detected for another good half an hour i.e the reading indicating the change will be discarded from the reservoir.

AlgorthmR implemantation

    public RSAlgorithmR() {
        this.currentPool = null;
        this.randomStoreatIndex = 0;
        this.randomIndex = 0;
        this.ranNum = new Random();
    }

    public void sample (LLNode cNode, double Measurement) {

    int streamIndex = cNode.getStreamIndex();
    int storeatIndex =cNode.getStoreatIndex();


    if (streamIndex < ReservoirSize) {
        cNode.data.addElement(Double.toString(Measurement));

        if (streamIndex == ( ReservoirSize - 1) ) {
            randomStoreatIndex = (int)ranNum.nextInt(ReservoirSize);
            cNode.setStoreatIndex((int)randomStoreatIndex);                
        }
    }
    else {
        if (storeatIndex == streamIndex) {
            randomIndex=(int)ranNum.nextInt(ReservoirSize);
            cNode.data.setElementAt(Double.toString(Measurement), randomIndex); 

            randomStoreatIndex = (int)ranNum.nextInt(streamIndex - ReservoirSize) + ReservoirSize;
            cNode.setStoreatIndex(randomStoreatIndex);

            System.out.println("Index:: "+streamIndex);
            System.out.println("randomIndex:: " + randomIndex);  
        }               
    }
    cNode.setStreamIndex(); 
};  

Gregable Implementation

    public ReservoirSampler() {

        this.currentPool = null;
        this.randomIndex = 0;
        this.ranProp = new Random();
        this.ranInd = new Random();
    }

    public void sample (LLNode currentSpot, double humidityRead, 
        double temperatureRead, int streamIndex) {

    double acceptancePropability = (double)ReservoirSize/streamIndex;

    if (streamIndex < ReservoirSize){
        currentSpot.humidityData.addElement(Double.toString(humidityRead));
        currentSpot.temperatureData.addElement(Double.toString(temperatureRead));
    }
    else {

        ranProp.setSeed(System.currentTimeMillis());
        randomPropability=(double)ranProp.nextDouble();


        if ( randomPropability < acceptancePropability){
            ranInd.setSeed(System.currentTimeMillis());
            randomIndex=(int)ranInd.nextInt((ReservoirSize));
            currentSpot.humidityData.setElementAt(Double.toString(humidityRead),randomIndex);
            currentSpot.temperatureData.setElementAt(Double.toString(temperatureRead),randomIndex);

        }
    }                 
}  

Is that the normal behaviour of the algorthm or am i missing something here? And if that is the normal behaviour is there a way to make it work more "accuratelly"?

hippietrail
  • 15,848
  • 18
  • 99
  • 158
Silas
  • 125
  • 2
  • 11
  • 2
    What is your problem? The idea behind reservoir sampling is that at every point in time, every observation from 0 to this has (had) the same chance of being in the sample reservoir. (If your application needs a running average, which I suspect, use a running average) – joop Jul 23 '15 at 11:47
  • My problem is that after a point i rarely sample data. For example i take one sample at reading number 17.098 and the next reading i sample is reading number 23.491. Which means that i discarded a little more than 6.000 readings. That is a pretty small rate and i loose a lot of information. Maybe you are right. Maybe reservoir sampling is not what i actually need. – Silas Jul 23 '15 at 15:19

1 Answers1

2

This is the normal behavior of algorithm R (see Knuth's "The Art of Computer Programming" 3.4.2)

However, better algorithms are available:

In contrast to Algorithm R, these algorithms draw the number of stream elements to skip at each stage, so much less random numbers are generated, especially for long streams.

Re "accuracy": In all algorithms (R,X,Y,Z,K,L,M) each element in the input stream is equally likely to be in the sample. This can be proven mathematically and demonstrated empirically by running the same algorithm over the same input stream a large number of times and measuring the frequency each element is sampled (You'll have to use a good PRNG, e.g. Mersenne Twister). The major difference between the algorithms is the amount of random numbers generated.

All algorithms are relatively simple to implement and to test. Algorithm L, though not the most efficient one, is especially compact and straightforward to implement, and still much more efficient than algorithm R.

Lior Kogan
  • 19,919
  • 6
  • 53
  • 85
  • 1
    The thing is that while at time k, every element has the same propabillity to be in the reservoir, That propabillity is much higher than the propabillity that elements share to be in the reservoir at time k+10. In other words, as the stream grows bigger, the sampling rate becomes smaller. I think that as joop stated - reservoir sampling is not what i actually need. Thanks a lot for the response – Silas Jul 24 '15 at 11:45
  • 1
    If you suggest that the probability for each input element to end up as part of the sample is not equal - your intuition is wrong. Think about it this way: the probability for an item that was placed early in the reservoir to be replaced later on is higher. – Lior Kogan Jul 24 '15 at 11:49