I should start by saying that I am completely new to parallel/multithreaded/multiprocessing programming of any kind.
Now, I have the chance to run my TensorFlow CNN on 32 cores (each with 2 hyperthreads). I have spent a lot of time trying to understand how I should modify my code (if at all) to exploit all the available computing power. Unfortunately, I have gotten nowhere. I hoped TF would take care of this automatically, but when I launch my model and check CPU usage with top, I see 100% CPU usage most of the time, with a few spikes to 200%. If all the cores were used, I would expect to see 100 * 64 = 6400% usage (correct?). How can I achieve that? Should I do something like what is explained here? If so, do I understand correctly that the multithreading described there applies only to computations that involve queues? Would that really make use of all the available computing power (since it seems to me that queues are only used for reading and batching training samples)?
In case it is needed, here is my code (main.py):
# pylint: disable=missing-docstring
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import time
from six.moves import xrange  # pylint: disable=redefined-builtin
import tensorflow as tf
from pylab import *
import argparse

import cnn
import freader_2

training_feats_file = ["file_name"]
training_lbls_file = ["file_name"]
test_feats_file = 'file_name'
test_lbls_file = 'file_name'

learning_rate = 0.1
testset_size = 1000
batch_size = 1000
testset_size = 793
tot_samples = 810901
max_steps = 3300

def placeholder_inputs(batch_size):
    images_placeholder = tf.placeholder(tf.float32, shape=(testset_size, cnn.IMAGE_HEIGHT, cnn.IMAGE_WIDTH, 1))
    labels_placeholder = tf.placeholder(tf.float32, shape=(testset_size, 15))
    return images_placeholder, labels_placeholder

def reader(images_file, lbls_file, images_pl, labels_pl, im_height, im_width):
    images = loadtxt(images_file)
    labels_feed = loadtxt(lbls_file)
    images_feed = reshape(images, [images.shape[0], im_height, im_width, 1])
    feed_dict = {
        images_pl: images_feed,
        labels_pl: labels_feed,
    }
    return feed_dict

tot_training_loss = []
tot_test_loss = []
tot_grad = []

print('Starting TensorFlow session...')
with tf.Graph().as_default():
    DS = freader_2.XICSDataSet()
    images, labels = DS.trainingset_files_reader(training_feats_file, training_lbls_file)
    keep_prob = tf.placeholder(tf.float32)
    logits = cnn.inference(images, batch_size, keep_prob)
    loss = cnn.loss(logits, labels)
    global_step = tf.Variable(0, trainable=False)
    train_op, grad_norm = cnn.training(loss, learning_rate, global_step)
    summary_op = tf.merge_all_summaries()
    test_images_pl, test_labels_pl = placeholder_inputs(testset_size)
    test_pred = cnn.inference(test_images_pl, testset_size, keep_prob, True)
    test_loss = cnn.loss(test_pred, test_labels_pl)
    saver = tf.train.Saver()
    sess = tf.Session()
    summary_writer = tf.train.SummaryWriter("CNN", sess.graph)
    init = tf.initialize_all_variables()
    sess.run(init)
    tf.train.start_queue_runners(sess=sess)
    test_feed = reader(test_feats_file, test_lbls_file, test_images_pl, test_labels_pl, DS.height, DS.width)
    test_feed[keep_prob] = 1.

    # Start the training loop.
    print('Starting training loop...')
    start_time = time.time()
    for step in xrange(max_steps):
        _, grad, loss_value = sess.run([train_op, grad_norm, loss], feed_dict={keep_prob: 0.5})
        tot_training_loss.append(loss_value)
        tot_grad.append(grad)
        _, test_loss_val = sess.run([test_pred, test_loss], feed_dict=test_feed)
        tot_test_loss.append(test_loss_val)
        if step % 1 == 0:
            duration = time.time() - start_time
            print('Step %d (%.3f sec):\n training loss = %f\n test loss = %f ' % (step, duration, loss_value, test_loss_val))
            print(' gradient = %f' % grad)
            # summary_str = sess.run(summary_op)#, feed_dict=feed_dict)
            # summary_writer.add_summary(summary_str, step)
            # summary_writer.flush()
        if (step + 1) % 100 == 0:
            print('Saving checkpoint...')
            saver.save(sess, "chkpts/medias-res", global_step=global_step)
        if test_loss_val < 0.01:  # or grad < 0.01:
            print("Stopping condition reached.")
            break
    print('Saving final network...')
    saver.save(sess, "chkpts/final.chkpt")
    print('Total training time: ' + str((time.time() - start_time)/3600) + ' h')
cnn.py:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import math
import tensorflow as tf

NUM_OUTPUT = 15
IMAGE_WIDTH = 195
IMAGE_HEIGHT = 20
IMAGE_PIXELS = IMAGE_WIDTH * IMAGE_HEIGHT

def inference(images, num_samples, keep_prob, reuse=None):
    with tf.variable_scope('conv1', reuse=reuse):
        kernel = tf.get_variable(name='weights', shape=[3, 30, 1, 5], initializer=tf.contrib.layers.xavier_initializer(uniform=False))
        weight_decay = tf.mul(tf.nn.l2_loss(kernel), 0.001, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
        conv = tf.nn.conv2d(images, kernel, [1, 1, 5, 1], padding='VALID')  # output dim: 18x34
        biases = tf.Variable(tf.constant(0.0, name='biases', shape=[5]))
        bias = tf.nn.bias_add(conv, biases)
        conv1 = tf.nn.relu(bias, name='conv1')
    pool1 = tf.nn.max_pool(conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID', name='pool1')  # output dim: 9x17
    with tf.variable_scope('conv2', reuse=reuse):
        kernel = tf.get_variable(name='weights', shape=[2, 2, 5, 5], initializer=tf.contrib.layers.xavier_initializer(uniform=False))
        weight_decay = tf.mul(tf.nn.l2_loss(kernel), 0.001, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
        conv = tf.nn.conv2d(pool1, kernel, [1, 1, 1, 1], padding='VALID')  # output dim: 8x16
        biases = tf.Variable(tf.constant(0.1, name='biases', shape=[5]))
        bias = tf.nn.bias_add(conv, biases)
        conv2 = tf.nn.relu(bias, name='conv2')
    pool2 = tf.nn.max_pool(conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID', name='pool2')  # output dim: 4x8
    h_fc1_drop = tf.nn.dropout(pool2, keep_prob)
    with tf.variable_scope('fully_connected', reuse=reuse):
        reshape = tf.reshape(h_fc1_drop, [num_samples, -1])
        dim = reshape.get_shape()[1].value
        weights = tf.get_variable(name='weights', shape=[dim, 20], initializer=tf.contrib.layers.xavier_initializer(uniform=False))
        weight_decay = tf.mul(tf.nn.l2_loss(weights), 0.004, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
        biases = tf.Variable(tf.zeros([20], name='biases'))
        fully_connected = tf.nn.relu(tf.matmul(reshape, weights) + biases, name='fully_connected')
    with tf.variable_scope('identity', reuse=reuse):
        weights = tf.get_variable(name='weights', shape=[20, NUM_OUTPUT], initializer=tf.contrib.layers.xavier_initializer(uniform=False))
        weight_decay = tf.mul(tf.nn.l2_loss(weights), 0.004, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
        biases = tf.Variable(tf.zeros([NUM_OUTPUT], name='biases'))
        output = tf.matmul(fully_connected, weights) + biases
    return output

def loss(outputs, labels):
    rmse = tf.sqrt(tf.reduce_mean(tf.square(tf.sub(labels, outputs))), name="rmse")
    loss_list = tf.get_collection('losses')
    loss_list.append(rmse)
    rmse_tot = tf.add_n(loss_list, name='total_loss')
    return rmse_tot

def training(loss, starter_learning_rate, global_step):
    tf.scalar_summary(loss.op.name, loss)
    # optimizer = tf.train.AdamOptimizer()
    learning_rate = tf.train.exponential_decay(starter_learning_rate, global_step, 200, 0.8, staircase=True)
    optimizer = tf.train.MomentumOptimizer(learning_rate, 0.8)
    grads_and_vars = optimizer.compute_gradients(loss)
    grad_norms = [tf.nn.l2_loss(g[0]) for g in grads_and_vars]
    grad_norm = tf.add_n(grad_norms)
    train_op = optimizer.apply_gradients(grads_and_vars, global_step=global_step)
    # train_op = optimizer.minimize(loss, global_step=global_step)
    return train_op, grad_norm
freader_2.py:
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import collections
import numpy as np
from six.moves import xrange
import tensorflow as tf

class XICSDataSet:
    def __init__(self, height=20, width=195, batch_size=1000, noutput=15):
        self.depth = 1
        self.height = height
        self.width = width
        self.batch_size = batch_size
        self.noutput = noutput

    def trainingset_files_reader(self, im_file_name, lb_file_name, nfiles=1):
        im_filename_queue = tf.train.string_input_producer(im_file_name, shuffle=False)
        lb_filename_queue = tf.train.string_input_producer(lb_file_name, shuffle=False)
        imreader = tf.TextLineReader()
        lbreader = tf.TextLineReader()
        imkey, imvalue = imreader.read(im_filename_queue)
        lbkey, lbvalue = lbreader.read(lb_filename_queue)
        im_record_defaults = [[.0]] * self.height * self.width
        lb_record_defaults = [[.0]] * self.noutput
        im_data_tuple = tf.decode_csv(imvalue, record_defaults=im_record_defaults, field_delim=' ')
        lb_data_tuple = tf.decode_csv(lbvalue, record_defaults=lb_record_defaults, field_delim=' ')
        features = tf.pack(im_data_tuple)
        label = tf.pack(lb_data_tuple)
        depth_major = tf.reshape(features, [self.height, self.width, self.depth])
        min_after_dequeue = 10
        capacity = min_after_dequeue + 3 * self.batch_size
        example_batch, label_batch = tf.train.shuffle_batch([depth_major, label], batch_size=self.batch_size, capacity=capacity, min_after_dequeue=min_after_dequeue)
        return example_batch, label_batch
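One remark about the reader above, relevant to the multithreading question: tf.train.shuffle_batch accepts a num_threads argument (default 1), so the batching queue can be fed by several enqueue threads. A minimal sketch of that one-line change, reusing the tensors defined in trainingset_files_reader; the value 8 is only an example:

# Same shuffle_batch call as above, but with several enqueue threads.
# Caution: with two independent TextLineReaders, multiple threads can in
# principle de-synchronize image/label pairs, so this needs care.
example_batch, label_batch = tf.train.shuffle_batch(
    [depth_major, label],
    batch_size=self.batch_size,
    capacity=capacity,
    min_after_dequeue=min_after_dequeue,
    num_threads=8)  # example value; batching order becomes non-deterministic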
Marco D.G.:
According to TensorFlow:

The two configurations listed below are used to optimize CPU performance by adjusting the thread pools.

intra_op_parallelism_threads: nodes that can use multiple threads to parallelize their execution will schedule the individual pieces into this pool.

inter_op_parallelism_threads: all ready nodes are scheduled in this pool.

These configurations are set via tf.ConfigProto and passed to tf.Session in the config attribute, as shown in the snippet below. For both configuration options, if they are unset or set to 0, they default to the number of logical CPU cores. Testing has shown that the default is effective for systems ranging from one CPU with 4 cores to multiple CPUs with 70+ combined logical cores. A common alternative optimization is to set the number of threads in both pools equal to the number of physical cores rather than logical cores:

config = tf.ConfigProto()
config.intra_op_parallelism_threads = 44
config.inter_op_parallelism_threads = 44
tf.Session(config=config)
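To act on that last suggestion programmatically, the physical core count can be queried at runtime. A minimal sketch, assuming the third-party psutil package is installed (it is not part of TensorFlow):

import psutil
import tensorflow as tf

# Physical cores only (logical=False excludes hyperthreads);
# this would be 32 on the machine described in the question.
num_physical_cores = psutil.cpu_count(logical=False)

config = tf.ConfigProto()
config.intra_op_parallelism_threads = num_physical_cores
config.inter_op_parallelism_threads = num_physical_cores
sess = tf.Session(config=config)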
In TensorFlow versions prior to 1.2, multi-threaded, queue-based input pipelines were the recommended way to improve performance. Starting with TensorFlow 1.4, however, the tf.data module is recommended instead.
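For illustration, here is a minimal tf.data pipeline in the spirit of the queue-based reader from the question. The decode_line parser is a hypothetical stand-in, the file name is the placeholder from the question, and labels are omitted for brevity; this is a sketch of the 1.4+ API, not a drop-in replacement for freader_2.py:

import tensorflow as tf

def decode_line(line):
    # Hypothetical parser: turn one space-delimited line of floats
    # into a [height, width, depth] = [20, 195, 1] image tensor.
    fields = tf.string_to_number(tf.string_split([line], ' ').values, tf.float32)
    return tf.reshape(fields, [20, 195, 1])

dataset = (tf.data.TextLineDataset(['file_name'])
           .map(decode_line, num_parallel_calls=8)  # parse lines on 8 threads (example value)
           .shuffle(buffer_size=3000)
           .batch(1000)      # batch_size from the question
           .prefetch(1))     # overlap input preprocessing with training
images = dataset.make_one_shot_iterator().get_next()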
And yes, on Linux you can check CPU usage with top and then press 1 to display per-CPU usage. Note: the percentages depend on the Irix/Solaris mode setting.