还是同样的问题:有一组数量未知的数据,每个元素有非负权重。要求只遍历一次,随机选取其中的一个元素,任何一个元素被选到的概率与其权重成正比。
在 前一篇 文章中介绍了概率分布的理论值,并用比较简洁高效的函数实现了选取一个元素的方法。现在来看一个神奇的算法,以及相关的证明和实现。
算法很简单:对于任意的 i(1 <= i <= n),按照如下方法给第 i 个元素分配一个键值 key(其中 ri 是一个 0 到 1 之间等概率分布的随机数):
之后,如果要随机选取一个元素,就去 key 最大的那个;如果要选取 m 个元素,就取 key 最大的 m 个。
真不知道是怎么想出来的这样的方法,不过还是先来关注一下证明的过程。
m=1 证明
对于 m=1 的证明过程会介绍得详细些,主要是怕我自己过几天就忘记了。概率达人可以直接秒杀之。
m=1 时,第 i 个元素被选取到的概率,就等于它所对应的键值 key(i) 是最大值的概率,即:
把 key(i) 的计算公式代入,但要注意公式中的 ri 并不是一个固定的数值,而是随机变量。不考虑计算机数值表示的精度,可以假设 ri 是一个在 0 到 1 之间的连续均匀概率分布,因此如果要计算 key(i) 是最大的概率,必须要对 ri 所有的可能值进行概率累加,也就是积分。于是上面的概率表达式就被写成:
再看式子中的 \(\forall\),它表示每一个 j 都要满足后面的条件,而各个 j 之间相互独立,因此可以写成概率乘积,于是得到:
对于给定的 j,\(r_j^{1/w_j} < r_i^{1/w_i}\Rightarrow r_j < r_i^{w_j/w_i}\),另外 rj 也是个均匀概率分布,将概率密度函数代入可以得到:
因此,上面的概率算式就变成(其中 w 就是前一篇文章中提到的所有元素的权重之和):
最后的结果跟 前一篇 文章中的理论值相等,证明完毕。
m>=1 证明
当 m 取任意值时,概率公式变得非常复杂,在前一篇文章中使用了第 i 个元素不被选到的概率来简化表达式。现在的证明也从同样的角度进行。
第 i 个元素不被选到的概率,显然等于这 n 个元素中,至少存在 m 个元素的键值大于 key(i),与之前的讨论一样,不妨设这 m 个元素的下标(按键值从大到小)依次为 j1, j2, ..., jm,\(\forall 1\leq k\leq m,j_k\notin\{i,j_1,j_2,\cdots,j_{k-1}\}\),满足 \(\forall 1\leq t_k\leq n,t_k\notin\{j_1,j_2,\cdots,j_{k}\},key(j_k) > key(t_k)\)。注意 jk 和 tk 的取值范围,为了简单起见,下面的式子中就不再重复了。
为了能够进一步求解,必须把这个连等式拆开。这里要非常小心,各个 jk 并不是相互独立的,比如当 j1 改变的时候,j2 的取值范围也会随之变化,依此类推。拆开之后的式子如下:
看起来还是相当恐怖的,一层套一层。注意等式右边已经没有显式地关于 i 的信息了,这些信息被隐含在 jk 和 tk 的取值范围中,切记。对每个 jk,把 key(jk) 的式子代进去,转换成积分;同时把 \(\forall t_k\) 转换为 \(\prod_{t_k}\)。这些在 m=1 的证明中都提到过了。新出现的是 \(\exists j_k\),这个显然适用概率加法,因为 jk 取不同的值对应于不同的互斥方案。经过这些变换得到:
其中的积分式在之前已经见过了,其运算过程如下(注意 tk 的取值范围):
最终,概率计算式子变成:
与 前一篇 文章中的理论值完全一样。
呼,可怕的推导过程。
程序实现
虽然证明过程异常恐怖,但实现起来却很简单。实际运算中,只要维持一个大小为 m 的最小堆(没错,是最小堆)来保存当前已知的最大的 m 个键值,每拿到一个新的元素,算出对应的键值,如果它比堆中的最小值大,就可以放入堆中替换掉最小值。Python 实现函数如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | from random import Random
from heapq import *
def WeightedRandomSample(m=1, rand=None):
assert m > 0, 'invalid m'
selection = []
heap = []
if rand is None:
rand = Random()
while True:
# Outputs the current selection and gets next item
(item, weight) = yield selection
if weight <= 0: continue
key = rand.random() ** (1.0 / weight)
if len(selection) < m:
heap.append((key, len(selection)))
selection.append(item)
if len(selection) == m:
heapify(heap)
else:
if key > heap[0][0]:
index = heap[0][1]
heapreplace(heap, (key, index))
selection[index] = item
|
每次拿到一个新的元素,通过 key = rand.random() ** (1.0 / weight) 产生一个与其权重有关的随机键值 key。当元素个数小于 m 时,直接将新的元素放入堆空间中(但并不建堆),这样只用 O(1) 时间;当遇到第 m 个元素后,堆空间放满了,这时候进行建堆操作(heapify(heap)),需要 O(m) 时间;之后每拿到一个新的元素,用 O(1) 时间从堆顶拿出最小值与新元素的键值比较,如果后者更大就用后者替换掉堆顶元素,对堆进行必要的操作(O(log m) 时间)以保持其结构(heapreplace(heap, (key, index)))。
关于 Python 中的堆可以参考:http://docs.python.org/library/heapq.html。
总体来看,整段程序用时 O(n * log m),占用 O(m) 辅助空间。这样的处理比较适用于 m << n 的情况。当 m 与 n 接近时,可以用 n 个辅助空间存储所有元素的键值,当遍历结束后用 O(n) 时间对这 n 个元素执行快速选择算法,选出 m 个最大的元素即可,耗时 O(n),辅助空间 O(n)。
用同样一组具有等差分布权重的元素调用 WeightedRandomSample 十万次,得到如下的概率分布,与理论分布非常接近。
Comments
So what do you think? Did I miss something? Is any part unclear? Leave your comments below.