我应该使用哪个Java同步对象来确保完成任意大量的任务?限制是:
每项任务都需要花费大量的时间来完成,并行执行任务是合适的.
有太多的任务要适应内存(即我不能将Future
每个任务放入a Collection
然后调用get
所有期货).
我不知道将会有多少任务(即我不能使用a CountDownLatch
).
该ExecutorService
可共享的,所以我不能用awaitTermination( long, TimeUnit )
例如,在Grand Central Dispatch中,我可能会这样做:
let workQueue = dispatch_get_global_queue( QOS_CLASS_BACKGROUND, 0 ) let latch = dispatch_group_create() let startTime = NSDate() var itemsProcessed = 0 let countUpdateQueue = dispatch_queue_create( "countUpdateQueue", DISPATCH_QUEUE_SERIAL ) for item in fetchItems() // generator returns too many items to store in memory { dispatch_group_enter( latch ) dispatch_async( workQueue ) { self.processItem( item ) // method takes a non-trivial amount of time to run dispatch_async( countUpdateQueue ) { itemsProcessed++ } dispatch_group_leave( latch ) } } dispatch_group_wait( latch, DISPATCH_TIME_FOREVER ) let endTime = NSDate() let totalTime = endTime.timeIntervalSinceDate( startTime ) print( "Processed \(itemsProcessed) items in \(totalTime) seconds." )
它产生的输出看起来像这样(128项): Processed 128 items in 1.846794962883 seconds.
我试过类似的东西Phaser
:
final Executor executor = new ThreadPoolExecutor( 64, 64, 1l, MINUTES, new LinkedBlockingQueue( 8 ), new CallerRunsPolicy() ); final Phaser latch = new Phaser( 0 ); final long startTime = currentTimeMillis(); final AtomicInteger itemsProcessed = new AtomicInteger( 0 ); for( final String item : fetchItems() ) // iterator returns too many items to store in memory { latch.register(); final Runnable task = new Runnable() { public void run() { processItem( item ); // method takes a non-trivial amount of time to run itemsProcessed.incrementAndGet(); latch.arrive(); } }; executor.execute( task ); } latch.awaitAdvance( 0 ); final long endTime = currentTimeMillis(); out.println( "Processed " + itemsProcessed.get() + " items in " + ( endTime - startTime ) / 1000.0 + " seconds." );
任务并不总是在最后一个print语句之前完成,我可能会得到如下所示的输出(对于128项):Processed 121 items in 5.296 seconds.
是否Phaser
使用正确的对象?文档表明它只支持65,535个参与方,因此我需要批处理要处理的项目或引入某种Phaser
分层.