2.0之后ES的java api用法有了很大变化。在此记录一些。
java应用程序连接ES集群,笔者使用的是TransportClient,获取TransportClient的代码设计为单例模式(见getClient方法)。同时包含了设置自动提交文档的代码。注释比较详细,不再赘述。代码如下:
1 package elasticsearch; 2 3 import com.vividsolutions.jts.geom.GeometryFactory; 4 import com.vividsolutions.jts.geom.MultiPolygon; 5 import com.vividsolutions.jts.geom.Polygon; 6 import com.vividsolutions.jts.io.ParseException; 7 import com.vividsolutions.jts.io.WKTReader; 8 import org.apache.commons.logging.Log; 9 import org.apache.commons.logging.LogFactory; 10 import org.elasticsearch.action.bulk.BulkProcessor; 11 import org.elasticsearch.action.bulk.BulkRequest; 12 import org.elasticsearch.action.bulk.BulkResponse; 13 import org.elasticsearch.client.transport.TransportClient; 14 import org.elasticsearch.common.settings.Settings; 15 import org.elasticsearch.common.transport.InetSocketTransportAddress; 16 import org.elasticsearch.common.unit.ByteSizeUnit; 17 import org.elasticsearch.common.unit.ByteSizeValue; 18 import org.elasticsearch.common.unit.TimeValue; 19 20 import java.net.InetAddress; 21 import java.util.Date; 22 23 /** 24 * Created by ZhangDong on 2015/12/25. 25 */ 26 public class EsClient { 27 static Log log = LogFactory.getLog(EsClient.class); 28 29 // 用于提供单例的TransportClient BulkProcessor 30 static public TransportClient tclient = null; 31 static BulkProcessor staticBulkProcessor = null; 32 33 //【获取TransportClient 的方法】 34 public static TransportClient getClient() { 35 try { 36 if (tclient == null) { 37 String EsHosts = "10.10.2.1:9300,10.10.2.2:9300"; 38 Settings settings = Settings.settingsBuilder() 39 .put("cluster.name", "wshare_es")//设置集群名称 40 .put("tclient.transport.sniff", true).build();//自动嗅探整个集群的状态,把集群中其它机器的ip地址加到客户端中 41 42 tclient = TransportClient.builder().settings(settings).build(); 43 String[] nodes = EsHosts.split(","); 44 for (String node : nodes) { 45 if (node.length() > 0) {//跳过为空的node(当开头、结尾有逗号或多个连续逗号时会出现空node) 46 String[] hostPort = node.split(":"); 47 tclient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1]))); 48 49 } 50 } 51 }//if 52 } catch (Exception e) { 53 e.printStackTrace(); 54 } 55 return tclient; 56 } 57 //【设置自动提交文档】 58 public static BulkProcessor getBulkProcessor() { 59 //自动批量提交方式 60 if (staticBulkProcessor == null) { 61 try { 62 staticBulkProcessor = BulkProcessor.builder(getClient(), 63 new BulkProcessor.Listener() { 64 @Override 65 public void beforeBulk(long executionId, BulkRequest request) { 66 //提交前调用 67 // System.out.println(new Date().toString() + " before"); 68 } 69 70 @Override 71 public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { 72 //提交结束后调用(无论成功或失败) 73 // System.out.println(new Date().toString() + " response.hasFailures=" + response.hasFailures()); 74 log.info( "提交" + response.getItems().length + "个文档,用时" 75 + response.getTookInMillis() + "MS" + (response.hasFailures() ? " 有文档提交失败!" : "")); 76 // response.hasFailures();//是否有提交失败 77 } 78 79 @Override 80 public void afterBulk(long executionId, BulkRequest request, Throwable failure) { 81 //提交结束且失败时调用 82 log.error( " 有文档提交失败!after failure=" + failure); 83 } 84 }) 85 86 .setBulkActions(1000)//文档数量达到1000时提交 87 .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))//总文档体积达到5MB时提交 // 88 .setFlushInterval(TimeValue.timeValueSeconds(5))//每5S提交一次(无论文档数量、体积是否达到阈值) 89 .setConcurrentRequests(1)//加1后为可并行的提交请求数,即设为0代表只可1个请求并行,设为1为2个并行 90 .build(); 91 // staticBulkProcessor.awaitClose(10, TimeUnit.MINUTES);//关闭,如有未提交完成的文档则等待完成,最多等待10分钟 92 } catch (Exception e) {//关闭时抛出异常 93 e.printStackTrace(); 94 } 95 }//if 96 97 98 99 100 101 return staticBulkProcessor; 102 } 103 }
ElastiSearch 2.1使用java api获取TransportClient连接ES集群的方法
原文:http://www.cnblogs.com/zhangdong92/p/5192867.html