Reservoir Sampling in Clojure
Lately I’ve been moving our data backend to use Apache Kafka to store our many data sources. I think it’s a great way to deal with the problem of data collection, Kafka gives us great flexibility in how we consume the data and how we query it.
One of our data scientists asked if we could randomly sample a stream. This is a common activity in machine learning and statistics, and it’s trivial on a known dataset. If you have 100 items and you need a random sample of 10, just pick 10 indexes randomly and you’re done. A stream, though, is processed linearly. You don’t know how many items are in the stream. In other words, you need random indexes from 1 to n, but you don’t know what n is!
It turns out there is a simple algorithm for this called Reservoir Sampling. You can read a good description here.
Let’s build the intuition of the algorithm. Let’s assume that we want 10 random samples. If we have read 10 items, how many of those are in our random sample? Every one of the 10. Now we read 1 more, giving us a total of 11 items we have seen. What are the odds for each item to be in the random sample now?
Hopefully it is clear that each item has 10 out of 11 chances to be in the random sample. What about 99 samples? Each item would have 10/99 chances of being in the random sample. It should be clear now that item n has 10/n chances of being in the random sample. That’s it!
The first 10 items automatically go in, and each new item is given a random chance of satisfying the 10/n odds. If it passes, that means it should push out an existing random sample. So we choose an index from 1 to 10 and replace the corresponding value. It’s a beautifully simple algorithm for randomly sampling a stream of data.
Lazy sequences are prevalent in Clojure, so I wanted to write a simple implementation that would operate as part of a lazy sequence. You can find a robust implementation by BigML, but I wanted even simpler.
(defn reservoir-sampler ([size sample stream] (reservoir-sampler size 1 sample stream)) ([size position sample stream] (letfn [(take-sample [x key] (if (< (count @sample) size) (swap! sample conj x) (swap! sample assoc key x))) (test-sample [x] (let [chance (rand 1) limit (/ size position)] (cond (> limit 1) (take-sample x position) (< chance limit) (take-sample x (int (* size (rand)))))) x)] (lazy-seq (when-let [s (seq stream)] (cons (test-sample (first s)) (reservoir-sampler size (inc position) sample (rest s))))))))
The function takes three parameters, the size of the random sample, an vector atom to store the random sample, and sequence (lazy or not).
There are two inner functions, take-sample and test-sample. The test-sample function determines if the current item should become a random sample, and take-sample enters the item into the random sample.
Let’s try it out:
user> (def my-sample (atom )) user> (defn run-sampler [size stream-len] #_=> (doseq [x (take stream-len (reservoir-sampler size my-sample (iterate inc 1)))]) #_=> (println "Random sample:" @my-sample)) user> (run-sampler 10 1000) Random sample: [334 778 3 51 956 705 543 634 538 930] nil
I’m open to any improvements, with either technique or performance.