我使用datastax java驱动程序3.1.0连接到cassandra集群,我的cassandra集群版本是2.0.10.我正在以QUORUM
一致性异步写作.
private final ExecutorService executorService = Executors.newFixedThreadPool(10); private final Semaphore concurrentQueries = new Semaphore(1000); public void save(String process, int clientid, long deviceid) { String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)"; try { BoundStatement bs = CacheStatement.getInstance().getStatement(sql); bs.setConsistencyLevel(ConsistencyLevel.QUORUM); bs.setString(0, process); bs.setInt(1, clientid); bs.setLong(2, deviceid); concurrentQueries.acquire(); ResultSetFuture future = session.executeAsync(bs); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(ResultSet result) { concurrentQueries.release(); logger.logInfo("successfully written"); } @Override public void onFailure(Throwable t) { concurrentQueries.release(); logger.logError("error= ", t); } }, executorService); } catch (Exception ex) { logger.logError("error= ", ex); } }
我的上述save方法将以非常快的速度从多个线程调用.如果我以高于我的Cassandra集群可以处理的速度写入,那么它将开始抛出错误,我希望我的所有写入都应成功进入cassandra而不会有任何损失.
题:
我正在考虑使用一些排序队列或缓冲区来排队请求(例如java.util.concurrent.ArrayBlockingQueue
)."缓冲区已满"意味着客户应该等待.缓冲区还将用于重新排队失败的请求.然而,更公平的失败请求可能应该放在队列前面,以便首先重试它们.此外,我们应该以某种方式处理队列已满并且同时存在新的失败请求的情况.然后,单线程工作者将从队列中挑选请求并将其发送到Cassandra.因为它不应该做太多,所以它不太可能成为一个瓶颈.该工作人员可以应用自己的速率限制,例如基于时间限制com.google.common.util.concurrent.RateLimiter
.
实现这个队列或缓冲区功能的最佳方法是什么,可以在写入Cassandra时应用特定的番石榴速率限制,或者如果有更好的方法也让我知道?我想以每秒2000的请求写入Cassandra(这应该是可配置的,以便我可以使用它来查看什么是最佳设置).
正如下面评论中所述,如果内存不断增加,我们可以使用Guava Cache或CLHM来保持丢弃旧记录,以确保我的程序不会耗尽内存.我们将在盒子上有大约12GB的内存,这些记录非常小,所以我不认为它应该是一个问题.