我试图得到一个分布式TensorFlow工作的一个非常简单的例子.但是,我有一个在运行之间看起来不确定的错误.在某些运行中,它完美地运行.输出以下内容:
Worker 2 | step 0 Worker 0 | step 0 Worker 1 | step 0 Worker 3 | step 0 Worker 2 | step 1 Worker 0 | step 1 Worker 1 | step 1 Worker 3 | step 1 ...
但是,每隔一段时间,一个或多个工作人员就无法运行,导致输出如下:
Worker 0 | step 0 Worker 3 | step 0 Worker 0 | step 1 Worker 3 | step 1 Worker 0 | step 2 Worker 3 | step 2 ...
如果我无限期地运行循环,似乎缺少的工作人员总是在某个时刻启动,但仅仅几分钟后,这是不切实际的.
我发现有两件事情会让问题消失(但是让程序变得无用):1.不在with tf.device(tf.train.replica_device_setter())
范围内声明任何变量.如果我甚至声明一个变量(例如nasty_var
下面),问题就会开始出现.和2设置is_chief
PARAM在tf.train.MonitoredTrainingSession()
给True
所有工人.即使声明了变量,这也会导致bug消失,但是让所有的工作人员成为主管似乎是错误的.我目前在下面设置它的方式is_chief=(task_index == 0)
- 直接来自TensorFlow教程.
这是我可以复制问题的最简单的代码.(您可能需要多次运行才能看到错误,但它几乎总是在5次运行中显示出来
from multiprocessing import Process import tensorflow as tf from time import sleep from numpy.random import random_sample cluster = tf.train.ClusterSpec({'ps': ['localhost:2222'], 'worker': ['localhost:2223', 'localhost:2224', 'localhost:2225', 'localhost:2226']}) def create_worker(task_index): server = tf.train.Server(cluster, job_name='worker', task_index=task_index) with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % task_index, cluster=cluster)): nasty_var = tf.Variable(0) # This line causes the problem. No issue when this is commented out. with tf.train.MonitoredTrainingSession(master=server.target, is_chief=(task_index == 0)): for step in xrange(10000): sleep(random_sample()) # Simulate some work being done. print 'Worker %d | step %d' % (task_index, step) def create_ps(task_index): param_server = tf.train.Server(cluster, job_name='ps', task_index=task_index) param_server.join() # Launch workers and ps in separate processes. processes = [] for i in xrange(len(cluster.as_dict()['worker'])): print 'Forking worker process ', i p = Process(target=create_worker, args=[i]) p.start() processes.append(p) for i in xrange(len(cluster.as_dict()['ps'])): print 'Forking ps process ', i p = Process(target=create_ps, args=[i]) p.start() processes.append(p) for p in processes: p.join()
mrry.. 5
我猜这里的原因是tf.train.MonitoredTrainingSession
启动方式中的隐式协调协议,这是在这里实现的:
如果这次会议是主席:
运行变量初始化程序op.
否则(如果本次会议不是主席):
运行op以检查变量是否已初始化.
虽然任何变量尚未初始化.
等30秒.
尝试创建一个新会话,并检查变量是否已初始化.
(我在关于分布式TensorFlow的视频中讨论了该协议背后的基本原理.)
当每个会话都是主要会话,或者没有要初始化的变量时,tf.train.MonitoredTrainingSession
将始终立即开始.但是,一旦有一个变量,而你只有一个负责人,你就会发现非首席工作人员必须等待主管采取行动.
使用该协议的原因在于它对于各种进程失败是健壮的,并且与在典型的分布式训练作业的预期运行时间相比,在单个进程上运行所有内容时非常明显的延迟是很短的.
再看一下这个实现,似乎这个30秒的超时应该是可配置的(作为recovery_wait_secs
参数tf.train.SessionManager()
),但是当你创建一个时tf.train.MonitoredTrainingSession
,目前没有办法设置这个超时,因为它使用一组硬编码的参数来创建会话管理员.这似乎是API的疏忽,所以请随时在GitHub问题页面上打开功能请求!
我猜这里的原因是tf.train.MonitoredTrainingSession
启动方式中的隐式协调协议,这是在这里实现的:
如果这次会议是主席:
运行变量初始化程序op.
否则(如果本次会议不是主席):
运行op以检查变量是否已初始化.
虽然任何变量尚未初始化.
等30秒.
尝试创建一个新会话,并检查变量是否已初始化.
(我在关于分布式TensorFlow的视频中讨论了该协议背后的基本原理.)
当每个会话都是主要会话,或者没有要初始化的变量时,tf.train.MonitoredTrainingSession
将始终立即开始.但是,一旦有一个变量,而你只有一个负责人,你就会发现非首席工作人员必须等待主管采取行动.
使用该协议的原因在于它对于各种进程失败是健壮的,并且与在典型的分布式训练作业的预期运行时间相比,在单个进程上运行所有内容时非常明显的延迟是很短的.
再看一下这个实现,似乎这个30秒的超时应该是可配置的(作为recovery_wait_secs
参数tf.train.SessionManager()
),但是当你创建一个时tf.train.MonitoredTrainingSession
,目前没有办法设置这个超时,因为它使用一组硬编码的参数来创建会话管理员.这似乎是API的疏忽,所以请随时在GitHub问题页面上打开功能请求!