当前位置:  开发笔记 > 编程语言 > 正文

什么是一个好的速率限制算法?

如何解决《什么是一个好的速率限制算法?》经验,为你挑选了4个好方法。

我可以使用一些伪代码,或者更好的Python.我正在尝试为Python IRC机器人实现速率限制队列,它部分工作,但如果有人触发的消息少于限制(例如,速率限制是每8秒5条消息,而此人只触发4条消息),并且下一个触发器超过8秒(例如,16秒之后),机器人发送消息,但是队列变满并且机器人等待8秒,即使自8秒时间段已经过去也不需要它.



1> Antti Huima..:

这里是最简单的算法,如果您只想在消息到达太快时丢弃它们(而不是排队它们,这是有道理的,因为队列可能会变得任意大):

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    discard_message();
  else:
    forward_message();
    allowance -= 1.0;

在这个解决方案中没有数据结构,定时器等,它干净利落地工作:)为了看到这一点,'allowance'最多以每秒5/8单位的速度增长,即每8秒最多5个单位.转发的每条消息都会扣除一个单元,因此每八秒钟不能发送五条以上的消息.

请注意,rate应该是一个整数,即没有非零小数部分,否则算法将无法正常工作(实际速率不会rate/per).例如rate=0.5; per=1.0;不起作用,因为allowance永远不会增长到1.0.但rate=1.0; per=2.0;工作正常.


这是一个标准算法 - 它是一个令牌桶,没有队列.桶是'津贴'.铲斗尺寸为`rate`.`allowance + = ...`行是每*rate*÷*每*秒添加一个令牌的优化.
@zwirbeltier你上面写的不是真的.'Allowance'总是受'rate'限制(查看"// throttle"行),因此它只允许在任何特定时间发出准确的'rate'消息,即5.
这很好,但可以超过速率.假设您在时间0转发5条消息,然后在时间N*(8/5)处转发N = 1,2,...您可以发送另一条消息,在8秒内产生5条以上的消息
值得指出的是,'time_passed'的维度和比例必须与'per'相同,例如秒.
嗨skaffman,谢谢你的赞美---我把它扔出了我的袖子,但99.9%的概率有人早先想出了类似的解决方案:)

2> Carlos A. Ib..:

在排队的函数之前使用此装饰器@RateLimited(ratepersec).

基本上,这会检查自上次以来是否已经过1 /速率秒,如果没有,则等待剩余的时间,否则它不会等待.这实际上限制了你的速率/秒.装饰器可以应用于您想要的速率限制的任何功能.

在您的情况下,如果您希望每8秒最多发送5条消息,请在sendToQueue函数之前使用@RateLimited(0.625).

import time

def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)
    def decorate(func):
        lastTimeCalled = [0.0]
        def rateLimitedFunction(*args,**kargs):
            elapsed = time.clock() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait>0:
                time.sleep(leftToWait)
            ret = func(*args,**kargs)
            lastTimeCalled[0] = time.clock()
            return ret
        return rateLimitedFunction
    return decorate

@RateLimited(2)  # 2 per second at most
def PrintNumber(num):
    print num

if __name__ == "__main__":
    print "This should print 1,2,3... at about 2 per second."
    for i in range(1,100):
        PrintNumber(i)


这是一个列表,因为像float这样的简单类型在被闭包捕获时是不变的.通过使其成为列表,列表是常量,但其内容不是.是的,它不是线程安全的,但可以通过锁定轻松修复.
对于速率限制,你绝对不想使用`time.clock()`来测量经过的CPU时间.CPU时间可以比"实际"时间快得多或慢得多.你想使用`time.time()`来测量墙上时间("实际"时间).

3> derobert..:

令牌桶实现起来相当简单.

从带有5个令牌的桶开始.

每5/8秒:如果存储桶少于5个令牌,请添加一个.

每次要发送消息时:如果存储桶有≥1个令牌,请取出一个令牌并发送消息.否则,等待/删除消息/等等.

(显然,在实际代码中,你使用整数计数器而不是真实的标记,你可以通过存储时间戳来优化每5/8步骤)


再次阅读问题,如果速率限制每8秒完全重置一次,那么这里是一个修改:

以时间戳开始,last_send很久以前(例如,在纪元).此外,从相同的5令牌桶开始.

每5/8秒执行一次规则.

每次发送消息时:首先,检查是否last_send≥8秒前.如果是这样,请填充桶(将其设置为5个令牌).其次,如果存储桶中有令牌,则发送消息(否则,丢弃/等待/等).第三,设置last_send为现在.

这应该适用于那种情况.


我实际上是用这样的策略编写了一个IRC机器人(第一种方法).它在Perl中,而不是Python,但这里有一些代码来说明:

这里的第一部分处理向桶添加令牌.您可以看到基于时间(第2行到最后一行)添加令牌的优化,然后最后一行将桶内容限制为最大值(MESSAGE_BURST)

    my $start_time = time;
    ...
    # Bucket handling
    my $bucket = $conn->{fujiko_limit_bucket};
    my $lasttx = $conn->{fujiko_limit_lasttx};
    $bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
    ($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;

$ conn是一种传递的数据结构.这是在一个常规运行的方法中(它计算下次有什么事情要做,并且长时间睡眠或直到它获得网络流量).该方法的下一部分处理发送.它相当复杂,因为消息具有与之相关的优先级.

    # Queue handling. Start with the ultimate queue.
    my $queues = $conn->{fujiko_queues};
    foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
            # Ultimate is special. We run ultimate no matter what. Even if
            # it sends the bucket negative.
            --$bucket;
            $entry->{code}(@{$entry->{args}});
    }
    $queues->[PRIORITY_ULTIMATE] = [];

这是第一个队列,无论如何都会运行.即使它让我们的连接因洪水而死亡.用于非常重要的事情,比如响应服务器的PING.接下来,其余的队列:

    # Continue to the other queues, in order of priority.
    QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
            my $queue = $queues->[$pri];
            while (scalar(@$queue)) {
                    if ($bucket < 1) {
                            # continue later.
                            $need_more_time = 1;
                            last QRUN;
                    } else {
                            --$bucket;
                            my $entry = shift @$queue;
                            $entry->{code}(@{$entry->{args}});
                    }
            }
    }

最后,存储桶状态被保存回$ conn数据结构(实际上稍晚于该方法;它首先计算它将在多长时间内完成更多工作)

    # Save status.
    $conn->{fujiko_limit_bucket} = $bucket;
    $conn->{fujiko_limit_lasttx} = $start_time;

如您所见,实际的铲斗处理代码非常小 - 大约四行.其余代码是优先级队列处理.机器人具有优先级队列,例如,与之聊天的人不能阻止它执行其重要的启动/禁令职责.


如果人们对此表示反对,我将不胜感激,请解释原因...我想解决您遇到的任何问题,但是没有反馈就很难做到!

4> 小智..:

阻止处理直到可以发送消息,从而排队进一步消息,antti的漂亮解决方案也可以像这样修改:

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    time.sleep( (1-allowance) * (per/rate))
    forward_message();
    allowance = 0.0;
  else:
    forward_message();
    allowance -= 1.0;

它只是等到有足够的余量发送消息.为了不以两倍的速率开始,补贴也可以用0初始化.


当您睡眠((1-allowance)*(per / rate)`时,需要将相同的数量添加到`last_check`中。
推荐阅读
雨天是最美
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有