Elasticsearch Bulk API允许批量提交index和delete请求。
?
BulkRequestBuilder bulkRequest = client.prepareBulk(); bulkRequest.add(client.prepareIndex("index1", "type1", "id1").setSource(source); bulkRequest.add(client.prepareIndex("index2", "type2", "id2").setSource(source); BulkResponse bulkResponse = bulkRequest.execute().actionGet();
?
?
但有时我们需要更精细的批量操控,比如
?
BulkProcessor bulkProcessor = BulkProcessor.builder( client, new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { ... } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { ... } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { ... } }) .setBulkActions(10000) .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) .setFlushInterval(TimeValue.timeValueSeconds(5)) .build(); bulkProcessor.add(new IndexRequest("index1", "type1", "id1").source(source1)); bulkProcessor.add(new DeleteRequest("index2", "type2", "id2");
?
?
1.?beforeBulk会在批量提交之前执行,可以从BulkRequest中获取请求信息request.requests()或者请求数量request.numberOfActions()。
2. 第一个afterBulk会在批量成功后执行,可以跟beforeBulk配合计算批量所需时间
3. 第二个afterBulk会在批量失败后执行
4. 在例子中,当请求超过10000个(default=1000)或者总大小超过1GB(default=5MB)时,触发批量提交动作。另外每隔5秒也会提交一次(默认不会根据时间间隔提交)。
原文:http://kane-xie.iteye.com/blog/2242265