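I couldn't find a decent ThreadPool implementation for Ruby, so I wrote my own. It is partly based on the code here: http://web.archive.org/web/20081204101031/http://snippets.dzone.com:80/posts/show/3276, but changed to use wait/signal and a different implementation of ThreadPool shutdown. However, after running for a while (with 100 threads and about 1300 tasks handled), it deadlocks on line 25, where it waits for a new job. Any ideas why this might happen?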
    require 'thread'
    begin
      require 'fastthread'
    rescue LoadError
      $stderr.puts "Using the ruby-core thread implementation"
    end

    class ThreadPool
      class Worker
        def initialize(callback)
          @mutex = Mutex.new
          @cv = ConditionVariable.new
          @callback = callback
          @mutex.synchronize {@running = true}
          @thread = Thread.new do
            while @mutex.synchronize {@running}
              block = get_block
              if block
                block.call
                reset_block
                # Signal the ThreadPool that this worker is ready for another job
                @callback.signal
              else
                # Wait for a new job
                @mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25?
              end
            end
          end
        end

        def name
          @thread.inspect
        end

        def get_block
          @mutex.synchronize {@block}
        end

        def set_block(block)
          @mutex.synchronize do
            raise RuntimeError, "Thread already busy." if @block
            @block = block
            # Signal the thread in this class, that there's a job to be done
            @cv.signal
          end
        end

        def reset_block
          @mutex.synchronize {@block = nil}
        end

        def busy?
          @mutex.synchronize {!@block.nil?}
        end

        def stop
          @mutex.synchronize {@running = false}
          # Signal the thread not to wait for a new job
          @cv.signal
          @thread.join
        end
      end

      attr_accessor :max_size

      def initialize(max_size = 10)
        @max_size = max_size
        @workers = []
        @mutex = Mutex.new
        @cv = ConditionVariable.new
      end

      def size
        @mutex.synchronize {@workers.size}
      end

      def busy?
        @mutex.synchronize {@workers.any? {|w| w.busy?}}
      end

      def shutdown
        @mutex.synchronize {@workers.each {|w| w.stop}}
      end
      alias :join :shutdown

      def process(block=nil,&blk)
        block = blk if block_given?
        while true
          @mutex.synchronize do
            worker = get_worker
            if worker
              return worker.set_block(block)
            else
              # Wait for a free worker
              @cv.wait(@mutex)
            end
          end
        end
      end

      # Used by workers to report ready status
      def signal
        @cv.signal
      end

      private
      def get_worker
        free_worker || create_worker
      end

      def free_worker
        @workers.each {|w| return w unless w.busy?}; nil
      end

      def create_worker
        return nil if @workers.size >= @max_size
        worker = Worker.new(self)
        @workers << worker
        worker
      end
    end
PierreBdR.. 11
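OK, so the main problem with the implementation is: how do you make sure that no signal is lost, and how do you avoid deadlocks?
In my experience, this is really hard to achieve with condition variables and mutexes, but easy with semaphores. As it happens, Ruby provides an object called Queue (or SizedQueue) that should solve the problem. Here is my suggested implementation: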
    require 'thread'
    begin
      require 'fastthread'
    rescue LoadError
      $stderr.puts "Using the ruby-core thread implementation"
    end

    class ThreadPool
      class Worker
        def initialize(thread_queue)
          @mutex = Mutex.new
          @cv = ConditionVariable.new
          @queue = thread_queue
          @running = true
          @thread = Thread.new do
            @mutex.synchronize do
              while @running
                # Wait only while there is no job yet: an unconditional wait
                # would miss a signal sent before this thread got here.
                @cv.wait(@mutex) while @running && @block.nil?
                block = get_block
                if block
                  # Release the lock while the job runs so busy?/stop don't block
                  @mutex.unlock
                  block.call
                  @mutex.lock
                  reset_block
                end
                # Report this worker as free again
                @queue << self
              end
            end
          end
        end

        def name
          @thread.inspect
        end

        def get_block
          @block
        end

        def set_block(block)
          @mutex.synchronize do
            raise RuntimeError, "Thread already busy." if @block
            @block = block
            # Signal the thread in this class, that there's a job to be done
            @cv.signal
          end
        end

        def reset_block
          @block = nil
        end

        def busy?
          @mutex.synchronize { !@block.nil? }
        end

        def stop
          @mutex.synchronize do
            @running = false
            @cv.signal
          end
          @thread.join
        end
      end

      attr_accessor :max_size

      def initialize(max_size = 10)
        @max_size = max_size
        @queue = Queue.new
        @workers = []
      end

      def size
        @workers.size
      end

      def busy?
        @queue.size < @workers.size
      end

      def shutdown
        @workers.each { |w| w.stop }
        @workers = []
      end
      alias :join :shutdown

      def process(block=nil,&blk)
        block = blk if block_given?
        worker = get_worker
        worker.set_block(block)
      end

      private
      def get_worker
        # Reuse a free worker, or block on the queue once the pool is full
        if !@queue.empty? or @workers.size == @max_size
          return @queue.pop
        else
          worker = Worker.new(@queue)
          @workers << worker
          worker
        end
      end
    end
And here is some simple test code:
    tp = ThreadPool.new 500
    (1..1000).each do |i|
      tp.process do
        (2..10).inject(1) { |memo, val| sleep(0.1); memo * val }
        print "Computation #{i} done. Nb of tasks: #{tp.size}\n"
      end
    end
    tp.shutdown
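For comparison, the same Queue idea can be pushed one step further by queueing the jobs themselves instead of the free workers, which removes the condition variable entirely. The following is only a minimal sketch under that assumption, not the implementation above: `SimplePool` and the `STOP` sentinel are hypothetical names introduced for illustration.

```ruby
# Minimal sketch: a fixed-size pool whose workers pull jobs from a shared Queue.
# SimplePool and STOP are illustrative names, not part of the answer's code.
class SimplePool
  STOP = Object.new # sentinel object telling a worker to exit

  def initialize(size)
    @jobs = Queue.new
    @threads = Array.new(size) do
      Thread.new do
        # Queue#pop blocks until an item is available, so a job pushed
        # before the worker starts waiting is never lost.
        until (job = @jobs.pop).equal?(STOP)
          job.call
        end
      end
    end
  end

  # Enqueue a block to be run by whichever worker becomes free first.
  def process(&block)
    @jobs << block
  end

  # Queue one sentinel per worker behind all pending jobs, then wait.
  def shutdown
    @threads.size.times { @jobs << STOP }
    @threads.each(&:join)
  end
end

results = Queue.new
pool = SimplePool.new(4)
20.times { |i| pool.process { results << i * i } }
pool.shutdown
puts results.size
```

Because the sentinels are queued after every pending job, each worker drains real work before it sees a `STOP`, so `shutdown` doubles as a join.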
Filipe Migue.. 8
You can try the work_queue gem, which is designed to coordinate work between a producer and a pool of worker threads.
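To illustrate what coordinating a producer with a pool of workers can look like, here is a rough standard-library sketch. It does not use the work_queue gem or its API; it assumes Ruby 2.3 or later (for `Queue#close`), and the variable names `jobs`, `done`, and `workers` are made up for the example.

```ruby
# Sketch only: bounded hand-off between one producer and several workers,
# using the standard library rather than the work_queue gem.
jobs = SizedQueue.new(5) # the producer blocks once 5 jobs are pending
done = Queue.new

workers = Array.new(3) do
  Thread.new do
    # After close, pop returns nil once the queue is drained,
    # which ends the loop and lets the worker exit.
    while (n = jobs.pop)
      done << n * 2
    end
  end
end

(1..50).each { |n| jobs << n } # blocks whenever the workers fall behind
jobs.close
workers.each(&:join)
puts done.size
```

The bounded queue gives back-pressure: a fast producer cannot pile up an unbounded backlog of tasks ahead of slow workers.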