Can someone suggest how to index data into Elasticsearch using the function elasticsearch.helpers.streaming_bulk instead of elasticsearch.helpers.bulk?
If I simply swap in streaming_bulk in place of bulk, nothing gets indexed, so I assume it needs to be used in a different way.
The code below creates the index and type and indexes data from a CSV file into Elasticsearch in chunks of 500 elements. It works fine, but I am wondering whether the performance can be improved; that is why I want to try the streaming_bulk function.
Currently it takes 10 minutes to index 1 million rows of a 100 MB CSV document. I use two machines, CentOS 6.6, 8 CPUs, x86_64, CPU MHz: 2499.902, Mem: 15.574 GB total. I am not sure whether it can be faster.
es = elasticsearch.Elasticsearch([{'host': 'uxmachine-test', 'port': 9200}])
index_name = 'new_index'
type_name = 'new_type'
mapping = json.loads(open(config["index_mapping"]).read())  # read mapping from json file
es.indices.create(index_name)
es.indices.put_mapping(index=index_name, doc_type=type_name, body=mapping)

with open(file_to_index, 'rb') as csvfile:
    reader = csv.reader(csvfile)  # read documents for indexing from CSV file, more than a million rows
    content = {"_index": index_name, "_type": type_name}
    batch_chunks = []
    iterator = 0
    for row in reader:
        var = transform_row_for_indexing(row, fields, index_name, type_name, id_name, id_increment)
        id_increment = id_increment + 1
        #var = transform_row_for_indexing(row, fields, index_name, type_name)
        batch_chunks.append(var)
        if iterator % 500 == 0:
            helpers.bulk(es, batch_chunks)
            del batch_chunks[:]
            print "ispucalo batch"
        iterator = iterator + 1

    # indexing of last batch_chunk
    if len(batch_chunks) != 0:
        helpers.bulk(es, batch_chunks)
Christopher .. 7
So streaming_bulk returns an iterator. This means that nothing happens until you start iterating over it. The code of the 'bulk' function looks like this:
success, failed = 0, 0

# list of errors to be collected if not stats_only
errors = []

for ok, item in streaming_bulk(client, actions, **kwargs):
    # go through request-response pairs and detect failures
    if not ok:
        if not stats_only:
            errors.append(item)
        failed += 1
    else:
        success += 1

return success, failed if stats_only else errors
So basically, just calling streaming_bulk(client, actions, **kwargs) doesn't actually do anything. It is not until you iterate over it, as is done in this for loop, that indexing actually starts to happen.
So in your code: you are welcome to change 'bulk' to 'streaming_bulk', but you need to iterate over the results of streaming_bulk in order for anything to actually be indexed.
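To make this concrete, here is a minimal sketch of that pattern applied to the CSV case in the question. The generator `csv_actions` and the flat "colN" field names are my own placeholders standing in for the asker's transform_row_for_indexing; the point is only that streaming_bulk consumes a lazy iterable of actions and must itself be iterated. The elasticsearch import is kept local to `index_rows` so the action generator can be tried without an Elasticsearch client.

```python
def csv_actions(rows, index_name, type_name):
    """Lazily turn CSV rows into bulk actions; no big list is held in memory."""
    for doc_id, row in enumerate(rows):
        yield {
            "_index": index_name,
            "_type": type_name,
            "_id": doc_id,
            # placeholder field names; real code would map CSV columns to fields
            "_source": {"col%d" % i: value for i, value in enumerate(row)},
        }


def index_rows(es, rows, index_name, type_name):
    """Index rows and return the number of successful actions."""
    from elasticsearch import helpers  # assumes elasticsearch-py is installed

    success = 0
    # streaming_bulk returns a generator: nothing is sent to the cluster
    # until this loop pulls from it. chunk_size mirrors the 500-row batches
    # from the original code.
    for ok, item in helpers.streaming_bulk(
            es, csv_actions(rows, index_name, type_name), chunk_size=500):
        if not ok:
            print("failed: %r" % (item,))
        else:
            success += 1
    return success
```

Note that if you never loop over (or otherwise drain) the generator, no requests are issued at all, which is exactly the "nothing gets indexed" symptom from the question.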