当前位置:  开发笔记 > 编程语言 > 正文

正确关闭asyncio任务的方法

如何解决《正确关闭asyncio任务的方法》经验,为你挑选了1个好方法。

我正在编写一个连接到X个UNIX套接字的工具,发送命令并将输出保存在本地文件系统中.它每X秒运行一次.为了在工具接收终止信号时执行一些清理,我将一个函数(关闭)注册到signal.SIGHUP和signal.SIGTERM信号.此函数取消所有任务,然后关闭事件循环.

我的问题是我得到了

RuntimeError:事件循环在Future完成之前停止

当我发送signal.SIGTERM(杀'pid').我已经阅读了两次取消任务的文档,但我没有发现我在这里做错了什么.

我也注意到一些奇怪的事情,当我发送终止信号时,程序处于睡眠模式,我在日志中看到它唤醒了pull_stats()协程,你可以在日志的前两行看到这一点.

日志:

21:53:44,194 [23031] [MainThread:supervisor  ] DEBUG    **sleeping for 9.805s secs**
21:53:45,857 [23031] [MainThread:pull_stats  ] INFO     pull statistics
21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin1.sock
21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin4.sock
21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin3.sock
21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin3.sock
21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin2.sock
21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin2.sock
21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin4.sock
21:53:45,859 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin1.sock
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     received stop signal, cancelling tasks...
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,860 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,860 [23031] [MainThread:shutdown    ] INFO     stopping event loop
21:53:45,860 [23031] [MainThread:shutdown    ] INFO     bye, exiting...
Traceback (most recent call last):
  File "./pull.py", line 249, in 
    main()
  File "./pull.py", line 245, in main
    supervisor(loop, config)
  File "./pull.py", line 161, in supervisor
    config['pull']['socket-dir'], storage_dir, loop))
  File "/usr/lib/python3.4/asyncio/base_events.py", line 274, in run_until_complete
    raise RuntimeError('Event loop stopped before Future completed.')
RuntimeError: Event loop stopped before Future completed.

这是代码:

def shutdown(loop):
    LOGGER.info('received stop signal, cancelling tasks...')
    for task in asyncio.Task.all_tasks():
        LOGGER.info(task.cancel())
    LOGGER.info('stopping event loop')
    loop.stop()
    LOGGER.info('bye, exiting...')


def write_file(filename, data):
    try:
        with open(filename, 'w') as file_handle:
            file_handle.write(data.decode())
    except OSError as exc:
        return False
    else:
        return True


@asyncio.coroutine
def get(socket_file, cmd, storage_dir, loop):
    connect = asyncio.open_unix_connection(socket_file)
    reader, writer = yield from asyncio.wait_for(connect, 1)

    writer.write('{c}\n'.format(c=cmd).encode())
    data = yield from reader.read()
    writer.close()

    filename = os.path.basename(socket_file) + '_' + cmd.split()[1]
    filename = os.path.join(storage_dir, filename)
    result = yield from loop.run_in_executor(None, write_file, filename, data)

    return result


@asyncio.coroutine
def pull_stats(socket_dir, storage_dir, loop):
    socket_files = glob.glob(socket_dir + '/*sock*')
    coroutines = [get(socket_file, cmd, storage_dir, loop)
                  for socket_file in socket_files
                  for cmd in CMDS]
    status = yield from asyncio.gather(*coroutines)

    if len(set(status)) == 1 and True in set(status):
        return True
    else:
        return False


def supervisor(loop, config):
    dst_dir = config.get('pull', 'dst-dir')
    tmp_dst_dir = config.get('pull', 'tmp-dst-dir')

    while True:
        start_time = int(time.time())
        storage_dir = os.path.join(tmp_dst_dir, str(start_time))

        try:
            os.makedirs(storage_dir)
        except OSError as exc:
            msg = "failed to create directory {d}:{e}".format(d=storage_dir,
                                                              e=exc)
            LOGGER.critical(msg)

        # Launch all connections.
        result = loop.run_until_complete(pull_stats(
            config['pull']['socket-dir'], storage_dir, loop))

        if result:
            try:
                shutil.move(storage_dir, dst_dir)
            except OSError as exc:
                LOGGER.critical("failed to move %s to %s: %s", storage_dir,
                                dst_dir, exc)
                break
            else:
                LOGGER.info('statistics are saved in %s', os.path.join(
                    dst_dir, os.path.basename(storage_dir)))
        else:
            LOGGER.critical('failed to pull stats')
            shutil.rmtree(storage_dir)

        sleep = config.getint('pull', 'pull-interval') - (time.time() -
                                                          start_time)
        if 0 < sleep < config.getint('pull', 'pull-interval'):
            time.sleep(sleep)
    loop.close()
    sys.exit(1)


