我有一些数据库表,只要app正在运行,就需要一次处理5个数据库表.所以,它看起来像这样:
获取尚未处理的记录或其他线程现在不处理的记录.
处理它(这是一个依赖于互联网连接的漫长过程,因此可能会超时/抛出错误).
转到下一条记录.到达表的末尾从头开始.
我对线程没有多少经验,所以我看到两种可能的策略:
方法A.
1.创建新的ExecutorService:
ExecutorService taskExecutor = Executors.newFixedThreadPool(5);
2.添加5个任务:
for (int i = 0; i < 5; i++) { taskExecutor.execute(new MyTask()); }
3.每个任务都是无限循环,即:从表中读取记录,处理它,然后获取另一条记录.
这种方法的问题是如何通知其他线程当前正在处理哪些记录.为此,我可以使用表中的"status"字段,或者只使用一些包含当前处理ID的CopyOnWriteArraySet.
方法B.
1.创建相同的ExecutorService:
ExecutorService taskExecutor = Executors.newFixedThreadPool(5);
2.有一个无限循环,选择需要处理的记录并将它们传递给执行程序:
while (true) { //get next record here taskExecutor.execute(new MyTask(record)); //monitor the queue and wait until some thread is done processing, //so I can add another record }
3.每个任务处理一条记录.
这种方法的问题在于我需要将执行程序队列中的任务添加到比处理它们更慢的时间,以免它们随着时间的推移而堆积起来.这意味着我不仅需要监视当前正在运行的任务,还要监视它们何时完成处理,因此我可以将新记录添加到队列中.
我个人认为第一种方法更好(更容易),但我觉得第二种方法更正确.你怎么看?或者也许我应该做一些完全不同的事情?
如果需要,我也可以使用Spring或Quartz库.
谢谢.
我认为CompletionService(和ExecutorCompletionService)可以帮到你.
您通过完成服务提交所有任务,它允许您等到一个线程(任何线程)完成其任务.这样,只要有空闲线程,您就可以提交下一个任务.这意味着你使用方法B.
伪代码:
Create ThreadPoolExecutor and ExecutorCompletionService wrapping it while (true) { int freeThreads = executor.getMaximumPoolSize() - executor.getActiveCount() fetch 'freeThreads' tasks and submit to completion service (which in turn sends it to executor) wait until completion service reports finished task (with timeout) }
超时等待可以帮助您避免队列中没有任务时的情况,因此所有线程都处于空闲状态,您等待其中一个线程完成 - 这种情况永远不会发生.
您可以通过ThreadPoolExecutor方法检查可用线程的数量:( getActiveCount
活动线程)和getMaximumPoolSize
(最大可用配置线程).您需要直接创建ThreadPoolExecutor,或者转换从Executors.newFixedThreadPool()返回的对象,虽然我更喜欢直接创建...有关详细信息,请参阅Executors.newFixedThreadPool()方法的来源.