我正在寻找一种java并发习惯用法来匹配具有最高吞吐量的大量元素.
考虑一下我有"人"来自多个线程.每个"人"都在寻找一场比赛.当它找到另一个等待的"人"时,它们相互匹配并被移除以进行处理.
我不想锁定一个大的结构来改变状态.考虑Person有getMatch和setMatch.在提交之前,每个人的#getMatch都是null.但是当他们解锁(或被捕获)时,他们要么已经过期,因为他们等待长时间的比赛或#getMatch是非空的.
保持高通过率的一些问题是,如果PersonA与PersonB同时提交.它们相互匹配,但PersonB也匹配已经在等待的PersonC.PersonB的状态在提交时变为"可用".但是当PersonB与PersonC匹配时,PersonA不需要偶然获得PersonB.合理?另外,我想以异步方式执行此操作.换句话说,我不希望每个提交者都必须在具有waitForMatch类型的东西的线程上持有Person.
同样,我不希望请求必须在不同的线程上运行,但是如果有一个额外的匹配器线程也没关系.
似乎应该有一些成语,因为它似乎是一个非常普遍的事情.但我的谷歌搜索已经枯竭(我可能使用错误的条款).
UPDATE
有几件事让我很难解决这个问题.一个是我不想在内存中有对象,我想让所有等待的候选人都使用redis或memcache或类似的东西.另一个是任何人可能有几个可能的比赛.考虑如下界面:
person.getId(); // lets call this an Integer person.getFriendIds(); // a collection of other person ids
然后我有一个看起来像这样的服务器:
MatchServer: submit( personId, expiration ) -> void // non-blocking returns immediately isDone( personId ) -> boolean // either expired or found a match getMatch( personId ) -> matchId // also non-blocking
这是一个休息界面,它会使用重定向,直到你得到结果.我的第一个想法是在MatchServer中有一个Cache,它由redis之类的东西支持,并且对于当前被锁定和被操作的对象具有并发的弱值哈希映射.每个personId将由持久状态对象包装,状态为已提交,匹配和过期.
到目前为止?非常简单,提交代码完成了初始工作,它是这样的:
public void submit( Person p, long expiration ) { MatchStatus incoming = new MatchStatus( p.getId(), expiration ); if ( !tryMatch( incoming, p.getFriendIds() ) ) cache.put( p.getId(), incoming ); } public boolean isDone( Integer personId ) { MatchStatus status = cache.get( personId ); status.lock(); try { return status.isMatched() || status.isExpired(); } finally { status.unlock(); } } public boolean tryMatch( MatchStatus incoming, Iterablefriends ) { for ( Integer friend : friends ) { if ( match( incoming, friend ) ) return true; } return false; } private boolean match( MatchStatus incoming, Integer waitingId ) { CallStatus waiting = cache.get( waitingId ); if ( waiting == null ) return false; waiting.lock(); try { if ( waiting.isMatched() ) return false; waiting.setMatch( incoming.getId() ); incoming.setMatch( waiting.getId() ); return true } finally { waiting.unlock(); } }
所以这里的问题是,如果两个人同时进来并且他们是他们唯一的比赛,他们就不会找到对方.竞争条件对吗?我能看到解决它的唯一方法是同步"tryMatch()".但这会影响我的吞吐量.我不能无限期地循环tryMatch,因为我需要这些非常短的调用.
那么有什么更好的方法来解决这个问题呢?我提出的每一个解决方案都会一次一个地强迫人们使用吞吐量.例如,创建后台线程并使用阻塞队列一次放入和接收传入线程.
任何指导将不胜感激.