我正在编写一个连接到X个UNIX套接字的工具,发送命令并将输出保存在本地文件系统中.它每X秒运行一次.为了在工具接收终止信号时执行一些清理,我将一个函数(关闭)注册到signal.SIGHUP和signal.SIGTERM信号.此函数取消所有任务,然后关闭事件循环.
我的问题是我得到了
RuntimeError:事件循环在Future完成之前停止
当我发送signal.SIGTERM(杀'pid').我已经阅读了两次取消任务的文档,但我没有发现我在这里做错了什么.
我也注意到一些奇怪的事情,当我发送终止信号时,程序处于睡眠模式,我在日志中看到它唤醒了pull_stats()协程,你可以在日志的前两行看到这一点.
日志:
21:53:44,194 [23031] [MainThread:supervisor ] DEBUG **sleeping for 9.805s secs** 21:53:45,857 [23031] [MainThread:pull_stats ] INFO pull statistics 21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin1.sock 21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin4.sock 21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin3.sock 21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin3.sock 21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin2.sock 21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin2.sock 21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin4.sock 21:53:45,859 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin1.sock 21:53:45,859 [23031] [MainThread:shutdown ] INFO received stop signal, cancelling tasks... 21:53:45,859 [23031] [MainThread:shutdown ] INFO True 21:53:45,859 [23031] [MainThread:shutdown ] INFO True 21:53:45,859 [23031] [MainThread:shutdown ] INFO True 21:53:45,859 [23031] [MainThread:shutdown ] INFO True 21:53:45,859 [23031] [MainThread:shutdown ] INFO True 21:53:45,859 [23031] [MainThread:shutdown ] INFO True 21:53:45,859 [23031] [MainThread:shutdown ] INFO True 21:53:45,859 [23031] [MainThread:shutdown ] INFO True 21:53:45,859 [23031] [MainThread:shutdown ] INFO True 21:53:45,859 [23031] [MainThread:shutdown ] INFO True 21:53:45,859 [23031] [MainThread:shutdown ] INFO True 21:53:45,859 [23031] [MainThread:shutdown ] INFO True 21:53:45,859 [23031] [MainThread:shutdown ] INFO True 21:53:45,859 [23031] [MainThread:shutdown ] INFO True 21:53:45,859 [23031] [MainThread:shutdown ] INFO True 21:53:45,859 [23031] [MainThread:shutdown ] INFO True 21:53:45,860 [23031] [MainThread:shutdown ] INFO True 21:53:45,860 [23031] [MainThread:shutdown ] INFO stopping event loop 21:53:45,860 [23031] [MainThread:shutdown ] INFO bye, exiting... Traceback (most recent call last): File "./pull.py", line 249, inmain() File "./pull.py", line 245, in main supervisor(loop, config) File "./pull.py", line 161, in supervisor config['pull']['socket-dir'], storage_dir, loop)) File "/usr/lib/python3.4/asyncio/base_events.py", line 274, in run_until_complete raise RuntimeError('Event loop stopped before Future completed.') RuntimeError: Event loop stopped before Future completed.
这是代码:
def shutdown(loop): LOGGER.info('received stop signal, cancelling tasks...') for task in asyncio.Task.all_tasks(): LOGGER.info(task.cancel()) LOGGER.info('stopping event loop') loop.stop() LOGGER.info('bye, exiting...') def write_file(filename, data): try: with open(filename, 'w') as file_handle: file_handle.write(data.decode()) except OSError as exc: return False else: return True @asyncio.coroutine def get(socket_file, cmd, storage_dir, loop): connect = asyncio.open_unix_connection(socket_file) reader, writer = yield from asyncio.wait_for(connect, 1) writer.write('{c}\n'.format(c=cmd).encode()) data = yield from reader.read() writer.close() filename = os.path.basename(socket_file) + '_' + cmd.split()[1] filename = os.path.join(storage_dir, filename) result = yield from loop.run_in_executor(None, write_file, filename, data) return result @asyncio.coroutine def pull_stats(socket_dir, storage_dir, loop): socket_files = glob.glob(socket_dir + '/*sock*') coroutines = [get(socket_file, cmd, storage_dir, loop) for socket_file in socket_files for cmd in CMDS] status = yield from asyncio.gather(*coroutines) if len(set(status)) == 1 and True in set(status): return True else: return False def supervisor(loop, config): dst_dir = config.get('pull', 'dst-dir') tmp_dst_dir = config.get('pull', 'tmp-dst-dir') while True: start_time = int(time.time()) storage_dir = os.path.join(tmp_dst_dir, str(start_time)) try: os.makedirs(storage_dir) except OSError as exc: msg = "failed to create directory {d}:{e}".format(d=storage_dir, e=exc) LOGGER.critical(msg) # Launch all connections. result = loop.run_until_complete(pull_stats( config['pull']['socket-dir'], storage_dir, loop)) if result: try: shutil.move(storage_dir, dst_dir) except OSError as exc: LOGGER.critical("failed to move %s to %s: %s", storage_dir, dst_dir, exc) break else: LOGGER.info('statistics are saved in %s', os.path.join( dst_dir, os.path.basename(storage_dir))) else: LOGGER.critical('failed to pull stats') shutil.rmtree(storage_dir) sleep = config.getint('pull', 'pull-interval') - (time.time() - start_time) if 0 < sleep < config.getint('pull', 'pull-interval'): time.sleep(sleep) loop.close() sys.exit(1) def main(): args = docopt(__doc__, version=VERSION) config = ConfigParser(interpolation=ExtendedInterpolation()) config.read_dict(copy.copy(DEFAULT_OPTIONS)) config.read(args['--file']) loop = asyncio.get_event_loop() loop.add_signal_handler(signal.SIGHUP, functools.partial(shutdown, loop)) loop.add_signal_handler(signal.SIGTERM, functools.partial(shutdown, loop)) num_level = getattr(logging, config.get('pull', 'loglevel').upper(), None) LOGGER.setLevel(num_level) supervisor(loop, config) # This is the standard boilerplate that calls the main() function. if __name__ == '__main__': main()
kwarunek.. 12
取消不是立竿见影的,需要运行ioloop才能解决异常CancelledError
.ioloop.stop
从关机中删除并在主管中处理异常,使事情有效.下面简化的例子.
重要的是,你可以取消Task
,它只停止观看/等待结束/结果,循环不会处理它的进一步事件.但是下面的请求/管道不会被停止.
简化示例:
import asyncio import functools import logging import signal import sys from concurrent.futures import CancelledError def shutdown(loop): logging.info('received stop signal, cancelling tasks...') for task in asyncio.Task.all_tasks(): task.cancel() logging.info('bye, exiting in a minute...') @asyncio.coroutine def get(i): logging.info('sleep for %d', i) yield from asyncio.sleep(i) @asyncio.coroutine def pull_stats(): coroutines = [get(i) for i in range(10,20)] status = yield from asyncio.gather(*coroutines) def supervisor(loop): try: while True: result = loop.run_until_complete(pull_stats()) except CancelledError: logging.info('CancelledError') loop.close() sys.exit(1) def main(): logging.getLogger().setLevel(logging.INFO) loop = asyncio.get_event_loop() loop.add_signal_handler(signal.SIGHUP, functools.partial(shutdown, loop)) loop.add_signal_handler(signal.SIGTERM, functools.partial(shutdown, loop)) supervisor(loop) if __name__ == '__main__': main()
请注意,如果您仅取消gather's
Future,则所有孩子也将被设置为已取消.
和睡眠的事情
任何信号或中断的接收都会导致程序恢复执行.因此,当进程接收SIGTERM并设置处理程序时,python允许您处理它,执行此线程恢复并调用sighandler.由于ioloop的实现及其信号处理,它在唤醒后继续运行.
取消不是立竿见影的,需要运行ioloop才能解决异常CancelledError
.ioloop.stop
从关机中删除并在主管中处理异常,使事情有效.下面简化的例子.
重要的是,你可以取消Task
,它只停止观看/等待结束/结果,循环不会处理它的进一步事件.但是下面的请求/管道不会被停止.
简化示例:
import asyncio import functools import logging import signal import sys from concurrent.futures import CancelledError def shutdown(loop): logging.info('received stop signal, cancelling tasks...') for task in asyncio.Task.all_tasks(): task.cancel() logging.info('bye, exiting in a minute...') @asyncio.coroutine def get(i): logging.info('sleep for %d', i) yield from asyncio.sleep(i) @asyncio.coroutine def pull_stats(): coroutines = [get(i) for i in range(10,20)] status = yield from asyncio.gather(*coroutines) def supervisor(loop): try: while True: result = loop.run_until_complete(pull_stats()) except CancelledError: logging.info('CancelledError') loop.close() sys.exit(1) def main(): logging.getLogger().setLevel(logging.INFO) loop = asyncio.get_event_loop() loop.add_signal_handler(signal.SIGHUP, functools.partial(shutdown, loop)) loop.add_signal_handler(signal.SIGTERM, functools.partial(shutdown, loop)) supervisor(loop) if __name__ == '__main__': main()
请注意,如果您仅取消gather's
Future,则所有孩子也将被设置为已取消.
和睡眠的事情
任何信号或中断的接收都会导致程序恢复执行.因此,当进程接收SIGTERM并设置处理程序时,python允许您处理它,执行此线程恢复并调用sighandler.由于ioloop的实现及其信号处理,它在唤醒后继续运行.