当前位置:  开发笔记 > 编程语言 > 正文

Python守护程序使用pyxs Xenstore客户端监视GPIO引脚

如何解决《Python守护程序使用pyxsXenstore客户端监视GPIO引脚》经验,为你挑选了1个好方法。

我正在使用pyxsPython Xenstore客户端模块编写Upstart守护程序,该守护程序监视盒子上GPIO控制器上的一堆输出引脚.启动后,守护程序的基本结构是导出相关引脚,为引脚添加相应的Xenstore路径,以及为每个Xenstore路径添加和监视监视.手表部分是线程化的 - 对于每个手表,使用目标工人方法创建一个线程,该方法监视手表的变化.根据PyXS文档,您基本上必须执行以下操作:

# monitor is a pyxs.client.Client.Monitor object, and watch adds a
# watch to the given path
monitor.watch(path, path_token)
# wait for events on the watched path - returns a pair if there is an
# event, the first is the event path and the second is the path token
monitor.wait(sleep=...)

我的问题是,wait如果没有sleep=指定参数,是否调用块- 从PyXS文档中不清楚是否是这种情况.

代码大致是这样的:

from pyxs.client import Client
from pyxs.exceptions import PyXSError
from threading import Thread

...
class gpiod(object):

    def __init__(self,...):
        ...
        # stores pin numbers using descriptive labels as keys
        self._gpio_pins = {}
        # stores Xenstore paths for the pins, using the pin labels as keys
        self._xenstore_paths = {}
        self._xenstore_client = Client()
        self._xenstore_monitor = self._xenstore_client.monitor()
        # stores threads that monitor the watches added for the paths
        self._xenstore_watchers = {}

        self.start()
        self.run()

    def _watch_xenstore_path(self, watch_path):
    """
    A worker method that is a target for a watch thread.
    """
        while True:
            try:
                path_change = self._xenstore_monitor.wait(sleep=1)
                while not path_change:
                    path_change = self._xenstore_monitor.wait(sleep=1)
                if path_change[0] == watch_path:
                # write changed value to the pin
           except PyXSError as e:
                # log the error

    def start(self):
        # load config
        ...
        # export relevant GPIO output pins (using system calls to sysfs)
        ...
        # create Xenstore paths using _xenstore_client
        ...
        # set watches on the paths
        for path in self._xenstore_paths:
            self._xenstore_monitor.watch(wpath=path, token=path)
            self._xenstore_watchers.update(
                {pin_label: Thread(target=self._watch_xenstore_path, args=(path,))}
            )

    def run(self):
        for watcher in self._xenstore_watchers:
            watcher.start()

Iron Fist.. 5

pyxs文档:

wait(sleep=None)

等待任何观察路径生成事件,该事件是(路径,令牌)对,其中第一个元素是事件路径,即被修改的实际路径,第二个元素是一个令牌,传递给watch() .参数:sleep(float) - 事件检查之间休眠的秒数.

这意味着,sleep参数实际上是分开对wait方法的检查,如果你看一下github 的代码:

def wait(self, sleep=None):
    """Waits for any of the watched paths to generate an event,
    which is a ``(path, token)`` pair, where the first element
    is event path, i.e. the actual path that was modified and
    second element is a token, passed to the :meth:`watch`.
    :param float sleep: number of seconds to sleep between event
                        checks.
    """
    while True:
        if self.client.events:
            packet = self.client.events.popleft()
            return Event(*packet.payload.split("\x00")[:-1])

        # Executing a noop, hopefuly we'll get some events queued
        # in the meantime. Note: I know it sucks, but it seems like
        # there's no other way ...
        self.client.execute_command(Op.DEBUG, "")

        if sleep is not None: #sleep here is to provide tap gap between event check
            time.sleep(sleep)

你会看到,sleep参数只是提供事件之间检查水龙头差距在哪里作为while True循环一直走,直到你得到取消event或者如果你想:如何快速检查event,小sleep更快的定期检查.

所以,最后:

wait方法的调用已经阻塞,sleep只是为了在event检查之间分配时间.



1> Iron Fist..:

pyxs文档:

wait(sleep=None)

等待任何观察路径生成事件,该事件是(路径,令牌)对,其中第一个元素是事件路径,即被修改的实际路径,第二个元素是一个令牌,传递给watch() .参数:sleep(float) - 事件检查之间休眠的秒数.

这意味着,sleep参数实际上是分开对wait方法的检查,如果你看一下github 的代码:

def wait(self, sleep=None):
    """Waits for any of the watched paths to generate an event,
    which is a ``(path, token)`` pair, where the first element
    is event path, i.e. the actual path that was modified and
    second element is a token, passed to the :meth:`watch`.
    :param float sleep: number of seconds to sleep between event
                        checks.
    """
    while True:
        if self.client.events:
            packet = self.client.events.popleft()
            return Event(*packet.payload.split("\x00")[:-1])

        # Executing a noop, hopefuly we'll get some events queued
        # in the meantime. Note: I know it sucks, but it seems like
        # there's no other way ...
        self.client.execute_command(Op.DEBUG, "")

        if sleep is not None: #sleep here is to provide tap gap between event check
            time.sleep(sleep)

你会看到,sleep参数只是提供事件之间检查水龙头差距在哪里作为while True循环一直走,直到你得到取消event或者如果你想:如何快速检查event,小sleep更快的定期检查.

所以,最后:

wait方法的调用已经阻塞,sleep只是为了在event检查之间分配时间.

推荐阅读
郑小蒜9299_941611_G
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有