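I have one producer and many consumers.
The producer is fast and generates a lot of results.
Tokens with the same value need to be processed sequentially.
Tokens with different values must be processed in parallel.
Creating new Runnables would be very expensive, and the production code may also have to handle 100k Tokens (to create a Runnable I have to pass a complex, expensive-to-build object to the constructor).
Can I get the same result with a simpler algorithm? Nesting a synchronized block with a reentrant lock seems a bit unnatural. Do you notice any race conditions?
Update: the second solution I found uses 3 collections: one to cache the producer's results, a second that is a blocking queue, and a third that is a list tracking the tasks in progress. Again, a bit complicated.
My version of the code: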
    import java.util.*;
    import java.util.concurrent.*;
    import java.util.concurrent.locks.ReentrantLock;

    public class Main1 {
        static class Token {
            private int order;
            private String value;

            Token() { }

            Token(int o, String v) {
                order = o;
                value = v;
            }

            int getOrder() { return order; }

            String getValue() { return value; }
        }

        private final static BlockingQueue<Token> queue = new ArrayBlockingQueue<Token>(10);
        private final static ConcurrentMap locks = new ConcurrentHashMap();
        private final static ReentrantLock reentrantLock = new ReentrantLock();
        private final static Token STOP_TOKEN = new Token();
        private final static List<String> lockList = Collections.synchronizedList(new ArrayList<String>());

        public static void main(String[] args) {
            ExecutorService producerExecutor = Executors.newSingleThreadExecutor();
            producerExecutor.submit(new Runnable() {
                public void run() {
                    Random random = new Random();
                    try {
                        for (int i = 1; i <= 100; i++) {
                            Token token = new Token(i, String.valueOf(random.nextInt(1)));
                            queue.put(token);
                        }
                        queue.put(STOP_TOKEN);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });

            ExecutorService consumerExecutor = Executors.newFixedThreadPool(10);
            for (int i = 1; i <= 10; i++) {
                // creating too many runnables would be inefficient because of this complex, non-thread-safe object
                final Object dependecy = new Object(); // new ComplexDependecy()
                consumerExecutor.submit(new Runnable() {
                    public void run() {
                        while (true) {
                            try {
                                // not in order
                                Token token = queue.take();
                                if (token == STOP_TOKEN) {
                                    // put the sentinel back so the other consumers also stop
                                    queue.add(STOP_TOKEN);
                                    return;
                                }
                                System.out.println("Task start" + Thread.currentThread().getId() + " order " + token.getOrder());
                                Random random = new Random();
                                Thread.sleep(random.nextInt(200)); // doLongRunningTask(dependecy)
                                lockList.remove(token.getValue());
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                });
            }
        }
    }
Victor Sorok.. 6
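You can pre-create a set of Runnables
which pick up incoming tasks (tokens) and place them in queues according to their order value.
As pointed out in the comments, there is no guarantee that tokens with different values will always execute in parallel (after all, you are bounded at least by the number of physical cores in your box). However, tokens with the same order value are guaranteed to execute in order of arrival.
Sample code: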
    import com.google.common.base.Preconditions;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    /**
     * Executor which ensures incoming tasks are executed in queues according to provided key
     * (see {@link Task#getOrder()}).
     */
    public class TasksOrderingExecutor {

        public interface Task extends Runnable {
            /**
             * @return ordering value which will be used to sequence tasks with the same value.
             * Tasks with different ordering values may be executed in parallel, but not guaranteed to.
             */
            String getOrder();
        }

        private static class Worker implements Runnable {
            private final LinkedBlockingQueue<Task> tasks = new LinkedBlockingQueue<>();
            private volatile boolean stopped;

            void schedule(Task task) {
                tasks.add(task);
            }

            void stop() {
                stopped = true;
            }

            @Override
            public void run() {
                while (!stopped) {
                    try {
                        Task task = tasks.take();
                        task.run();
                    } catch (InterruptedException ie) {
                        // interrupted by stop(): restore the flag and leave the loop
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }

        private final Worker[] workers;
        private final ExecutorService executorService;

        /**
         * @param queuesNr nr of concurrent task queues
         */
        public TasksOrderingExecutor(int queuesNr) {
            Preconditions.checkArgument(queuesNr >= 1, "queuesNr >= 1");
            executorService = new ThreadPoolExecutor(queuesNr, queuesNr, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
            workers = new Worker[queuesNr];
            for (int i = 0; i < queuesNr; i++) {
                Worker worker = new Worker();
                executorService.submit(worker);
                workers[i] = worker;
            }
        }

        public void submit(Task task) {
            Worker worker = getWorker(task);
            worker.schedule(task);
        }

        public void stop() {
            for (Worker w : workers) w.stop();
            // shutdownNow() interrupts workers blocked in take(); shutdown() alone would leave them waiting
            executorService.shutdownNow();
        }

        private Worker getWorker(Task task) {
            // floorMod: hashCode() may be negative, and % would then produce a negative index
            return workers[Math.floorMod(task.getOrder().hashCode(), workers.length)];
        }
    }
David Siro.. 6
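By the nature of your code, the only way to guarantee that tokens with the same value are processed serially is to wait for STOP_TOKEN to arrive.
You need a single-producer, single-consumer setup in which the consumer collects the tokens and sorts them by value (into a Multimap, say).
Only then do you know which tokens can be processed serially and which can be processed in parallel.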
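To make the collect-then-dispatch idea concrete without any particular library, here is a minimal sketch using only the JDK. The class `BatchByValue`, its methods, and the simplified `Token` are hypothetical names introduced for illustration, and the sketch assumes all tokens have already been received (that is, STOP_TOKEN has arrived) before grouping starts.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper, not part of any library: batch first, then dispatch per value.
final class BatchByValue {

    // Simplified stand-in for the question's Token class.
    static final class Token {
        final int order;
        final String value;

        Token(int order, String value) {
            this.order = order;
            this.value = value;
        }
    }

    /** Groups tokens by value; each list keeps the arrival order of its tokens. */
    static Map<String, List<Token>> groupByValue(List<Token> tokens) {
        Map<String, List<Token>> groups = new LinkedHashMap<>();
        for (Token token : tokens) {
            groups.computeIfAbsent(token.value, k -> new ArrayList<>()).add(token);
        }
        return groups;
    }

    /** One pool task per group: groups may run in parallel, tokens inside a group run serially. */
    static void processAll(Map<String, List<Token>> groups, int threads, Consumer<Token> handler) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (List<Token> group : groups.values()) {
            pool.execute(() -> group.forEach(handler));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        List<Token> received = new ArrayList<>();
        for (int i = 1; i <= 12; i++) {
            received.add(new Token(i, String.valueOf(i % 3)));
        }
        processAll(groupByValue(received), 3,
                t -> System.out.println("value " + t.value + " order " + t.order));
    }
}
```

The trade-off is latency: nothing is processed until the whole batch is in, which may or may not be acceptable for a fast producer.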
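Anyway, I suggest you take a look at LMAX Disruptor, which is a very efficient way of sharing data between threads.
It does not suffer from synchronization overhead the way Executors do, because it is lock-free (which may give you a nice performance benefit, depending on the nature of your data processing).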
    // single thread for processing, as there will be only one consumer
    Disruptor<InEvent> inboundDisruptor = new Disruptor<>(InEvent::new, 32, Executors.newSingleThreadExecutor());

    // outbound disruptor that uses 3 threads for event processing
    Disruptor<OutEvent> outboundDisruptor = new Disruptor<>(OutEvent::new, 32, Executors.newFixedThreadPool(3));

    inboundDisruptor.handleEventsWith(new InEventHandler(outboundDisruptor));

    // set up 3 event handlers doing round-robin consuming, effectively processing OutEvents in 3 threads
    outboundDisruptor.handleEventsWith(new OutEventHandler(0, 3, new Object()));
    outboundDisruptor.handleEventsWith(new OutEventHandler(1, 3, new Object()));
    outboundDisruptor.handleEventsWith(new OutEventHandler(2, 3, new Object()));

    inboundDisruptor.start();
    outboundDisruptor.start();

    // publisher code
    for (int i = 0; i < 10; i++) {
        inboundDisruptor.publishEvent(InEventTranslator.INSTANCE, new Token());
    }
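The event handler on the inbound disruptor just collects the incoming tokens. When the STOP token is received, it publishes the groups of tokens to the outbound disruptor for further processing: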
    import com.google.common.collect.ArrayListMultimap;
    import com.google.common.collect.ListMultimap;
    import com.lmax.disruptor.EventHandler;
    import com.lmax.disruptor.dsl.Disruptor;

    public class InEventHandler implements EventHandler<InEvent> {

        // tokens grouped by value; each group keeps its arrival order
        private ListMultimap<String, Token> tokensByValue = ArrayListMultimap.create();
        private Disruptor<OutEvent> outboundDisruptor;

        public InEventHandler(Disruptor<OutEvent> outboundDisruptor) {
            this.outboundDisruptor = outboundDisruptor;
        }

        @Override
        public void onEvent(InEvent event, long sequence, boolean endOfBatch) throws Exception {
            if (event.token == STOP_TOKEN) {
                // publish indexed tokens to outbound disruptor for parallel processing
                tokensByValue.asMap().entrySet().stream().forEach(entry ->
                        outboundDisruptor.publishEvent(OutEventTranslator.INSTANCE, entry.getValue()));
            } else {
                tokensByValue.put(event.token.value, event.token);
            }
        }
    }
The outbound event handler processes tokens of the same value sequentially:
    import com.lmax.disruptor.EventHandler;

    public class OutEventHandler implements EventHandler<OutEvent> {

        private final long order;
        private final long allHandlersCount;
        private Object yourComplexDependency;

        public OutEventHandler(long order, long allHandlersCount, Object yourComplexDependency) {
            this.order = order;
            this.allHandlersCount = allHandlersCount;
            this.yourComplexDependency = yourComplexDependency;
        }

        @Override
        public void onEvent(OutEvent event, long sequence, boolean endOfBatch) throws Exception {
            if (sequence % allHandlersCount != order) {
                // round robin, do not consume every event to allow parallel processing
                return;
            }

            for (Token token : event.tokensToProcessSerially) {
                // do processing of the token using your complex class
            }
        }
    }
The rest of the required infrastructure (its purpose is described in the Disruptor docs):
    import com.lmax.disruptor.EventTranslatorOneArg;

    import java.util.Collection;

    public class InEventTranslator implements EventTranslatorOneArg<InEvent, Token> {

        public static final InEventTranslator INSTANCE = new InEventTranslator();

        @Override
        public void translateTo(InEvent event, long sequence, Token arg0) {
            event.token = arg0;
        }
    }

    public class OutEventTranslator implements EventTranslatorOneArg<OutEvent, Collection<Token>> {

        public static final OutEventTranslator INSTANCE = new OutEventTranslator();

        @Override
        public void translateTo(OutEvent event, long sequence, Collection<Token> tokens) {
            event.tokensToProcessSerially = tokens;
        }
    }

    public class InEvent {
        // Note that no synchronization is used here,
        // even though the field is used among multiple threads.
        // Memory barrier used by Disruptor guarantee changes are visible.
        public Token token;
    }

    public class OutEvent {
        // ... again, no locks.
        public Collection<Token> tokensToProcessSerially;
    }

    public class Token {
        String value;
    }
Matt Timmerm.. 5
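If you have a lot of different tokens, the simplest solution is to create some number of single-thread executors (about 2x your number of cores) and assign each task to the executor determined by the hash of its token.
That way, all tasks with the same token go to the same executor and execute sequentially, because each executor has only one thread.
If you have some unstated requirements about scheduling fairness, it is easy enough to avoid any serious imbalance by having the producer thread queue up its requests (or block) before distributing them, until every executor has fewer than, say, 10 outstanding requests.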
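A minimal sketch of that hash-based dispatch, using only the JDK. The class name `KeyedSerialExecutor` and its methods are hypothetical, chosen for illustration, and the fairness/backpressure part from the last paragraph is deliberately left out.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: routes each task to one of several single-thread executors by key hash.
final class KeyedSerialExecutor {
    private final ExecutorService[] lanes;

    KeyedSerialExecutor(int laneCount) {
        lanes = new ExecutorService[laneCount];
        for (int i = 0; i < laneCount; i++) {
            lanes[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** Same key -> same lane, so tasks for one key run in submission order. */
    void submit(String key, Runnable task) {
        // floorMod keeps the index non-negative even when hashCode() is negative
        lanes[Math.floorMod(key.hashCode(), lanes.length)].execute(task);
    }

    /** Stops accepting tasks and waits for the queued ones; returns false on timeout or interrupt. */
    boolean shutdownAndWait(long timeout, TimeUnit unit) {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        try {
            for (ExecutorService lane : lanes) {
                if (!lane.awaitTermination(timeout, unit)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // "about 2x your number of cores", as suggested above
        int laneCount = 2 * Runtime.getRuntime().availableProcessors();
        KeyedSerialExecutor executor = new KeyedSerialExecutor(laneCount);
        for (int i = 1; i <= 20; i++) {
            String value = String.valueOf(i % 3);
            int order = i;
            executor.submit(value, () -> System.out.println("value " + value + " order " + order));
        }
        executor.shutdownAndWait(10, TimeUnit.SECONDS);
    }
}
```

Two different values can still hash to the same lane, in which case they are serialized with each other; that is the price of not keeping a queue per value. In the asker's setup, each lane could own one instance of the complex, non-thread-safe dependency, since only one thread ever touches it.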