当前位置:  开发笔记 > 前端 > 正文

BulkRequestBuilder的Elasticsearch索引速度变慢

如何解决《BulkRequestBuilder的Elasticsearch索引速度变慢》经验,为你挑选了1个好方法。

大家好,所有的弹性研究大师.

我有数百万个数据要由elasticsearch Java API索引.elasticsearch的集群节点数为3(1作为主节点+ 2节点).

我的代码片段如下.

Settings settings = ImmutableSettings.settingsBuilder()
     .put("cluster.name", "MyClusterName").build();

TransportClient client = new TransportClient(settings);
String hostname = "myhost ip";
int port = 9300; 
client.addTransportAddress(new InetSocketTransportAddress(hostname, port));

BulkRequestBuilder bulkBuilder = client.prepareBulk();
BufferedReader br = new BufferedReader(new InputStreamReader(new DataInputStream(new FileInputStream("my_file_path"))));
long bulkBuilderLength = 0;
String readLine = "";
String index = "my_index_name";
String type = "my_type_name";
String id = "";

while((readLine = br.readLine()) != null){

    id = somefunction(readLine);
    String json = new ObjectMapper().writeValueAsString(readLine);
    bulkBuilder.add(client.prepareIndex(index, type, id)
        .setSource(json));
    bulkBuilderLength++;
    if(bulkBuilderLength % 1000== 0){
        logger.info("##### " + bulkBuilderLength + " data indexed.");
        BulkResponse bulkRes = bulkBuilder.execute().actionGet();
        if(bulkRes.hasFailures()){
            logger.error("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage());
        }
    }
}

br.close();

if(bulkBuilder.numberOfActions() > 0){
    logger.info("##### " + bulkBuilderLength + " data indexed.");
    BulkResponse bulkRes = bulkBuilder.execute().actionGet();
    if(bulkRes.hasFailures()){
        logger.error("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage());
    }
    bulkBuilder = client.prepareBulk();
}

它工作正常,但在成千上万的文档之后,性能迅速下降.

我已经尝试将" refresh_interval "的设置值更改为-1,将" number_of_replicas " 更改为0. 但是,性能下降的情况是一样的.

如果我使用bigdesk监控集群的状态,则GC值每秒都会达到1,如下面的屏幕截图所示.

有人可以帮帮我吗?

提前致谢.

在此输入图像描述

===================更新===========================

最后,我已经解决了这个问题.(见答案).

问题的原因是我错过了重新创建一个新的BulkRequestBuilder.在我更改了下面的代码片段后,性能降低从未发生过.

非常感谢你.

Settings settings = ImmutableSettings.settingsBuilder()
     .put("cluster.name", "MyClusterName").build();

TransportClient client = new TransportClient(settings);
String hostname = "myhost ip";
int port = 9300; 
client.addTransportAddress(new InetSocketTransportAddress(hostname, port));

BulkRequestBuilder bulkBuilder = client.prepareBulk();
BufferedReader br = new BufferedReader(new InputStreamReader(new DataInputStream(new FileInputStream("my_file_path"))));
long bulkBuilderLength = 0;
String readLine = "";
String index = "my_index_name";
String type = "my_type_name";
String id = "";

while((readLine = br.readLine()) != null){

    id = somefunction(readLine);
    String json = new ObjectMapper().writeValueAsString(readLine);
    bulkBuilder.add(client.prepareIndex(index, type, id)
        .setSource(json));
    bulkBuilderLength++;
    if(bulkBuilderLength % 1000== 0){
        logger.info("##### " + bulkBuilderLength + " data indexed.");
        BulkResponse bulkRes = bulkBuilder.execute().actionGet();
        if(bulkRes.hasFailures()){
            logger.error("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage());
        }
        bulkBuilder = client.prepareBulk();  // This line is my mistake and the solution !!!
    }
}

br.close();

if(bulkBuilder.numberOfActions() > 0){
    logger.info("##### " + bulkBuilderLength + " data indexed.");
    BulkResponse bulkRes = bulkBuilder.execute().actionGet();
    if(bulkRes.hasFailures()){
        logger.error("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage());
    }
    bulkBuilder = client.prepareBulk();
}

dadoonet.. 8

这里的问题是您在批量执行后不再重新创建一个新的Bulk.

这意味着您要一次又一次地重新索引相同的第一个数据.

顺便说一句,看看BulkProcessor类.绝对更好用.



1> dadoonet..:

这里的问题是您在批量执行后不再重新创建一个新的Bulk.

这意味着您要一次又一次地重新索引相同的第一个数据.

顺便说一句,看看BulkProcessor类.绝对更好用.


BulkProcessor是Bulk API之上的一个层.所以吞吐量相同.
推荐阅读
跟我搞对象吧
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有