I have been interested in streaming data algorithms for some time. These algorithms assume that data arrive sequentially over time and/or that the data set is too large to fit into memory for random access. Perhaps the most widely known streaming algorithm is HyperLogLog, which calculates the approximate number of distinct values in a stream, with fixed memory use. In this post, I will discuss a simple algorithm for randomly sampling from a data stream.
Let the values of the stream be $x_1, x_2, x_3, \ldots$; we do not need to assume that the stream ever terminates, although it may. The reservoir algorithm samples $k$ random items from the stream (without replacement) in the sense that after seeing $n$ data points, the probability that any individual data point is in the sample is $\frac{k}{n}$. This algorithm requires only one pass through the stream and uses storage proportional to $k$ (not the total, possibly infinite, size of the stream).
The reservoir algorithm is so named because at each step it updates a "reservoir" of candidate samples. We will denote the reservoir of candidate samples by $R$. We will use $R_t$ to denote the state of $R$ after observing the first $t$ data points. We think of $R$ as a vector of length $k$, so $R_t[0]$ is the first candidate sample after $t$ data points have been seen, $R_t[1]$ is the second, and so on, up to $R_t[k-1]$, the last. It is important that $k$ is small enough that the reservoir vectors can be stored in memory (or at least accessed reasonably quickly on disk). Also, it is not necessary for the total number of data points in the stream to be known in advance.
We initialize the first reservoir, $R_k$, with the first $k$ data points we see. At this point, we have a random sample (without replacement) of the first $k$ data points from the stream.
Suppose now that we have seen $t - 1$ elements and have a reservoir of sample candidates $R_{t-1}$. When we receive $x_t$, we generate a random integer $i$ uniformly distributed in the interval $[1, t]$. If $i \leq k$, we form $R_t$ from $R_{t-1}$ by setting $R_t[i-1] = x_t$; otherwise, $R_t = R_{t-1}$ and we wait for the next data point.
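As a minimal sketch of this single update step (the complete streaming implementation appears later in the post; the helper name `update_reservoir` and the use of Python's standard `random` module are my own illustrative choices):

```python
import random

def update_reservoir(reservoir, x_t, t, k):
    """Illustrative sketch of one reservoir-sampling step for the t-th data
    point (t > k): x_t enters the length-k reservoir with probability k / t."""
    i = random.randint(1, t)  # uniform on {1, ..., t}, both endpoints inclusive
    if i <= k:
        reservoir[i - 1] = x_t  # overwrite the slot selected by i
    return reservoir
```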
Intuitively, this algorithm seems reasonable, because $P(x_t \in R_t) = P(i \leq k) = \frac{k}{t}$, as we expect from a uniform random sample. What is less clear at this point is that $P(x_s \in R_t) = \frac{k}{t}$ for any $s < t$ as well. We will now prove this fact.
First, we calculate the probability that a candidate sample in the reservoir remains after another data point is received. Let $x_s \in R_t$, and suppose we have observed $x_{t+1}$. The candidate sample $x_s$ will be in $R_{t+1}$ if and only if $i - 1$, where $i$ is the random integer generated for $x_{t+1}$, is not the index of $x_s$ in $R_t$. Since $i$ is uniformly distributed in the interval $[1, t+1]$, we have that
$$P(x_s \in R_{t+1} \mid x_s \in R_t) = \frac{t}{t+1}.$$

Now, suppose that we have received $n$ data points. First consider the case where $k < s \leq n$. Then

$$P(x_s \in R_n) = P(x_s \in R_n \mid x_s \in R_s) \cdot P(x_s \in R_s).$$

The second term, $P(x_s \in R_s)$, is the probability that $x_s$ entered the reservoir when it was first observed, so

$$P(x_s \in R_s) = \frac{k}{s}.$$

To calculate the first term, $P(x_s \in R_n \mid x_s \in R_s)$, we multiply together the probabilities that $x_s$ remains in the reservoir after each subsequent observation, yielding

$$P(x_s \in R_n \mid x_s \in R_s) = \prod_{t=s}^{n-1} P(x_s \in R_{t+1} \mid x_s \in R_t) = \frac{s}{s+1} \cdot \frac{s+1}{s+2} \cdots \frac{n-1}{n} = \frac{s}{n}$$

(when $s = n$, the product is empty and equals one). Therefore

$$P(x_s \in R_n) = \frac{s}{n} \cdot \frac{k}{s} = \frac{k}{n},$$

as desired.
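As a concrete check of this calculation (with values chosen purely for illustration), take $k = 2$, $s = 3$, and $n = 5$: $x_3$ enters the reservoir with probability $\frac{2}{3}$ and survives the arrivals of $x_4$ and $x_5$ with probabilities $\frac{3}{4}$ and $\frac{4}{5}$, so

$$P(x_3 \in R_5) = \frac{2}{3} \cdot \frac{3}{4} \cdot \frac{4}{5} = \frac{2}{5} = \frac{k}{n}.$$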
Now consider the case where $s \leq k$, so that $P(x_s \in R_k) = 1$. In this case,

$$P(x_s \in R_n) = P(x_s \in R_n \mid x_s \in R_k) = \prod_{t=k}^{n-1} P(x_s \in R_{t+1} \mid x_s \in R_t) = \frac{k}{k+1} \cdot \frac{k+1}{k+2} \cdots \frac{n-1}{n} = \frac{k}{n},$$

as desired.
Below we give an implementation of this algorithm in Python.
```python
import itertools as itl

import numpy as np


def sample_after(stream, k):
    """
    Return a random sample of k elements drawn without replacement from stream.

    This function is designed to be used when the elements of stream cannot
    fit into memory.
    """
    stream = iter(stream)  # ensure both loops below consume the same iterator

    # Fill the reservoir with the first k elements of the stream.
    r = np.array(list(itl.islice(stream, k)))

    # For the t-th element (t > k), replace a reservoir slot with probability k / t.
    for t, x in enumerate(stream, k + 1):
        i = np.random.randint(1, t + 1)  # uniform on {1, ..., t}
        if i <= k:
            r[i - 1] = x

    return r
```
```python
sample_after(range(1000000000), 10)
```

```
array([950266975, 378108182, 637777154, 915372867, 298742970, 629846773,
       749074581, 893637541, 328486342, 685539979])
```
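As a quick empirical sanity check (my own addition; the helper name `empirical_inclusion` and the parameter values are arbitrary), we can estimate the inclusion probabilities by repeated sampling and confirm that each element appears with probability roughly $\frac{k}{n}$:

```python
from collections import Counter

def empirical_inclusion(n=20, k=5, trials=10_000):
    """Illustrative check: estimate P(x is sampled) for each x in range(n)
    by running the reservoir sampler many times."""
    counts = Counter()
    for _ in range(trials):
        counts.update(int(x) for x in sample_after(range(n), k))
    return {x: counts[x] / trials for x in range(n)}

# Each estimated probability should be close to k / n = 0.25.
print(empirical_inclusion())
```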
Vitter^[Vitter, Jeffrey S. "Random sampling with a reservoir." ACM Transactions on Mathematical Software (TOMS) 11.1 (1985): 37-57.] gives three generalizations of this simple reservoir sampling algorithm, all based on the following idea.
Instead of generating a random integer for each data point, we generate the number of data points to skip before the next candidate data point. Let $S_t$ be the number of data points to advance past $x_t$ before adding a candidate to the reservoir. For example, if $S_t = 3$, we would ignore $x_{t+1}$ and $x_{t+2}$ and add $x_{t+3}$ to the reservoir.

The simple reservoir algorithm leads to the following distribution on $S_t$:

$$P(S_t = 1) = \frac{k}{t+1}.$$

In general,

$$P(S_t = s) = \frac{k \cdot t! \, (t + s - (k + 1))!}{(t + s)! \, (t - k)!}.$$

Vitter's three generalizations of the reservoir algorithm are each based on a different way of sampling from the distribution of $S_t$. These generalizations have the advantage of requiring fewer random values to be generated than the simple reservoir algorithm given above.
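To make the skip-based idea concrete, here is a minimal sketch of one way to sample $S_t$ by inverse transform, using the tail probabilities implied by the distribution above; this is my own illustration (the name `sample_skip` is hypothetical), not one of Vitter's three algorithms:

```python
import random

def sample_skip(t, k):
    """Illustrative sketch: sample S_t, the number of points to advance past
    x_t before the next candidate enters the reservoir, by inverting the tail
    probabilities P(S_t > s) = prod_{j=1}^{s} (t + j - k) / (t + j)."""
    u = 1.0 - random.random()  # uniform on (0, 1], so the loop terminates
    s = 0
    tail = 1.0  # P(S_t > 0) = 1
    while tail >= u:
        s += 1
        tail *= (t + s - k) / (t + s)
    return s
```

After a skip is drawn, the accepted data point $x_{t + S_t}$ replaces a uniformly chosen slot of the reservoir (just as in the simple algorithm, where $i$ is uniform on $\{1, \ldots, k\}$ conditional on acceptance), and the process repeats from position $t + S_t$. The benefit is that only one uniform variate is needed per accepted candidate rather than one random integer per data point.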