首页 > 编程语言 > 详细

ElastiSearch 2.1使用java api获取TransportClient连接ES集群的方法

时间:2016-02-16 16:50:20      阅读:8008      评论:0      收藏:0      [点我收藏+]

2.0之后ES的java api用法有了很大变化。在此记录一些。

java应用程序连接ES集群,笔者使用的是TransportClient,获取TransportClient的代码设计为单例模式(见getClient方法)。同时包含了设置自动提交文档的代码。注释比较详细,不再赘述。代码如下:

 

  1 package elasticsearch;
  2 
  3 import com.vividsolutions.jts.geom.GeometryFactory;
  4 import com.vividsolutions.jts.geom.MultiPolygon;
  5 import com.vividsolutions.jts.geom.Polygon;
  6 import com.vividsolutions.jts.io.ParseException;
  7 import com.vividsolutions.jts.io.WKTReader;
  8 import org.apache.commons.logging.Log;
  9 import org.apache.commons.logging.LogFactory;
 10 import org.elasticsearch.action.bulk.BulkProcessor;
 11 import org.elasticsearch.action.bulk.BulkRequest;
 12 import org.elasticsearch.action.bulk.BulkResponse;
 13 import org.elasticsearch.client.transport.TransportClient;
 14 import org.elasticsearch.common.settings.Settings;
 15 import org.elasticsearch.common.transport.InetSocketTransportAddress;
 16 import org.elasticsearch.common.unit.ByteSizeUnit;
 17 import org.elasticsearch.common.unit.ByteSizeValue;
 18 import org.elasticsearch.common.unit.TimeValue;
 19 
 20 import java.net.InetAddress;
 21 import java.util.Date;
 22 
 23 /**
 24  * Created by ZhangDong on 2015/12/25.
 25  */
 26 public class EsClient {
 27     static Log log = LogFactory.getLog(EsClient.class);
 28 
 29     //    用于提供单例的TransportClient BulkProcessor
 30     static public TransportClient tclient = null;
 31     static BulkProcessor staticBulkProcessor = null;
 32 
 33 //【获取TransportClient 的方法】
 34     public static TransportClient getClient() {
 35         try {
 36             if (tclient == null) {
 37                 String EsHosts = "10.10.2.1:9300,10.10.2.2:9300";
 38                 Settings settings = Settings.settingsBuilder()
 39                         .put("cluster.name", "wshare_es")//设置集群名称
 40                         .put("tclient.transport.sniff", true).build();//自动嗅探整个集群的状态,把集群中其它机器的ip地址加到客户端中
 41 
 42                 tclient = TransportClient.builder().settings(settings).build();
 43                 String[] nodes = EsHosts.split(",");
 44                 for (String node : nodes) {
 45                     if (node.length() > 0) {//跳过为空的node(当开头、结尾有逗号或多个连续逗号时会出现空node)
 46                         String[] hostPort = node.split(":");
 47                         tclient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1])));
 48 
 49                     }
 50                 }
 51             }//if
 52         } catch (Exception e) {
 53             e.printStackTrace();
 54         }
 55         return tclient;
 56     }
 57      //【设置自动提交文档】
 58     public static BulkProcessor getBulkProcessor() {
 59         //自动批量提交方式
 60         if (staticBulkProcessor == null) {
 61             try {
 62                 staticBulkProcessor = BulkProcessor.builder(getClient(),
 63                         new BulkProcessor.Listener() {
 64                             @Override
 65                             public void beforeBulk(long executionId, BulkRequest request) {
 66                                 //提交前调用
 67 //                                System.out.println(new Date().toString() + " before");
 68                             }
 69 
 70                             @Override
 71                             public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
 72                                 //提交结束后调用(无论成功或失败)
 73 //                                System.out.println(new Date().toString() + " response.hasFailures=" + response.hasFailures());
 74                                 log.info( "提交" + response.getItems().length + "个文档,用时"
 75                                         + response.getTookInMillis() + "MS" + (response.hasFailures() ? " 有文档提交失败!" : ""));
 76 //                                response.hasFailures();//是否有提交失败
 77                             }
 78 
 79                             @Override
 80                             public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
 81                                 //提交结束且失败时调用
 82                                 log.error( " 有文档提交失败!after failure=" + failure);
 83                             }
 84                         })
 85                         
 86                         .setBulkActions(1000)//文档数量达到1000时提交
 87                         .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))//总文档体积达到5MB时提交 //
 88                         .setFlushInterval(TimeValue.timeValueSeconds(5))//每5S提交一次(无论文档数量、体积是否达到阈值)
 89                         .setConcurrentRequests(1)//加1后为可并行的提交请求数,即设为0代表只可1个请求并行,设为1为2个并行
 90                         .build();
 91 //                staticBulkProcessor.awaitClose(10, TimeUnit.MINUTES);//关闭,如有未提交完成的文档则等待完成,最多等待10分钟
 92             } catch (Exception e) {//关闭时抛出异常
 93                 e.printStackTrace();
 94             }
 95         }//if
 96 
 97 
 98 
 99 
100 
101         return staticBulkProcessor;
102     }
103 }

 

ElastiSearch 2.1使用java api获取TransportClient连接ES集群的方法

原文:http://www.cnblogs.com/zhangdong92/p/5192867.html

(0)
(1)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!