如何在App Engine上运行后台任务?
您可以使用Task Queue Python API.
GAE是构建可伸缩Web应用程序的非常有用的工具.许多人指出的限制不支持后台任务,缺乏周期性任务以及严格限制每个HTTP请求花费的时间,如果请求超过该时间限制操作终止,这使得运行耗时的任务变得不可能.
如何运行后台任务?
在GAE中,仅在存在HTTP请求时才执行代码.关于代码可以花多长时间有一个严格的时间限制(我想10秒).因此,如果没有请求,则不执行代码.建议的工作之一是使用外部框连续发送请求,因此创建后台任务.但为此我们需要一个外部盒子,现在我们依赖于另外一个元素.另一种方法是发送302重定向响应,以便客户端重新发送请求,这也使我们依赖于客户端的外部元素.如果外部盒子是GAE本身怎么办?在语言中使用不支持循环构造的函数式语言的每个人都知道替代方法,即递归是循环的替代.那么如果我们完成部分计算并在同一个网址上进行HTTP GET并且时间非常短,比如1秒呢?这会在运行在apache上的php代码上创建一个循环(递归).
一些如何在GAE上不起作用.那么,如果我们在其他网址上执行HTTP GET,请说url2在第一个网址上执行HTTP GET会怎么样?这似乎适用于GAE.代码就是这样的.
class FirstUrl(webapp.RequestHandler): def get(self): self.response.out.write("ok") time.sleep(2) urlfetch.fetch("http://"+self.request.headers["HOST"]+'/url2') class SecondUrl(webapp.RequestHandler): def get(self): self.response.out.write("ok") time.sleep(2) urlfetch.fetch("http://"+self.request.headers["HOST"]+'/url1') application = webapp.WSGIApplication([('/url1', FirstUrl), ('/url2', SecondUrl)]) def main(): run_wsgi_app(application) if __name__ == "__main__": main()
由于我们找到了运行后台任务的方法,因此我们可以为周期性任务(计时器)和跨越许多HTTP请求(foreach)的循环结构构建抽象.
Timer
Now建立计时器是直截了当的.基本思想是列出计时器列表和每个计时器的间隔.一旦达到该间隔,就调用回调函数.我们将使用memcache来维护计时器列表.为了找出何时调用回调,我们将把一个密钥存储在memcache中,间隔为到期时间.我们定期(比如5secs)检查该密钥是否存在,如果不存在则调用回调并再次设置该密钥.
def timer(func, interval): timerlist = memcache.get('timer') if(None == timerlist): timerlist = [] timerlist.append({'func':func, 'interval':interval}) memcache.set('timer-'+func, '1', interval) memcache.set('timer', timerlist) def checktimers(): timerlist = memcache.get('timer') if(None == timerlist): return False for current in timerlist: if(None == memcache.get('timer-'+current['func'])): #reset interval memcache.set('timer-'+current['func'], '1', current['interval']) #invoke callback function try: eval(current['func']+'()') except: pass return True return False
Foreach
当需要长时间计算时,需要对1000个数据库行进行一些操作或获取1000个URL等,这是必需的.基本思路是维护memcache中的回调和参数列表,并且每次使用参数调用回调.
def foreach(func, args): looplist = memcache.get('foreach') if(None == looplist): looplist = [] looplist.append({'func':func, 'args':args}) memcache.set('foreach', looplist) def checkloops(): looplist = memcache.get('foreach') if(None == looplist): return False if((len(looplist) > 0) and (len(looplist[0]['args']) > 0)): arg = looplist[0]['args'].pop(0) func = looplist[0]['func'] if(len(looplist[0]['args']) == 0): looplist.pop(0) if((len(looplist) > 0) and (len(looplist[0]['args']) > 0)): memcache.set('foreach', looplist) else: memcache.delete('foreach') try: eval(func+'('+repr(arg)+')') except: pass return True else: return False # instead of # foreach index in range(0, 1000): # someoperaton(index) # we will say # foreach('someoperaton', range(0, 1000))
现在建立一个每小时获取网址列表的程序是直截了当的.这是代码.
def getone(url): try: result = urlfetch.fetch(url) if(result.status_code == 200): memcache.set(url, '1', 60*60) #process result.content except : pass def getallurl(): #list of urls to be fetched urllist = ['http://www.google.com/', 'http://www.cnn.com/', 'http://www.yahoo.com', 'http://news.google.com'] fetchlist = [] for url in urllist: if (memcache.get(url) is None): fetchlist.append(url) #this is equivalent to #for url in fetchlist: getone(url) if(len(fetchlist) > 0): foreach('getone', fetchlist) #register the timer callback timer('getallurl', 3*60)
完整的代码在这里http://groups.google.com/group/httpmr-discuss/t/1648611a54c01aa 我已经在appengine上运行这段代码几天没有太大问题.
警告:我们大量使用urlfetch.每天urlfetch的限制是160000.所以要小心不要达到这个限制.