我有一堆钥匙,每个钥匙都有一个不可靠的变量.我想随机选择其中一个键,但我希望它不太可能被选中(键,值)而不是不太可能(更可能)的对象.我想知道你是否会有任何建议,最好是我可以使用的现有python模块,否则我需要自己制作.
我检查了随机模块; 它似乎没有提供这个.
我必须为1000个不同的对象集做出数百万次这样的选择,每个对象包含2,455个对象.每个集合将在彼此之间交换对象,因此随机选择器需要是动态的.拥有1000套2,433件物品,即243.3万件物品; 低内存消耗至关重要.由于这些选择不是算法的主要部分,我需要这个过程非常快; CPU时间有限.
谢谢
更新:
好的,我试图明智地考虑你的建议,但时间是如此有限......
我查看了二叉搜索树方法,它看起来风险太大(复杂而复杂).其他建议都类似于ActiveState配方.我拿了它并稍微修改了一下,希望提高效率:
def windex(dict, sum, max): '''an attempt to make a random.choose() function that makes weighted choices accepts a dictionary with the item_key and certainty_value as a pair like: >>> x = [('one', 20), ('two', 2), ('three', 50)], the maximum certainty value (max) and the sum of all certainties.''' n = random.uniform(0, 1) sum = max*len(list)-sum for key, certainty in dict.iteritems(): weight = float(max-certainty)/sum if n < weight: break n = n - weight return key
我希望通过动态保持确定性和最大确定性之和来获得效率增益.欢迎任何进一步的建议.你们节省了我这么多时间和精力,同时提高我的效率,这很疯狂.谢谢!谢谢!谢谢!
UPDATE2:
我决定让它一次选择更多选择,从而提高效率.这将导致我的算法中的精度损失可接受,因为它本质上是动态的.无论如何,这就是我现在拥有的:
def weightedChoices(dict, sum, max, choices=10): '''an attempt to make a random.choose() function that makes weighted choices accepts a dictionary with the item_key and certainty_value as a pair like: >>> x = [('one', 20), ('two', 2), ('three', 50)], the maximum certainty value (max) and the sum of all certainties.''' list = [random.uniform(0, 1) for i in range(choices)] (n, list) = relavate(list.sort()) keys = [] sum = max*len(list)-sum for key, certainty in dict.iteritems(): weight = float(max-certainty)/sum if n < weight: keys.append(key) if list: (n, list) = relavate(list) else: break n = n - weight return keys def relavate(list): min = list[0] new = [l - min for l in list[1:]] return (min, new)
我还没有尝试过.如果您有任何意见/建议,请不要犹豫.谢谢!
UPDATE3:
我一整天都在为Rex Logan的答案定制任务.它实际上是一个特殊的字典类,而不是2个对象和权重数组; 因为Rex的代码生成一个随机索引,所以事情变得非常复杂......我还编写了一个测试用例,它类似于我算法中会发生的事情(但直到我尝试才真正知道!).基本原则是:经常随机生成密钥越多,再次生成密钥的可能性就越小:
import random, time import psyco psyco.full() class ProbDict(): """ Modified version of Rex Logans RandomObject class. The more a key is randomly chosen, the more unlikely it will further be randomly chosen. """ def __init__(self,keys_weights_values={}): self._kw=keys_weights_values self._keys=self._kw.keys() self._len=len(self._keys) self._findSeniors() self._effort = 0.15 self._fails = 0 def __iter__(self): return self.next() def __getitem__(self, key): return self._kw[key] def __setitem__(self, key, value): self.append(key, value) def __len__(self): return self._len def next(self): key=self._key() while key: yield key key = self._key() def __contains__(self, key): return key in self._kw def items(self): return self._kw.items() def pop(self, key): try: (w, value) = self._kw.pop(key) self._len -=1 if w == self._seniorW: self._seniors -= 1 if not self._seniors: #costly but unlikely: self._findSeniors() return [w, value] except KeyError: return None def popitem(self): return self.pop(self._key()) def values(self): values = [] for key in self._keys: try: values.append(self._kw[key][1]) except KeyError: pass return values def weights(self): weights = [] for key in self._keys: try: weights.append(self._kw[key][0]) except KeyError: pass return weights def keys(self, imperfect=False): if imperfect: return self._keys return self._kw.keys() def append(self, key, value=None): if key not in self._kw: self._len +=1 self._kw[key] = [0, value] self._keys.append(key) else: self._kw[key][1]=value def _key(self): for i in range(int(self._effort*self._len)): ri=random.randint(0,self._len-1) #choose a random object rx=random.uniform(0,self._seniorW) rkey = self._keys[ri] try: w = self._kw[rkey][0] if rx >= w: # test to see if that is the value we want w += 1 self._warnSeniors(w) self._kw[rkey][0] = w return rkey except KeyError: self._keys.pop(ri) # if you do not find one after 100 tries then just get a random one self._fails += 1 #for confirming effectiveness only for key in self._keys: if key in self._kw: w = self._kw[key][0] + 1 self._warnSeniors(w) self._kw[key][0] = w return key return None def _findSeniors(self): '''this function finds the seniors, counts them and assess their age. It is costly but unlikely.''' seniorW = 0 seniors = 0 for w in self._kw.itervalues(): if w >= seniorW: if w == seniorW: seniors += 1 else: seniorsW = w seniors = 1 self._seniors = seniors self._seniorW = seniorW def _warnSeniors(self, w): #a weight can only be incremented...good if w >= self._seniorW: if w == self._seniorW: self._seniors+=1 else: self._seniors = 1 self._seniorW = w def test(): #test code iterations = 200000 size = 2500 nextkey = size pd = ProbDict(dict([(i,[0,i]) for i in xrange(size)])) start = time.clock() for i in xrange(iterations): key=pd._key() w=pd[key][0] if random.randint(0,1+pd._seniorW-w): #the heavier the object, the more unlikely it will be removed pd.pop(key) probAppend = float(500+(size-len(pd)))/1000 if random.uniform(0,1) < probAppend: nextkey+=1 pd.append(nextkey) print (time.clock()-start)*1000/iterations, "msecs / iteration with", pd._fails, "failures /", iterations, "iterations" weights = pd.weights() weights.sort() print "avg weight:", float(sum(weights))/pd._len, max(weights), pd._seniorW, pd._seniors, len(pd), len(weights) print weights test()
任何评论仍然欢迎.@Darius:你的二叉树对我来说太复杂和复杂; 而且我不认为它的叶子可以被有效地移除......谢谢所有
这个activestate配方提供了一种易于理解的方法,特别是评论中的版本,不需要您预先标准化您的权重:
import random def weighted_choice(items): """items is a list of tuples in the form (item, weight)""" weight_total = sum((item[1] for item in items)) n = random.uniform(0, weight_total) for item, weight in items: if n < weight: return item n = n - weight return item
如果您有大量项目,这将很慢.在这种情况下,二进制搜索可能会更好......但写入也会更复杂,如果样本量很小,则获得的收益很小. 以下是python中二进制搜索方法的一个示例,如果您想要遵循该路由.
(我建议对数据集上的两种方法进行一些快速性能测试.这种算法的不同方法的性能通常有点不直观.)
编辑:我接受了自己的建议,因为我很好奇,并做了一些测试.
我比较了四种方法:
上面的weighted_choice函数.
二进制搜索选择函数如下:
def weighted_choice_bisect(items): added_weights = [] last_sum = 0 for item, weight in items: last_sum += weight added_weights.append(last_sum) return items[bisect.bisect(added_weights, random.random() * last_sum)][0]
编译版本1:
def weighted_choice_compile(items): """returns a function that fetches a random item from items items is a list of tuples in the form (item, weight)""" weight_total = sum((item[1] for item in items)) def choice(uniform = random.uniform): n = uniform(0, weight_total) for item, weight in items: if n < weight: return item n = n - weight return item return choice
2的编译版本:
def weighted_choice_bisect_compile(items): """Returns a function that makes a weighted random choice from items.""" added_weights = [] last_sum = 0 for item, weight in items: last_sum += weight added_weights.append(last_sum) def choice(rnd=random.random, bis=bisect.bisect): return items[bis(added_weights, rnd() * last_sum)][0] return choice
然后,我建立了一个很大的选择列表,如下所示:
choices = [(random.choice("abcdefg"), random.uniform(0,50)) for i in xrange(2500)]
一个过于简单的分析功能:
def profiler(f, n, *args, **kwargs): start = time.time() for i in xrange(n): f(*args, **kwargs) return time.time() - start
结果:
(对该函数进行1,000次调用需要几秒钟.)
简单未编译:0.918624162674
二进制未编译:1.01497793198
简单编译:0.287325024605
二进制编译:0.00327413797379
"编译"结果包括编译选择函数一次所花费的平均时间.(我计算了1000次编译,然后将该时间除以1,000,并将结果添加到选择函数时间.)
所以:如果你有一个项目+这很少改变权重的列表,二进制编译的方法是目前为止最快的.
在对原始帖子的评论中,尼古拉斯伦纳德认为交换和抽样都需要快速.这是一个关于那个案例的想法; 我没试过.
如果只需要快速采样,我们可以使用值的数组以及它们概率的运行总和,并对运行总和进行二进制搜索(键是一个统一的随机数) - 一个O(log( n))操作.但是交换将需要更新在交换条目之后出现的所有运行总和值--O(n)操作.(您可以选择只交换列表末尾附近的项目吗?我会假设没有.)
因此,让我们在两个操作中都以O(log(n))为目标.而不是数组,为每个样本保留一个二叉树来进行采样.叶子保存样本值及其(非标准化)概率.分支节点保存其子节点的总概率.
要进行采样,请x
在0和根的总概率之间生成一个均匀的随机数,然后下降树.在每个分支处,如果左孩子具有总概率,则选择左孩子<= x
.否则从左侧儿童减去概率x
并向右减去.返回您到达的叶子值.
要进行交换,请从树中移除叶子并调整导向它的分支(降低它们的总概率,并删除任何单子分支节点).将叶子插入目标树:您可以选择放置它的位置,因此请保持平衡.在每个级别挑选一个随机的孩子可能已经足够 - 这就是我开始的地方.增加每个父节点的概率,返回到根节点.
现在,采样和交换平均为O(log(n)).(如果需要保证平衡,一种简单的方法是在分支节点中添加另一个字段,保存整个子树中的叶子数.当添加叶子时,在每个级别选择叶子较少的子节点.这样就有可能树只通过删除变得不平衡;如果集合之间存在合理均匀的流量,则这不是问题,但如果是,则在删除期间使用遍历中每个节点上的叶子计数信息选择轮换.)
更新:根据要求,这是一个基本实现.根本没有调整过.用法:
>>> t1 = build_tree([('one', 20), ('two', 2), ('three', 50)]) >>> t1 Branch(Leaf(20, 'one'), Branch(Leaf(2, 'two'), Leaf(50, 'three'))) >>> t1.sample() Leaf(50, 'three') >>> t1.sample() Leaf(20, 'one') >>> t2 = build_tree([('four', 10), ('five', 30)]) >>> t1a, t2a = transfer(t1, t2) >>> t1a Branch(Leaf(20, 'one'), Leaf(2, 'two')) >>> t2a Branch(Leaf(10, 'four'), Branch(Leaf(30, 'five'), Leaf(50, 'three')))
码:
import random def build_tree(pairs): tree = Empty() for value, weight in pairs: tree = tree.add(Leaf(weight, value)) return tree def transfer(from_tree, to_tree): """Given a nonempty tree and a target, move a leaf from the former to the latter. Return the two updated trees.""" leaf, from_tree1 = from_tree.extract() return from_tree1, to_tree.add(leaf) class Tree: def add(self, leaf): "Return a new tree holding my leaves plus the given leaf." abstract def sample(self): "Pick one of my leaves at random in proportion to its weight." return self.sampling(random.uniform(0, self.weight)) def extract(self): """Pick one of my leaves and return it along with a new tree holding my leaves minus that one leaf.""" return self.extracting(random.uniform(0, self.weight)) class Empty(Tree): weight = 0 def __repr__(self): return 'Empty()' def add(self, leaf): return leaf def sampling(self, weight): raise Exception("You can't sample an empty tree") def extracting(self, weight): raise Exception("You can't extract from an empty tree") class Leaf(Tree): def __init__(self, weight, value): self.weight = weight self.value = value def __repr__(self): return 'Leaf(%r, %r)' % (self.weight, self.value) def add(self, leaf): return Branch(self, leaf) def sampling(self, weight): return self def extracting(self, weight): return self, Empty() def combine(left, right): if isinstance(left, Empty): return right if isinstance(right, Empty): return left return Branch(left, right) class Branch(Tree): def __init__(self, left, right): self.weight = left.weight + right.weight self.left = left self.right = right def __repr__(self): return 'Branch(%r, %r)' % (self.left, self.right) def add(self, leaf): # Adding to a random branch as a clumsy way to keep an # approximately balanced tree. if random.random() < 0.5: return combine(self.left.add(leaf), self.right) return combine(self.left, self.right.add(leaf)) def sampling(self, weight): if weight < self.left.weight: return self.left.sampling(weight) return self.right.sampling(weight - self.left.weight) def extracting(self, weight): if weight < self.left.weight: leaf, left1 = self.left.extracting(weight) return leaf, combine(left1, self.right) leaf, right1 = self.right.extracting(weight - self.left.weight) return leaf, combine(self.left, right1)
更新2:在回答另一个问题时,Jason Orendorff指出二进制树可以通过在数组中表示它们来保持完美平衡,就像经典的堆结构一样.(这也节省了在指针上花费的空间.)请参阅我对该答案的评论,以了解如何使他的代码适应这个问题.