当前位置:  开发笔记 > 编程语言 > 正文

分布式TensorFlow - 没有运行一些工作者

如何解决《分布式TensorFlow-没有运行一些工作者》经验,为你挑选了1个好方法。

我试图得到一个分布式TensorFlow工作的一个非常简单的例子.但是,我有一个在运行之间看起来不确定的错误.在某些运行中,它完美地运行.输出以下内容:

Worker 2 | step 0
Worker 0 | step 0
Worker 1 | step 0
Worker 3 | step 0
Worker 2 | step 1
Worker 0 | step 1
Worker 1 | step 1
Worker 3 | step 1
...

但是,每隔一段时间,一个或多个工作人员就无法运行,导致输出如下:

Worker 0 | step 0
Worker 3 | step 0
Worker 0 | step 1
Worker 3 | step 1
Worker 0 | step 2
Worker 3 | step 2
...

如果我无限期地运行循环,似乎缺少的工作人员总是在某个时刻启动,但仅仅几分钟后,这是不切实际的.

我发现有两件事情会让问题消失(但是让程序变得无用):1.不在with tf.device(tf.train.replica_device_setter())范围内声明任何变量.如果我甚至声明一个变量(例如nasty_var下面),问题就会开始出现.和2设置is_chiefPARAM在tf.train.MonitoredTrainingSession()True所有工人.即使声明了变量,这也会导致bug消失,但是让所有的工作人员成为主管似乎是错误的.我目前在下面设置它的方式is_chief=(task_index == 0)- 直接来自TensorFlow教程.

这是我可以复制问题的最简单的代码.(您可能需要多次运行才能看到错误,但它几乎总是在5次运行中显示出来

from multiprocessing import Process
import tensorflow as tf
from time import sleep
from numpy.random import random_sample

cluster = tf.train.ClusterSpec({'ps': ['localhost:2222'],
                                'worker': ['localhost:2223',
                                           'localhost:2224',
                                           'localhost:2225',
                                           'localhost:2226']})


def create_worker(task_index):
    server = tf.train.Server(cluster, job_name='worker', task_index=task_index)

    with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % task_index, cluster=cluster)):
        nasty_var = tf.Variable(0)  # This line causes the problem. No issue when this is commented out.

    with tf.train.MonitoredTrainingSession(master=server.target, is_chief=(task_index == 0)):
        for step in xrange(10000):
            sleep(random_sample())  # Simulate some work being done.
            print 'Worker %d | step %d' % (task_index, step)


def create_ps(task_index):
    param_server = tf.train.Server(cluster, job_name='ps',
                                   task_index=task_index)
    param_server.join()

# Launch workers and ps in separate processes.
processes = []
for i in xrange(len(cluster.as_dict()['worker'])):
    print 'Forking worker process ', i
    p = Process(target=create_worker, args=[i])
    p.start()
    processes.append(p)

for i in xrange(len(cluster.as_dict()['ps'])):
    print 'Forking ps process ', i
    p = Process(target=create_ps, args=[i])
    p.start()
    processes.append(p)

for p in processes:
    p.join()

mrry.. 5

我猜这里的原因是tf.train.MonitoredTrainingSession启动方式中的隐式协调协议,这是在这里实现的:

如果这次会议是主席:

运行变量初始化程序op.

否则(如果本次会议不是主席):

运行op以检查变量是否已初始化.

虽然任何变量尚未初始化.

等30秒.

尝试创建一个新会话,并检查变量是否已初始化.

(我在关于分布式TensorFlow的视频中讨论了该协议背后的基本原理.)

当每个会话都是主要会话,或者没有要初始化的变量时,tf.train.MonitoredTrainingSession将始终立即开始.但是,一旦有一个变量,而你只有一个负责人,你就会发现非首席工作人员必须等待主管采取行动.

使用该协议的原因在于它对于各种进程失败是健壮的,并且与在典型的分布式训练作业的预期运行时间相比,在单个进程上运行所有内容时非常明显的延迟是很短的.

再看一下这个实现,似乎这个30秒的超时应该是可配置的(作为recovery_wait_secs参数tf.train.SessionManager()),但是当你创建一个时tf.train.MonitoredTrainingSession,目前没有办法设置这个超时,因为它使用一组硬编码的参数来创建会话管理员.这似乎是API的疏忽,所以请随时在GitHub问题页面上打开功能请求!



1> mrry..:

我猜这里的原因是tf.train.MonitoredTrainingSession启动方式中的隐式协调协议,这是在这里实现的:

如果这次会议是主席:

运行变量初始化程序op.

否则(如果本次会议不是主席):

运行op以检查变量是否已初始化.

虽然任何变量尚未初始化.

等30秒.

尝试创建一个新会话,并检查变量是否已初始化.

(我在关于分布式TensorFlow的视频中讨论了该协议背后的基本原理.)

当每个会话都是主要会话,或者没有要初始化的变量时,tf.train.MonitoredTrainingSession将始终立即开始.但是,一旦有一个变量,而你只有一个负责人,你就会发现非首席工作人员必须等待主管采取行动.

使用该协议的原因在于它对于各种进程失败是健壮的,并且与在典型的分布式训练作业的预期运行时间相比,在单个进程上运行所有内容时非常明显的延迟是很短的.

再看一下这个实现,似乎这个30秒的超时应该是可配置的(作为recovery_wait_secs参数tf.train.SessionManager()),但是当你创建一个时tf.train.MonitoredTrainingSession,目前没有办法设置这个超时,因为它使用一组硬编码的参数来创建会话管理员.这似乎是API的疏忽,所以请随时在GitHub问题页面上打开功能请求!

推荐阅读
手机用户2402852387
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有