当前位置:  开发笔记 > 编程语言 > 正文

在Python中每x秒重复执行一次函数的最佳方法是什么?

如何解决《在Python中每x秒重复执行一次函数的最佳方法是什么?》经验,为你挑选了8个好方法。

我想永远每60秒在Python中重复执行一个函数(就像目标C中的NSTimer一样).这段代码将作为守护进程运行,实际上就像使用cron每分钟调用python脚本一样,但不需要用户设置.

在这个关于用Python实现的cron的问题中,解决方案似乎实际上只是sleep() x秒.我不需要这样的高级功能,所以也许这样的东西可行

while True:
    # Code executed here
    time.sleep(60)

这段代码有可预见的问题吗?



1> nosklo..:

使用sched模块,它实现了一个通用的事件调度程序.

import sched, time
s = sched.scheduler(time.time, time.sleep)
def do_something(sc): 
    print "Doing stuff..."
    # do your stuff
    s.enter(60, 1, do_something, (sc,))

s.enter(60, 1, do_something, (s,))
s.run()


sched模块用于调度函数在一段时间后运行,如何使用它来每隔x秒重复一次函数调用而不使用time.sleep()?
@JavaSa:因为*"做你的东西"*不是瞬间的,来自`time.sleep`的错误可能在这里积累."每隔X秒执行一次"和"反复执行延迟~X秒"是不一样的.另请参阅[此评论](http://stackoverflow.com/questions/474528/what-is-the-best-way-to-repeatedly-execute-a-function-every-x-seconds-in-python/474543 ?noredirect = 1#comment64793337_25251804)
注意:此版本可能会漂移.您可以使用`enterabs()`来避免它.这是[用于比较的非漂移版本](http://stackoverflow.com/a/25251804/4279).
@Baishampayan:安排新的跑步.
然后在http://packages.python.org/APScheduler/上的apscheduler也应该在这一点上提到.
谢谢,接受的答案应该改变,并非所有人都阅读评论......

2> 小智..:

只需将时间循环锁定到系统时钟即可.简单.

import time
starttime=time.time()
while True:
  print "tick"
  time.sleep(60.0 - ((time.time() - starttime) % 60.0))


+1.你的和`twisted`答案是每隔一个'x`秒运行一个函数的唯一答案.其余的执行函数,每次调用后延迟"x"秒.
如果你在哪里添加一些代码,这需要超过一秒......它会抛出时间并开始落后......在这种情况下接受的答案是正确的...任何人都可以循环一个简单的打印命令让它每秒都运行一次......
工作非常好.如果你开始将它同步到某个时间,就没有必要减去你的`starttime`:`time.sleep(60 - time.time()%60)`对我来说一直很好.我用它作为`time.sleep(1200 - time.time()%1200)`它给我记录了`:00:20:40`,正如我想要的那样.
我更喜欢"从时间导入时间,睡眠",因为存在的影响;)
@AntonSchigur避免多次迭代后的漂移.单个迭代可能迟早或稍后开始,具体取决于`sleep()`,`timer()`精度以及执行循环体所需的时间,但平均迭代总是发生在区间边界上(即使有些被跳过) :[`while keep_doing_it():sleep(interval - timer()%interval)`](http://stackoverflow.com/a/26609843).与比较,它只是'而keep_doing_it():睡眠(间隔)`这里经过几次反复的错误可能会积累.

3> Aaron Maenpa..:

您可能想要考虑Twisted,它是一个实现Reactor Pattern的Python网络库.

from twisted.internet import task, reactor

timeout = 60.0 # Sixty seconds

def doWork():
    #do work here
    pass

l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds

reactor.run()

虽然"while True:sleep(60)"可能会工作Twisted可能已经实现了你最终需要的许多功能(如bobince所指出的守护进程,日志记录或异常处理),并且可能是一个更强大的解决方案


我知道Twisted可以做到这一点.感谢分享示例代码!

4> MestreLion..:

如果你想要一种非阻塞方式来定期执行你的函数,而不是阻塞无限循环,我会使用一个线程计时器.这样,您的代码可以继续运行并执行其他任务,并且每隔n秒仍然会调用您的函数.我在很长的CPU /磁盘/网络密集型任务中使用这种技术来打印进度信息.

这是我在类似问题中发布的代码,包含start()和stop()控件:

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

用法:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

特征:

仅限标准库,无外部依赖项

start()stop()即使定时器已经启动/停止,也可以安全地多次呼叫

要调用的函数可以有位置和命名参数

您可以interval随时更改,它将在下次运行后生效.同样的args,kwargs甚至是function!



5> Itxaka..:

我认为更简单的方法是:

import time

def executeSomething():
    #code here
    time.sleep(60)

while True:
    executeSomething()

这样你的代码就会被执行,然后等待60秒然后它再次执行,等待,执行等......不需要复杂化的东西:D


实际上这不是答案:time sleep()只能在每次执行后等待X秒.例如,如果您的函数需要0.5秒执行并且您使用time.sleep(1),则表示您的函数每1.5秒执行一次,而不是1.您应该使用其他模块和/或线程来确保某些内容适用于Y次在每X秒.

6> eraoul..:

以下是MestreLion代码的更新,可以避免随着时间的推移而进行漫游:

import threading 
import time

class RepeatedTimer(object):
  def __init__(self, interval, function, *args, **kwargs):
    self._timer = None
    self.interval = interval
    self.function = function
    self.args = args
    self.kwargs = kwargs
    self.is_running = False
    self.next_call = time.time()
    self.start()

  def _run(self):
    self.is_running = False
    self.start()
    self.function(*self.args, **self.kwargs)

  def start(self):
    if not self.is_running:
      self.next_call += self.interval
      self._timer = threading.Timer(self.next_call - time.time(), self._run)
      self._timer.start()
      self.is_running = True

  def stop(self):
    self._timer.cancel()
    self.is_running = False



7> Alfe..:
import time, traceback

def every(delay, task):
  next_time = time.time() + delay
  while True:
    time.sleep(max(0, next_time - time.time()))
    try:
      task()
    except Exception:
      traceback.print_exc()
      # in production code you might want to have this instead of course:
      # logger.exception("Problem while executing repetitive task.")
    # skip tasks if we are behind schedule:
    next_time += (time.time() - next_time) // delay * delay + delay

def foo():
  print("foo", time.time())

every(5, foo)

如果你想在不阻塞剩余代码的情况下执行此操作,可以使用它来让它在自己的线程中运行:

import threading
threading.Thread(target=lambda: every(5, foo)).start()

该解决方案结合了其他解决方案中很少结合的几种功能:

异常处理:在此级别上尽可能正确处理异常,即在不中止程序的情况下记录以进行调试.

没有链接:你在许多答案中找到的常见的链式实现(用于调度下一个事件)在调度机制(threading.Timer或其他)中出现任何问题时会很脆弱,这将终止链.即使问题的原因已经解决,也不会再发生进一步的执行.一个简单的循环和等待简单sleep()相比,更加强大.

没有漂移:我的解决方案可以准确跟踪它应该运行的时间.根据执行时间没有漂移(如许多其他解决方案中那样).

跳过:如果一次执行占用太多时间,我的解决方案将跳过任务(例如,每五秒执行一次X,但X需要6秒).这是标准的cron行为(并且有充分的理由).然后,许多其他解决方案只是连续几次执行任务而没有任何延迟.对于大多数情况(例如清理任务),这是不希望的.如果希望,简单地使用next_time += delay来代替.


不漂流的最佳答案。

8> Anay..:

一段时间后我遇到了类似的问题.可能是http://cronus.readthedocs.org可能有帮助吗?

对于v0.2,以下代码段有效

import cronus.beat as beat

beat.set_rate(2) # 2 Hz
while beat.true():
    # do some time consuming work here
    beat.sleep() # total loop duration would be 0.5 sec

推荐阅读
放ch养奶牛
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有