我试图使用ListenableFuture对Cassandra集群进行异步写入,如下所示:
private static Cluster cluster = null; private ListeningExecutorService executorService; private PreparedStatement preparedStatement; private Session session = null; ... executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(POOL_SIZE)); ... public void writeValue(Tuple tuple) { ListenableFuturefuture = executorService.submit(new Callable () { @Override public String call() throws Exception { if(session == null) { session = getCluster().connect("dbname"); preparedStatement = session.prepare(queryString); } try { BoundStatement boundStatement = preparedStatement.bind(tuple values); session.execute(boundStatement); } catch(Exception exception) { // handle exception } return null; } });
如果我将POOL_SIZE设置为1,一切正常.
如果我将POOL_SIZE设置为> 1,我会收到如下错误:
引起:com.datastax.driver.core.exceptions.InvalidQueryException:试图执行未知的准备查询:0x75c5b41b9f07afa5384a69790503f963.您可能已使用使用其他Cluster实例创建的PreparedStatement.
所以我session
和preparedStatement
当地的变种.然后我得到关于Re-preparing already prepared query ...
加上它每次都创建一个新会话的警告.
我想尽可能多地重用.我做错了什么,我有什么选择?
将这个类静态化会有帮助吗?
你在这里有各种各样的竞争条件,并且执行不是线程安全的.
每个的Cluster
,Session
和PreparedStatement
正在意思是应用范围的单身人士,即.你只需要一个(每个查询一个PreparedStatement
).
但是,您正在重新创建Session
并可能PreparedStatement
多次准备.
别.Session
在构造函数或某个只运行一次的位置初始化您的一次,并同时准备您的语句.然后使用Session
和PreparedStatement
在适当情况下.
使用单线程执行程序,一切都像同步一样运行.当您添加更多线程时,其中许多可能会调用
session.prepare(queryString);
同时.或者PreparedStatement
你在这里使用
BoundStatement boundStatement = preparedStatement.bind(tuple values); session.execute(boundStatement);
可能与您初始化的不同
preparedStatement = session.prepare(queryString);
即使在同一个执行线程内.或者您可能尝试使用与初始化它PreparedStatement
不同的方法执行Session
.
以下是使用CQL驱动程序时应该做的一些事情.
准备好的语句是绑定在一个会话上还是可以在另一个会话中使用?
预准备语句派生自特定会话实例.因此,当您准备一个语句并将其发送到服务器时,它将被发送到与此会话实例关联的集群.
的javadoc的Session
状态
会话实例是线程安全的,通常每个应用程序的单个实例就足够了.