创建ES连接
// 初始化api客户端 public static RestHighLevelClient client = new RestHighLevelClient( RestClient.builder(new HttpHost(url,port, "http")));
将需要插入的数据转化成Map<String, Object>类型
单条数据插入的方法
/**
* 增加索引oalogincount的当前数据
* @param list
*/
public static void addDataOaloginData(List<Map<String, Object>> list) {
for (int i = 0; i < list.size(); i++) {
Map<String, Object> oaloginCountVo = list.get(i);
log.info(oaloginCountVo);
IndexRequest request = new IndexRequest(
INDEX, //索引
INDEX, // mapping type
oaloginCountVo.get("_id")+""); //文档id
oaloginCountVo.remove("_id"); //因为在插入数据时,不需要id,所以在这里将id remove掉
request.source(oaloginCountVo);
/*
request.routing("routing"); //设置routing值
request.timeout(TimeValue.timeValueSeconds(1)); //设置主分片等待时长
request.version(2); //设置版本号
*/
request.setRefreshPolicy("wait_for"); //设置重刷新策略
request.opType(DocWriteRequest.OpType.CREATE); //操做类别
//四、发送请求
IndexResponse indexResponse = null;
try {
// 同步方式
indexResponse = client.index(request);
} catch(Exception e) {
log.error("索引异常", e);
}
//五、处理响应
if(indexResponse != null) {
String index = indexResponse.getIndex();
String type = indexResponse.getType();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
Result result = indexResponse.getResult();
}
}
}
因为单条插入每一条都需要连接es,所以不太建议,建议直接批量插入
public static void addDataOaloginDataAll(List<Map<String, Object>> list) {
BulkRequest request = new BulkRequest();
for (int i = 0; i < list.size(); i++) {
Map<String, Object> oaloginCountVo = list.get(i);
String _id = (String) oaloginCountVo.get("_id");
oaloginCountVo.remove("_id");
request.add(new IndexRequest(INDEX, INDEX, _id)
.source(oaloginCountVo)); //文档id
//三、发送请求 同步请求
}
try {
BulkResponse bulkResponse = client.bulk(request);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
原文:https://www.cnblogs.com/xhj99/p/15064954.html