def main():
    args = docopt(__doc__, version=VERSION)
    config = ConfigParser(interpolation=ExtendedInterpolation())
    config.read_dict(copy.copy(DEFAULT_OPTIONS))
    config.read(args['--file'])

    loop = asyncio.get_event_loop()

    loop.add_signal_handler(signal.SIGHUP, functools.partial(shutdown, loop))
    loop.add_signal_handler(signal.SIGTERM, functools.partial(shutdown, loop))

    num_level = getattr(logging, config.get('pull', 'loglevel').upper(), None)
    LOGGER.setLevel(num_level)

    supervisor(loop, config)

# This is the standard boilerplate that calls the main() function.
if __name__ == '__main__':
    main()

kwarunek.. 12

取消不是立竿见影的,需要运行ioloop才能解决异常CancelledError.ioloop.stop从关机中删除并在主管中处理异常,使事情有效.下面简化的例子.

重要的是,你可以取消Task,它只停止观看/等待结束/结果,循环不会处理它的进一步事件.但是下面的请求/管道不会被停止.

简化示例:

import asyncio
import functools
import logging
import signal
import sys
from concurrent.futures import CancelledError


def shutdown(loop):
    logging.info('received stop signal, cancelling tasks...')
    for task in asyncio.Task.all_tasks():
        task.cancel()
    logging.info('bye, exiting in a minute...')    


@asyncio.coroutine
def get(i):
    logging.info('sleep for %d', i)
    yield from asyncio.sleep(i)    


@asyncio.coroutine
def pull_stats():
    coroutines = [get(i) for i in range(10,20)]
    status = yield from asyncio.gather(*coroutines)


def supervisor(loop):
    try:
        while True:
            result = loop.run_until_complete(pull_stats())
    except CancelledError:
        logging.info('CancelledError')
    loop.close()
    sys.exit(1)


def main():
    logging.getLogger().setLevel(logging.INFO)
    loop = asyncio.get_event_loop()
    loop.add_signal_handler(signal.SIGHUP, functools.partial(shutdown, loop))
    loop.add_signal_handler(signal.SIGTERM, functools.partial(shutdown, loop))
    supervisor(loop)


if __name__ == '__main__':
    main()

请注意,如果您仅取消gather'sFuture,则所有孩子也将被设置为已取消.

和睡眠的事情

任何信号或中断的接收都会导致程序恢复执行.因此,当进程接收SIGTERM并设置处理程序时,python允许您处理它,执行此线程恢复并调用sighandler.由于ioloop的实现及其信号处理,它在唤醒后继续运行.



1> kwarunek..:

取消不是立竿见影的,需要运行ioloop才能解决异常CancelledError.ioloop.stop从关机中删除并在主管中处理异常,使事情有效.下面简化的例子.

重要的是,你可以取消Task,它只停止观看/等待结束/结果,循环不会处理它的进一步事件.但是下面的请求/管道不会被停止.

简化示例:

import asyncio
import functools
import logging
import signal
import sys
from concurrent.futures import CancelledError


def shutdown(loop):
    logging.info('received stop signal, cancelling tasks...')
    for task in asyncio.Task.all_tasks():
        task.cancel()
    logging.info('bye, exiting in a minute...')    


@asyncio.coroutine
def get(i):
    logging.info('sleep for %d', i)
    yield from asyncio.sleep(i)    


@asyncio.coroutine
def pull_stats():
    coroutines = [get(i) for i in range(10,20)]
    status = yield from asyncio.gather(*coroutines)


def supervisor(loop):
    try:
        while True:
            result = loop.run_until_complete(pull_stats())
    except CancelledError:
        logging.info('CancelledError')
    loop.close()
    sys.exit(1)


def main():
    logging.getLogger().setLevel(logging.INFO)
    loop = asyncio.get_event_loop()
    loop.add_signal_handler(signal.SIGHUP, functools.partial(shutdown, loop))
    loop.add_signal_handler(signal.SIGTERM, functools.partial(shutdown, loop))
    supervisor(loop)


if __name__ == '__main__':
    main()

请注意,如果您仅取消gather'sFuture,则所有孩子也将被设置为已取消.

和睡眠的事情

任何信号或中断的接收都会导致程序恢复执行.因此,当进程接收SIGTERM并设置处理程序时,python允许您处理它,执行此线程恢复并调用sighandler.由于ioloop的实现及其信号处理,它在唤醒后继续运行.

推荐阅读
喜生-Da
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有