还是同样的问题:有一组数量未知的数据,每个元素有非负权重。要求只遍历一次,随机选取其中的一个元素,任何一个元素被选到的概率与其权重成正比。

前一篇 文章中介绍了概率分布的理论值,并用比较简洁高效的函数实现了选取一个元素的方法。现在来看一个神奇的算法,以及相关的证明和实现。

算法很简单:对于任意的 i(1 <= i <= n),按照如下方法给第 i 个元素分配一个键值 key(其中 ri 是一个 0 到 1 之间等概率分布的随机数):

\begin{equation*} key(i)=r_i^{1/w_i} \end{equation*}

之后,如果要随机选取一个元素,就去 key 最大的那个;如果要选取 m 个元素,就取 key 最大的 m 个。

真不知道是怎么想出来的这样的方法,不过还是先来关注一下证明的过程。

m=1 证明

对于 m=1 的证明过程会介绍得详细些,主要是怕我自己过几天就忘记了。概率达人可以直接秒杀之。

m=1 时,第 i 个元素被选取到的概率,就等于它所对应的键值 key(i) 是最大值的概率,即:

\begin{equation*} p_i=p(\forall j\neq i,key(j) < key(i)) \end{equation*}

把 key(i) 的计算公式代入,但要注意公式中的 ri 并不是一个固定的数值,而是随机变量。不考虑计算机数值表示的精度,可以假设 ri 是一个在 0 到 1 之间的连续均匀概率分布,因此如果要计算 key(i) 是最大的概率,必须要对 ri 所有的可能值进行概率累加,也就是积分。于是上面的概率表达式就被写成:

\begin{equation*} p_i=\int_0^1p(\forall j\neq i,r_j^{1/w_j} < r_i^{1/w_i})\mathrm{d}r_i \end{equation*}

再看式子中的 \(\forall\),它表示每一个 j 都要满足后面的条件,而各个 j 之间相互独立,因此可以写成概率乘积,于是得到:

\begin{equation*} p_i=\int_0^1\prod_{j\neq i}{p(r_j^{1/w_j} < r_i^{1/w_i})}\mathrm{d}r_i \end{equation*}

对于给定的 j,\(r_j^{1/w_j} < r_i^{1/w_i}\Rightarrow r_j < r_i^{w_j/w_i}\),另外 rj 也是个均匀概率分布,将概率密度函数代入可以得到:

\begin{equation*} p(r_j < r_i^{w_j/w_i})=\int_0^{r_i^{w_j/w_i}}1\mathrm{d}r_j=r_i^{w_j/w_i} \end{equation*}

因此,上面的概率算式就变成(其中 w 就是前一篇文章中提到的所有元素的权重之和):

\begin{equation*} p_i=\int_0^1\prod_{j\neq i}{r_i^{w_j/w_i}}\mathrm{d}r_i=\int_0^1r_i^{(w-w_i)/w_i}\mathrm{d}r_i=\frac{w_i}{w} \end{equation*}

最后的结果跟 前一篇 文章中的理论值相等,证明完毕。

m>=1 证明

当 m 取任意值时,概率公式变得非常复杂,在前一篇文章中使用了第 i 个元素不被选到的概率来简化表达式。现在的证明也从同样的角度进行。

第 i 个元素不被选到的概率,显然等于这 n 个元素中,至少存在 m 个元素的键值大于 key(i),与之前的讨论一样,不妨设这 m 个元素的下标(按键值从大到小)依次为 j1, j2, ..., jm\(\forall 1\leq k\leq m,j_k\notin\{i,j_1,j_2,\cdots,j_{k-1}\}\),满足 \(\forall 1\leq t_k\leq n,t_k\notin\{j_1,j_2,\cdots,j_{k}\},key(j_k) > key(t_k)\)。注意 jk 和 tk 的取值范围,为了简单起见,下面的式子中就不再重复了。

\begin{equation*} \bar p_i(m)=p(\exists j_1,j_2,...,j_m\neq i,key(j_1) > key(j_2) > ... > key(j_m) > key(i)) \end{equation*}

为了能够进一步求解,必须把这个连等式拆开。这里要非常小心,各个 jk 并不是相互独立的,比如当 j1 改变的时候,j2 的取值范围也会随之变化,依此类推。拆开之后的式子如下:

\begin{equation*} \begin{array}{rrrl} \bar p_i(m)=p( & \exists j_1( & & \forall t_1,key(j_1) > key(t_1),\\ & & \exists j_2( & \forall t_2,key(j_2) > key(t_2),\\ & & & ...,\\ & & & \exists j_m(\forall t_m,key(j_m) > key(t_m))\\ & & ) & \\ & ) & & \\ ) & & & \end{array} \end{equation*}

看起来还是相当恐怖的,一层套一层。注意等式右边已经没有显式地关于 i 的信息了,这些信息被隐含在 jk 和 tk 的取值范围中,切记。对每个 jk,把 key(jk) 的式子代进去,转换成积分;同时把 \(\forall t_k\) 转换为 \(\prod_{t_k}\)。这些在 m=1 的证明中都提到过了。新出现的是 \(\exists j_k\),这个显然适用概率加法,因为 jk 取不同的值对应于不同的互斥方案。经过这些变换得到:

