I am trying to implement the A-Chao version of weighted reservoir sampling as shown in https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_A-Chao

But I found that the pseudo-code described in the wiki seems to be wrong, especially the initialization part. I read the paper; it mentions that we need to handle over-weighted data points, but I still cannot see how to initialize correctly.

In my understanding, in the initialization step we want to make sure that every initial data point is chosen with probability proportional to its weight. However, I don't understand how the over-weighted points are related to that.
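
For example (if I read the paper correctly), with weights [3, 1, 1] and k = 2, the naive inclusion probability of the first point would be k * w / W = 2 * 3 / 5 = 1.2 > 1, which is impossible; I take it such a point is what the paper calls "over-weighted".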

Here is the code I implemented according to the wiki, but the results show it is incorrect.

// helper used below: uniform random integer in [min, max] inclusive
const getRandomInt = (min: number, max: number): number =>
  Math.floor(Math.random() * (max - min + 1)) + min;

const reservoirSampling = <T>(dataList: T[], k: number, getWeight: (point: T) => number): T[] => {
  // start the reservoir with the first k items, taken with probability 1
  const sampledList = dataList.slice(0, k);
  let currentWeightSum: number = sampledList.reduce((sum, item) => sum + getWeight(item), 0);
  for (let i = k; i < dataList.length; i++) {
    const currentItem = dataList[i];
    currentWeightSum += getWeight(currentItem);
    // accept the new item with probability weight / (running weight sum), as on the wiki
    const probOfChoosingCurrentItem = getWeight(currentItem) / currentWeightSum;
    const rand = Math.random();
    if (rand <= probOfChoosingCurrentItem) {
      // overwrite a uniformly random slot in the reservoir
      sampledList[getRandomInt(0, k - 1)] = currentItem;
    }
  }
  return sampledList;
};

2 Answers


The best way to get the distribution that Chao's algorithm produces is to implement VarOpt_k sampling, as in the pseudocode labeled Algorithm 1 in the paper that introduced VarOpt_k sampling, by Cohen et al. (arXiv:0803.0473, "Stream sampling for variance-optimal estimation of subset sums").

That's an arXiv link and hence very stable, but to summarize, the idea is to separate the items into "heavy" (weight high enough to guarantee inclusion in the sample so far) and "light" (the others). Keep the heavy items in a priority queue where it is easy to remove the lightest of them. When a new item comes in, we have to determine whether it is heavy or light, and which heavy items became light (if any). Then there's a sampling procedure for dropping an item that treats the heavy → light items specially using weighted sampling and then falls back to choosing a uniform random light item (as in the easy case of Chao's algorithm).

The one trick with the pseudocode is that, if you use floating-point arithmetic, you have to be a little careful about "impossible" cases. Post your finished code on Code Review and ping me here if you would like feedback.
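
For concreteness, here is a minimal Python sketch (mine, not from the paper) of the per-step drop distribution described above, done naively in O(k) per item rather than with the paper's priority queue. The name varopt_drop is hypothetical, and the bookkeeping where surviving light items get their weight adjusted up to tau is omitted; the max(0.0, ...) clamp is the floating-point guard against "impossible" cases mentioned above.

import random

def varopt_drop(weights, k):
    """Given the k+1 candidate weights, return the index to DROP so that the
    k survivors follow the VarOpt_k / Chao inclusion probabilities.

    The threshold tau solves sum(min(1, w/tau)) == k; item i is dropped
    with probability 1 - min(1, w[i]/tau), and these sum to exactly 1.
    """
    assert len(weights) == k + 1 and all(w > 0 for w in weights)
    # Find tau by repeatedly peeling off "heavy" items (w > tau, inclusion prob 1).
    heavy = [False] * len(weights)
    while True:
        light_sum = sum(w for w, h in zip(weights, heavy) if not h)
        tau = light_sum / (k - sum(heavy))
        newly_heavy = [not h and w > tau for w, h in zip(weights, heavy)]
        if not any(newly_heavy):
            break
        heavy = [h or nh for h, nh in zip(heavy, newly_heavy)]
    # Clamp tiny negative values caused by floating-point noise.
    drop_probs = [max(0.0, 1.0 - min(1.0, w / tau)) for w in weights]
    return random.choices(range(k + 1), weights=drop_probs)[0]

With equal weights this degenerates to dropping a uniform random item, and with weights [3, 1, 1] and k = 2 it never drops the heavy first item, matching the capped probabilities discussed in the other answer.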

David Eisenstat

Below you will find a Python implementation of Chao's strategy. Here is a plot of 10000 samples over the items 0,...,99, with the weights indicated by the yellow lines. The y-coordinate denotes how many times a given item was sampled.

[Figure: per-item sample counts (red) against the weight-proportional expectation (yellow)]

I first implemented the pseudocode on Wikipedia, and agree completely with the OP that it is dead wrong. It then took me more than a day to understand Chao's paper. I also found the section of Tillé's book on Chao's method helpful (see Algorithm 6.14 on page 120). (I don't know what the OP means by the issues with initialization.)

Disclaimer: I am new to Python, and just tried to do my best. I think posting code might be more helpful than posting pseudocode. (Mainly I want to save someone a day's work getting to the bottom of Chao's paper!) If you do end up using this, I'd appreciate any feedback. Standard health warnings apply!

First, Chao's computation of inclusion probabilities:

import numpy as np
import random
def compute_Chao_probs(weights, total_weight, sample_size):
    """
    Consider a weighted population, some of its members, and their weights.
    This function returns a list of probabilities that these members are selected
    in a weighted sample of sample_size members of the population.

    Example 1: If all weights are equal, this probability is sample_size / (size of population).
    Example 2: If the size of our population is sample_size, then these probabilities are all 1.

    Naively we expect these probabilities to be given by sample_size * weight / total_weight; however,
    this may lead to a probability greater than 1. For example, consider a population
    of 3 with weights [3, 1, 1], and suppose we want to select 2 elements. The naive
    probability of selecting the first element is 2 * 3 / 5 > 1.

    We follow Chao's description: compute the naive guess, set any probs which are bigger
    than 1 to 1, rinse and repeat.

    We expect to call this routine many times, so we avoid for loops and try to make numpy do the work.
    """

    assert all(w > 0 for w in weights), "weights must be strictly positive."

    # heavy_items is a True/False array of length len(weights).
    # True indicates items deemed "heavy" (i.e. assigned probability 1).
    # At the outset, no items are heavy:
    heavy_items = np.zeros(len(weights), dtype=bool)

    while True:
        new_probs = (sample_size - np.sum(heavy_items)) / (total_weight - np.sum(heavy_items * weights)) * weights
        valid_probs = np.less_equal(np.logical_not(heavy_items) * new_probs, np.ones(len(weights)))
        if all(valid_probs):  # we are done
            return np.logical_not(heavy_items) * new_probs + heavy_items
        else:  # we need to declare some more items heavy
            heavy_items = np.logical_or(heavy_items, np.logical_not(valid_probs))
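
A quick sanity check (my addition, not part of the original code) using the example from the docstring:

# weights [3, 1, 1], total weight 5, sample of size 2: the first item is
# "heavy" and gets probability 1, the other two share the remainder.
print(compute_Chao_probs(np.array([3, 1, 1]), 5, 2))
# -> [1.  0.5 0.5]  (sums to the sample size, 2)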

Then Chao's rejection rule:

def update_sample(current_sample, new_item, new_weight):
    """
    We have a weighted population, from which we have selected n items.
    We know their weights, the total_weight of the population, and the
    probability of their inclusion in the sample when we selected them.

    Now new_item arrives, with a new_weight. Should we take it or not?

    current_sample is a dictionary, with keys 'items', 'weights', 'probs'
    and 'total_weight'. This function updates current_sample according to
    Chao's recipe.
    """

    items = current_sample['items']
    weights = current_sample['weights']
    probs = current_sample['probs']
    total_weight = current_sample['total_weight']

    assert len(items) == len(weights) and len(weights) == len(probs)

    fixed_sample_size = len(weights)

    total_weight = total_weight + new_weight

    new_Chao_probs = compute_Chao_probs(np.hstack((weights, [new_weight])), total_weight, fixed_sample_size)

    if random.random() <= new_Chao_probs[-1]:  # we should take new_item

        #
        # Now we need to decide which element should be replaced.
        # Fix an index i in items, and let P denote probability. We have:
        # P(i is selected at previous step) = probs[i]
        # P(i is selected at current step) = new_Chao_probs[i]
        # Hence (by the law of conditional probability)
        # P(i is selected at current step | i is selected at previous step) = new_Chao_probs[i] / probs[i]
        # Thus:
        # P(i is not selected at current step | i is selected at previous step) = 1 - new_Chao_probs[i] / probs[i]
        # Now if we condition this on the assumption that the new element is taken, we get
        # 1/new_Chao_probs[-1] * (1 - new_Chao_probs[i] / probs[i]).
        #
        # (*I think* this is what Chao is talking about in the two paragraphs just before Section 3 in his paper.)

        rejection_weights = 1 / new_Chao_probs[-1] * (np.ones(fixed_sample_size) - (new_Chao_probs[0:-1] / probs))

        # assert np.isclose(np.sum(rejection_weights), 1)
        # In examples we see that np.sum(rejection_weights) is not necessarily 1.
        # I am a little confused by this, but ignore it for the moment.

        rejected_index = random.choices(range(fixed_sample_size), rejection_weights)[0]

        # make the changes:

        current_sample['items'][rejected_index] = new_item
        current_sample['weights'][rejected_index] = new_weight
        current_sample['probs'] = new_Chao_probs[0:-1]
        current_sample['probs'][rejected_index] = new_Chao_probs[-1]

    current_sample['total_weight'] = total_weight

Finally, code to test and plot:

# Now we test Chao on some different distributions.
#
# This also illustrates how to use update_sample.
#

from collections import Counter
import matplotlib.pyplot as plt

n = 10  # sample size: the reservoir holds n items
items_in = list(range(100))

weights_in = [random.random() for _ in range(10)]

# other possible tests (each overrides the random weights above):

weights_in = [i + 1 for i in range(10)]  # staircase
#weights_in = [9 - i + 1 for i in range(10)]  # upside-down staircase
#weights_in = [(i + 1)**2 for i in range(10)]  # parabola
#weights_in = [10**i for i in range(10)]  # a very heavy-tailed distribution (to check numerical stability)
random.shuffle(weights_in)  # sometimes it is fun to shuffle

# repeat each of the 10 weights 10 times, so the 100 items share 10 weight levels:
weights_in = np.array([w for w in weights_in for _ in range(10)])

count = Counter()

for j in range(10000):

    # we take the first n with probability 1:
    current_sample = {}
    current_sample['items'] = items_in[:n]
    current_sample['weights'] = np.array(weights_in[:n])
    current_sample['probs'] = np.ones(n)
    current_sample['total_weight'] = np.sum(current_sample['weights'])

    for i in range(n, len(items_in)):
        update_sample(current_sample, items_in[i], weights_in[i])

    count.update(current_sample['items'])

plt.figure(figsize=(20, 10))
# expected counts: 10000 trials * n = 100000 picks in total, allocated roughly in proportion to weight
plt.plot(100000 * np.array(weights_in) / np.sum(weights_in), 'yo')
plt.plot(list(count.keys()), list(count.values()), 'ro')
plt.show()