numpy.random.choice allows a weighted selection from a vector, i.e.
arr = numpy.array([1, 2, 3])
weights = numpy.array([0.2, 0.5, 0.3])
choice = numpy.random.choice(arr, p=weights)
will pick 1 with probability 0.2, 2 with probability 0.5, and 3 with probability 0.3.
What if we want to do this quickly, in a vectorized fashion, for a 2D array (matrix) in which each column is a probability vector? That is, we want a vector of choices drawn from a random matrix. Here is the super slow way to do it:
import numpy as np
m = 10
n = 100 # Or some very large number
items = np.arange(m)
prob_weights = np.random.rand(m, n)
prob_matrix = prob_weights / prob_weights.sum(axis=0, keepdims=True)
choices = np.zeros((n,))
# This is slow, because of the loop in Python
for i in range(n):
    choices[i] = np.random.choice(items, p=prob_matrix[:,i])
print(choices)
The output looks something like:
array([ 4., 7., 8., 1., 0., 4., 3., 7., 1., 5., 7., 5., 3., 1., 9., 1., 1., 5., 9., 8., 2., 3., 2., 6., 4., 3., 8., 4., 1., 1., 4., 0., 1., 8., 5., 3., 9., 9., 6., 5., 4., 8., 4., 2., 4., 0., 3., 1., 2., 5., 9., 3., 9., 9., 7., 9., 3., 9., 4., 8., 8., 7., 6., 4., 6., 7., 9., 5., 0., 6., 1., 3., 3., 2., 4., 7., 0., 6., 3., 5., 8., 0., 8., 3., 4., 5., 2., 2., 1., 1., 9., 9., 4., 3., 3., 2., 8., 0., 6., 1.])
This post suggests that cumsum and bisect could be a potential approach, and a fast one. But while numpy.cumsum(arr, axis=1) can do the cumulative sums along one axis of a numpy array, the bisect.bisect function only works on a single array at a time. Similarly, numpy.searchsorted only works on 1D arrays.
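For a single probability vector, the cumsum plus searchsorted idea looks roughly like this (a minimal 1D sketch added for illustration; the variable names are just examples):

import numpy as np

weights = np.array([0.2, 0.5, 0.3])
cdf = np.cumsum(weights)            # [0.2, 0.7, 1.0]
r = np.random.rand()                # uniform draw in [0, 1)
choice = np.searchsorted(cdf, r)    # index of the first cumulative value >= r

The open question is how to apply this to every column of the matrix at once.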
Is there a fast way to do this using only vectorized operations?
Here is a fully vectorized version that is pretty fast:
def vectorized(prob_matrix, items):
    s = prob_matrix.cumsum(axis=0)
    r = np.random.rand(prob_matrix.shape[1])
    k = (s < r).sum(axis=0)
    return items[k]
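To see why the counting trick in k = (s < r).sum(axis=0) picks the right index, here is a tiny single-column illustration (an added sketch, not part of the original answer; s_col and the numbers are just examples):

import numpy as np

# One column of s, i.e. the cumulative sums of the weights [0.2, 0.5, 0.3]:
s_col = np.array([0.2, 0.7, 1.0])

# A uniform draw r that lands in [0.7, 1.0) should select index 2.
# Counting how many cumulative values lie strictly below r gives exactly that index.
r = 0.85
k = (s_col < r).sum()   # 0.2 and 0.7 are below 0.85, so k == 2

Doing this comparison on the whole (m, n) matrix at once, with one random number per column broadcast against its column, is what makes the function fully vectorized.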
In theory, searchsorted is the right function to use for looking up the random value in the cumulative summed probabilities, but with m being relatively small, k = (s < r).sum(axis=0) ends up being faster. Its time complexity is O(m), while the searchsorted approach is O(log(m)), but that only matters for much bigger m. Also, cumsum is O(m), so both vectorized and @perimosocordiae's improved are O(m). (If your m is actually much bigger, you will have to run some tests to see how big m can be before this method is slower.)
Here are the timings I get with m = 10 and n = 10000 (using the functions original and improved from @perimosocordiae's answer):
In [115]: %timeit original(prob_matrix, items)
1 loops, best of 3: 270 ms per loop

In [116]: %timeit improved(prob_matrix, items)
10 loops, best of 3: 24.9 ms per loop

In [117]: %timeit vectorized(prob_matrix, items)
1000 loops, best of 3: 1 ms per loop
The full script that defines the functions is:
import numpy as np


def improved(prob_matrix, items):
    # transpose here for better data locality later
    cdf = np.cumsum(prob_matrix.T, axis=1)
    # random numbers are expensive, so we'll get all of them at once
    ridx = np.random.random(size=n)
    # the one loop we can't avoid, made as simple as possible
    idx = np.zeros(n, dtype=int)
    for i, r in enumerate(ridx):
        idx[i] = np.searchsorted(cdf[i], r)
    # fancy indexing all at once is faster than indexing in a loop
    return items[idx]


def original(prob_matrix, items):
    choices = np.zeros((n,))
    # This is slow, because of the loop in Python
    for i in range(n):
        choices[i] = np.random.choice(items, p=prob_matrix[:,i])
    return choices


def vectorized(prob_matrix, items):
    s = prob_matrix.cumsum(axis=0)
    r = np.random.rand(prob_matrix.shape[1])
    k = (s < r).sum(axis=0)
    return items[k]


m = 10
n = 10000  # Or some very large number
items = np.arange(m)
prob_weights = np.random.rand(m, n)
prob_matrix = prob_weights / prob_weights.sum(axis=0, keepdims=True)
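As a quick sanity check (an illustrative addition, not part of the original answer; p, tiled, samples and freq are just example names), you can tile a single probability vector across all n columns and compare the empirical frequencies produced by vectorized against it:

# Every column gets the same distribution p, so the n draws (one per column)
# should reproduce p approximately for large n.
p = prob_matrix[:, 0]
tiled = np.tile(p[:, None], (1, n))
samples = vectorized(tiled, items)
freq = np.bincount(samples, minlength=m) / n
print(np.round(p, 3))
print(np.round(freq, 3))   # should roughly match the line above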