\begin{equation*} \begin{array}{rrrl} \bar p_i(m)= & \sum_{j_1}( & & \int_0^1\prod_{t_1}p(r_{j_1}^{1/w_{j_1}} > r_{t_1}^{1/w_{t_1}})\mathrm d r_{j_1}\times\\ & & \sum_{j_2}( & \int_0^1\prod_{t_2} p(r_{j_2}^{1/w_{j_2}} > r_{t_2}^{1/w_{t_2}})\mathrm d r_{j_2}\times\\ & & & ...\times\\ & & & \sum_{j_m}(\int_0^1\prod_{t_m} p(r_{j_m}^{1/w_{j_m}} > r_{t_m}^{1/w_{t_m}})\mathrm d r_{j_m})\\ & & ) & \\ & ) & & \\ \end{array} \end{equation*}

其中的积分式在之前已经见过了,其运算过程如下(注意 tk 的取值范围):

\begin{equation*} \begin{array}{rl} & \int_0^1\prod_{t_k}p(r_{j_k}^{1/w_{j_k}} > r_{t_k}^{1/w_{t_k}})\mathrm{d}r_{j_k} \\ & \\ = & \int_0^1\prod_{t_k}r_{j_k}^{w_{t_k}/w_{j_k}}\mathrm{d}r_{j_k} \\ & \\ = & \int_0^1r_{j_k}^{(\sum_{t_k}w_{t_k})/w_{j_k}}\mathrm{d}r_{j_k} \\ & \\ = & \frac{w_{j_k}}{(\sum_{t_k}w_{t_k})+w_{j_k}} \\ & \\ = & \frac{w_{j_k}}{w-(w_{j_1}+w_{j_2}+...+w_{j_{k-1}})} \end{array} \end{equation*}

最终,概率计算式子变成:

\begin{equation*} \bar p_i(m)=\sum_{j_1}\left(\frac{w_{j_1}}{w}\sum_{j_2}\left(\frac{w_{j_2}}{w-w_{j_1}}\sum_{j_3}\left(\frac{w_{j_2}}{w-w_{j_1}-w_{j_2}}\cdots\sum_{j_m}\frac{w_{j_m}}{w-\sum_{k=1}^{m-1}w_{j_k}}\right)\right)\right) \end{equation*}

前一篇 文章中的理论值完全一样。

呼,可怕的推导过程。

程序实现

虽然证明过程异常恐怖,但实现起来却很简单。实际运算中,只要维持一个大小为 m 的最小堆(没错,是最小堆)来保存当前已知的最大的 m 个键值,每拿到一个新的元素,算出对应的键值,如果它比堆中的最小值大,就可以放入堆中替换掉最小值。Python 实现函数如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from random import Random
from heapq import *

def WeightedRandomSample(m=1, rand=None):
  assert m > 0, 'invalid m'
  selection = []
  heap = []
  if rand is None:
    rand = Random()
  while True:
    # Outputs the current selection and gets next item
    (item, weight) = yield selection
    if weight <= 0: continue
    key = rand.random() ** (1.0 / weight)
    if len(selection) < m:
      heap.append((key, len(selection)))
      selection.append(item)
      if len(selection) == m:
        heapify(heap)
    else:
      if key > heap[0][0]:
        index = heap[0][1]
        heapreplace(heap, (key, index))
        selection[index] = item

每次拿到一个新的元素,通过 key = rand.random() ** (1.0 / weight) 产生一个与其权重有关的随机键值 key。当元素个数小于 m 时,直接将新的元素放入堆空间中(但并不建堆),这样只用 O(1) 时间;当遇到第 m 个元素后,堆空间放满了,这时候进行建堆操作(heapify(heap)),需要 O(m) 时间;之后每拿到一个新的元素,用 O(1) 时间从堆顶拿出最小值与新元素的键值比较,如果后者更大就用后者替换掉堆顶元素,对堆进行必要的操作(O(log m) 时间)以保持其结构(heapreplace(heap, (key, index)))。

关于 Python 中的堆可以参考:http://docs.python.org/library/heapq.html

总体来看,整段程序用时 O(n * log m),占用 O(m) 辅助空间。这样的处理比较适用于 m << n 的情况。当 m 与 n 接近时,可以用 n 个辅助空间存储所有元素的键值,当遍历结束后用 O(n) 时间对这 n 个元素执行快速选择算法,选出 m 个最大的元素即可,耗时 O(n),辅助空间 O(n)。

用同样一组具有等差分布权重的元素调用 WeightedRandomSample 十万次,得到如下的概率分布,与理论分布非常接近。

Like this post? Share on: TwitterFacebookEmail

Comments

So what do you think? Did I miss something? Is any part unclear? Leave your comments below.

comments powered by Disqus

Keep Reading


Published

Last Updated

random-selection

Category

算法

Tags

Stay in Touch