I was wondering whether there is any library for asynchronous method calls in Python. It would be great if you could do something like
```python
@async
def longComputation():
    ...

token = longComputation()
token.registerCallback(callback_function)
# alternative, polling
while not token.finished():
    doSomethingElse()
if token.finished():
    result = token.result()
```
Or to call a non-async routine asynchronously:

```python
def longComputation():
    ...

token = asynccall(longComputation())
```
It would be great to have a more refined strategy native in the language core. Has this been considered?
Something like:
```python
import threading

thr = threading.Thread(target=foo, args=(), kwargs={})
thr.start()      # Will run "foo"
...
thr.is_alive()   # Will return whether foo is running currently
...
thr.join()       # Will wait till "foo" is done
```
For details, see the documentation at https://docs.python.org/2/library/threading.html#module-threading; this code works in Python 3 as well.
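One caveat: `threading.Thread` gives you no direct way at the return value of `foo`. A minimal sketch of a common workaround (`foo` and the `result` container are illustrative names, not part of the `threading` API):

```python
import threading

def foo(x, y):
    return x + y

result = {}  # shared container for the return value (name is illustrative)

def run_and_store():
    # threading.Thread discards the target's return value,
    # so store it somewhere the caller can reach.
    result["value"] = foo(2, 3)

thr = threading.Thread(target=run_and_store)
thr.start()
# ... do other work here ...
thr.join()               # wait until foo is done
print(result["value"])   # 5
```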
You can use the multiprocessing module, added in Python 2.6. You can use pools of processes and then get results asynchronously with:

```python
apply_async(func[, args[, kwds[, callback]]])
```
E.g.:

```python
from multiprocessing import Pool

def f(x):
    return x * x

def on_done(result):  # the original snippet passed an undefined "callback"
    print(result)

if __name__ == '__main__':
    pool = Pool(processes=1)  # Start a worker process.
    # Evaluate "f(10)" asynchronously, calling on_done when finished.
    result = pool.apply_async(f, [10], callback=on_done)
    pool.close()
    pool.join()
```
This is only one option; the module provides many facilities to achieve what you want, and it would be quite easy to build a decorator from it.
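In fact, the `AsyncResult` returned by `apply_async` already behaves much like the "token" in the question: `ready()` plays the role of `token.finished()` and `get()` the role of `token.result()`. A small sketch (the `square` function is just an illustration):

```python
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        token = pool.apply_async(square, (7,))  # returns immediately
        # token.ready() corresponds to token.finished() from the question
        while not token.ready():
            pass  # doSomethingElse()
        print(token.get(timeout=10))            # 49
```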
As of Python 3.5, you can use enhanced generators for asynchronous functions.
Enhanced generator syntax:

```python
import asyncio
import datetime

@asyncio.coroutine
def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        yield from asyncio.sleep(1)

loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()
```
New async/await syntax:

```python
import asyncio
import datetime

async def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()
```
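On Python 3.7+, the event-loop boilerplate can be replaced by `asyncio.run()`, and `asyncio.create_task()` returns a handle much like the "token" in the question. A minimal sketch (the `long_computation` coroutine is illustrative):

```python
import asyncio

async def long_computation():
    await asyncio.sleep(0.1)  # stand-in for real asynchronous work
    return 42

async def main():
    task = asyncio.create_task(long_computation())  # starts running immediately
    # ... await other work here ...
    return await task  # wait for the result

print(asyncio.run(main()))  # 42
```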
It's not in the language core, but a very mature library that does what you want is Twisted. It introduces the Deferred object, to which you can attach callbacks or error handlers ("errbacks"). A Deferred is basically a "promise" that a function will eventually have a result.
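The idea can be illustrated with a toy stand-in (this is a simplified sketch of the concept, not Twisted's actual implementation; `ToyDeferred` is an invented name):

```python
class ToyDeferred:
    """A minimal stand-in for Twisted's Deferred, for illustration only."""
    def __init__(self):
        self._callbacks = []
        self._fired = False
        self._result = None

    def addCallback(self, fn):
        if self._fired:
            fn(self._result)  # already fired: run the callback immediately
        else:
            self._callbacks.append(fn)
        return self

    def callback(self, result):
        # Deliver the result and run every registered callback.
        self._fired = True
        self._result = result
        for fn in self._callbacks:
            fn(result)

d = ToyDeferred()
received = []
d.addCallback(received.append)
d.callback("done")
print(received)  # ['done']
```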
You can implement a decorator to make your functions asynchronous, though that's a bit tricky. The multiprocessing module is full of little quirks and seemingly arbitrary restrictions - all the more reason to encapsulate it behind a friendly interface.
```python
from inspect import getmodule
from multiprocessing import Pool

# NB: "async" became a reserved keyword in Python 3.7;
# rename the decorator (e.g. to run_async) on modern Python.
def async(decorated):
    r'''Wraps a top-level function around an asynchronous dispatcher.

        When the decorated function is called, a task is submitted to a
        process pool, and a future object is returned, providing access to an
        eventual return value.

        The future object has a blocking get() method to access the task
        result: it will return immediately if the job is already done, or
        block until it completes.

        This decorator won't work on methods, due to limitations in Python's
        pickling machinery (in principle methods could be made pickleable,
        but good luck on that).
    '''
    # Keeps the original function visible from the module global namespace,
    # under a name consistent to its __name__ attribute. This is necessary
    # for the multiprocessing pickling machinery to work properly.
    module = getmodule(decorated)
    decorated.__name__ += '_original'
    setattr(module, decorated.__name__, decorated)

    def send(*args, **opts):
        return async.pool.apply_async(decorated, args, opts)

    return send
```
The code below illustrates the use of the decorator:
```python
@async
def printsum(uid, values):
    summed = 0
    for value in values:
        summed += value
    print("Worker %i: sum value is %i" % (uid, summed))
    return (uid, summed)

if __name__ == '__main__':
    from random import sample

    # The process pool must be created inside __main__.
    async.pool = Pool(4)

    p = range(0, 1000)
    results = []
    for i in range(4):
        result = printsum(i, sample(p, 100))
        results.append(result)

    for result in results:
        print("Worker %i: sum value is %i" % result.get())
```
In a real-world case I would elaborate a bit more on the decorator, providing some way to turn it off for debugging (while keeping the future interface in place), or maybe a facility for dealing with exceptions; but I think this demonstrates the principle well enough.
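That debug switch could be sketched like this, keeping the future-style `get()` interface while running the function inline (all names here - `maybe_async`, `ImmediateResult`, `DEBUG` - are illustrative, not part of any library):

```python
class ImmediateResult:
    """Future-like wrapper for a value computed synchronously (illustrative)."""
    def __init__(self, value):
        self._value = value

    def get(self, timeout=None):
        return self._value

DEBUG = True  # when True, run decorated functions inline for easier debugging

def maybe_async(fn):
    def send(*args, **kwargs):
        if DEBUG:
            # Synchronous path: same interface, no process pool involved.
            return ImmediateResult(fn(*args, **kwargs))
        # Asynchronous path: maybe_async.pool must be set to a
        # multiprocessing.Pool before any decorated function is called.
        return maybe_async.pool.apply_async(fn, args, kwargs)
    return send

@maybe_async
def add(a, b):
    return a + b

print(add(1, 2).get())  # 3
```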
Just:

```python
import threading
import time

def f():
    print("f started")
    time.sleep(3)
    print("f finished")

threading.Thread(target=f).start()
```
My solution is:
```python
import threading

class TimeoutError(RuntimeError):
    pass

class AsyncCall(object):
    def __init__(self, fnc, callback=None):
        self.Callable = fnc
        self.Callback = callback

    def __call__(self, *args, **kwargs):
        self.Thread = threading.Thread(target=self.run,
                                       name=self.Callable.__name__,
                                       args=args, kwargs=kwargs)
        self.Thread.start()
        return self

    def wait(self, timeout=None):
        self.Thread.join(timeout)
        if self.Thread.is_alive():  # isAlive() was removed in Python 3.9
            raise TimeoutError()
        else:
            return self.Result

    def run(self, *args, **kwargs):
        self.Result = self.Callable(*args, **kwargs)
        if self.Callback:
            self.Callback(self.Result)

class AsyncMethod(object):
    def __init__(self, fnc, callback=None):
        self.Callable = fnc
        self.Callback = callback

    def __call__(self, *args, **kwargs):
        return AsyncCall(self.Callable, self.Callback)(*args, **kwargs)

def Async(fnc=None, callback=None):
    if fnc is None:
        def AddAsyncCallback(fnc):
            return AsyncMethod(fnc, callback)
        return AddAsyncCallback
    else:
        return AsyncMethod(fnc, callback)
```
And it works exactly as requested:
```python
@Async
def fnc():
    pass
```
You could use eventlet. It lets you write what appears to be synchronous code, but have it operate asynchronously over the network.

Here's a sample of a super minimal crawler (Python 2):
```python
urls = ["http://www.google.com/intl/en_ALL/images/logo.gif",
        "https://wiki.secondlife.com/w/images/secondlife.jpg",
        "http://us.i1.yimg.com/us.yimg.com/i/ww/beta/y3.gif"]

import eventlet
from eventlet.green import urllib2

def fetch(url):
    return urllib2.urlopen(url).read()

pool = eventlet.GreenPool()

for body in pool.imap(fetch, urls):
    print "got body", len(body)
```
Something like this works for me; you can then call the function, and it will dispatch itself onto a new thread.
```python
import time
from _thread import start_new_thread  # module was named "thread" in Python 2

def dowork(asynchronous=True):
    if asynchronous:
        args = (False,)                 # note the comma: args must be a tuple
        start_new_thread(dowork, args)  # Call itself on a new thread.
    else:
        while True:
            # do something ...
            time.sleep(60)              # sleep for a minute
    return
```
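For completeness: since Python 3.2 the standard library ships `concurrent.futures`, whose `Future` object provides essentially the token API from the question - callbacks, polling, and result retrieval. A minimal sketch (`long_computation` is illustrative):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def long_computation():
    time.sleep(0.1)  # stand-in for real work
    return 42

with ThreadPoolExecutor(max_workers=1) as executor:
    token = executor.submit(long_computation)  # returns a Future immediately
    token.add_done_callback(lambda fut: print("callback:", fut.result()))
    while not token.done():  # polling, as in the question
        time.sleep(0.01)     # doSomethingElse()
    print(token.result())    # 42
```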