Elasticsearch Java API有四类client连接方式
TransportClient
RestClient
Jest
Spring Data Elasticsearch
其中TransportClient和RestClient是Elasticsearch原生的API。TransportClient支持2.x、5.x版本,但将在Elasticsearch 7.0中被弃用,并在8.0中彻底移除;取而代之的是Java High Level REST Client,它使用HTTP请求而不是Java序列化请求。
Jest是Java社区开发的,是Elasticsearch的Java Http Rest客户端;Spring Data Elasticsearch是spring集成的Elasticsearch开发包。
建议:TransportClient将会在后面的版本中弃用,因此不推荐后续使用;而Jest由于是社区维护,所以更新有一定延迟,目前最新版对接ES6.3.1,近一个月只有四个issue,说明整体活跃度较低,因此也不推荐使用;Spring Data Elasticsearch主要是与Spring生态对接,可以在web系统中整合到Spring中使用。目前比较推荐使用官方的高阶、低阶Rest Client,由官方维护,比较值得信赖。本文主要介绍高阶Client。
maven的依赖包:
<!-- Dependencies for the examples below. All Elasticsearch artifacts are pinned to the same version (6.4.3). -->
<dependencies>
<!-- "transport" client artifact. NOTE(review): presumably only needed when TransportClient is used; confirm before keeping it. -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>6.4.3</version>
</dependency>
<!-- Elasticsearch core library. -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.4.3</version>
</dependency>
<!-- High-level REST client (the client this article focuses on). -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.4.3</version>
</dependency>
<!-- Low-level REST client. -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>6.4.3</version>
</dependency>
<!-- Log4j 2 core. NOTE(review): presumably the logging backend expected by the Elasticsearch libraries; confirm. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.11.1</version>
</dependency>
</dependencies>
<!--
  Extra repository hosting Lucene snapshot builds.
  The URL uses HTTPS: Maven 3.8.1+ blocks plain-HTTP repositories by default,
  and artifacts should not be downloaded over an unencrypted connection.
-->
<repositories>
    <repository>
        <id>elastic-lucene-snapshots</id>
        <name>Elastic Lucene Snapshots</name>
        <url>https://s3.amazonaws.com/download.elasticsearch.org/lucenesnapshots/00142c9</url>
        <releases><enabled>true</enabled></releases>
        <snapshots><enabled>false</enabled></snapshots>
    </repository>
</repositories>
RestHighLevelClient的API分为Document APIs、Search APIs、Miscellaneous APIs、Indices APIs、Cluster APIs等,这里主要介绍常用的Document APIs和Search APIs,其余的APIs可以参考:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high.html
/**
 * Small convenience wrapper around the Elasticsearch {@code RestHighLevelClient}.
 *
 * <p>Every operation is executed synchronously and its outcome is printed to
 * {@code System.out}; failures reported by Elasticsearch are printed to
 * {@code System.err}. Call {@link #closeClient()} when the instance is no longer needed.
 */
public class RestClientUtils {

    /** Nodes contacted when the no-argument constructor is used. */
    private static final HttpHost[] DEFAULT_HOSTS = {
        new HttpHost("172.24.112.17", 9200, "http"),
        new HttpHost("172.24.112.17", 9201, "http")
    };

    /** Name of the terms aggregation whose buckets {@link #searchTemplate} prints. */
    private static final String TAGS_AGGREGATION = "group_by_tags";

    /** Underlying client; created once per instance and never reassigned. */
    private final RestHighLevelClient client;

    /** Creates a client connected to the default nodes. */
    public RestClientUtils() {
        this(DEFAULT_HOSTS);
    }

    /**
     * Creates a client connected to the given nodes.
     *
     * @param hosts one or more Elasticsearch HTTP endpoints
     */
    public RestClientUtils(HttpHost... hosts) {
        // No locking is needed here: the field belongs to the instance under construction,
        // which no other thread can observe yet.
        this.client = new RestHighLevelClient(RestClient.builder(hosts));
    }

    /** Closes the underlying client. An I/O error while closing is reported but not rethrown. */
    public void closeClient() {
        try {
            client.close();
        } catch (IOException e) {
            // Best effort: nothing useful the caller can do if closing fails.
            e.printStackTrace();
        }
    }

    /*------------------------------------------------ document Api start --------------------------------------------*/

    /**
     * Creates or replaces a document.
     *
     * @param indexName index name
     * @param typeName  type name
     * @param id        document id
     * @param jsonStr   document body as a JSON string
     * @throws Exception if the request fails
     */
    public void index(String indexName, String typeName, String id, String jsonStr) throws Exception {
        IndexRequest request = new IndexRequest(indexName, typeName, id);
        request.source(jsonStr, XContentType.JSON);

        // Synchronous execution.
        IndexResponse response = client.index(request, RequestOptions.DEFAULT);

        DocWriteResponse.Result result = response.getResult();
        if (result == DocWriteResponse.Result.CREATED || result == DocWriteResponse.Result.UPDATED) {
            printWriteResponse(response, result.name());
        }
        reportShardFailures(response.getShardInfo());
    }

    /**
     * Fetches a document by id and prints it, or prints a "not found" message.
     *
     * @param indexName index name
     * @param typeName  type name
     * @param id        document id
     * @throws Exception if the request fails
     */
    public void get(String indexName, String typeName, String id) throws Exception {
        GetRequest request = new GetRequest(indexName, typeName, id);
        // To fetch only selected fields, set a FetchSourceContext before executing, e.g.:
        //   String[] includes = new String[]{"name", "price"};
        //   String[] excludes = Strings.EMPTY_ARRAY;
        //   request.fetchSourceContext(new FetchSourceContext(true, includes, excludes));

        // Synchronous execution.
        GetResponse response = client.get(request, RequestOptions.DEFAULT);

        if (response.isExists()) {
            System.out.println("index: " + response.getIndex());
            System.out.println("type: " + response.getType());
            System.out.println("id: " + response.getId());
            System.out.println("version: " + response.getVersion());
            System.out.println(response.getSourceAsString());
        } else {
            System.out.println("没有查询到结果");
        }
    }

    /**
     * Checks whether a document exists without fetching its source or stored fields.
     *
     * @param indexName index name
     * @param typeName  type name
     * @param id        document id
     * @return {@code true} if the document exists
     * @throws Exception if the request fails
     */
    public boolean exists(String indexName, String typeName, String id) throws Exception {
        GetRequest getRequest = new GetRequest(indexName, typeName, id);
        getRequest.fetchSourceContext(new FetchSourceContext(false));
        getRequest.storedFields("_none_");
        // Synchronous execution.
        return client.exists(getRequest, RequestOptions.DEFAULT);
    }

    /**
     * Deletes a document by id and prints the outcome.
     *
     * @param indexName index name
     * @param typeName  type name
     * @param id        document id
     * @throws Exception if the request fails
     */
    public void delete(String indexName, String typeName, String id) throws Exception {
        DeleteRequest request = new DeleteRequest(indexName, typeName, id);
        // Synchronous execution.
        DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);

        if (response.getResult() == DocWriteResponse.Result.NOT_FOUND) {
            System.out.println("要删除的数据不存在");
            return;
        }
        printWriteResponse(response, "DELETE");
        reportShardFailures(response.getShardInfo());
    }

    /**
     * Applies a partial update to an existing document and prints the outcome.
     *
     * @param indexName index name
     * @param typeName  type name
     * @param id        document id
     * @param jsonStr   partial document as a JSON string
     * @throws Exception if the request fails for any reason other than the document being absent
     */
    public void update(String indexName, String typeName, String id, String jsonStr) throws Exception {
        UpdateRequest request = new UpdateRequest(indexName, typeName, id);
        request.doc(jsonStr, XContentType.JSON);
        try {
            // Synchronous execution.
            UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
            DocWriteResponse.Result result = response.getResult();
            switch (result) {
                case CREATED:
                case UPDATED:
                case DELETED:
                case NOOP:
                    printWriteResponse(response, result.name());
                    break;
                default:
                    break;
            }
        } catch (ElasticsearchException e) {
            if (e.status() != RestStatus.NOT_FOUND) {
                // Anything other than "document missing" is a real error; do not hide it.
                throw e;
            }
            System.out.println("要修改的内容不存在");
        }
    }

    /**
     * Fetches several documents by id in one request and prints each of them.
     *
     * @param indexName index name
     * @param typeName  type name
     * @param ids       ids of the documents to fetch
     * @throws Exception if the request fails
     */
    public void multiGet(String indexName, String typeName, String... ids) throws Exception {
        MultiGetRequest request = new MultiGetRequest();
        for (String docId : ids) {
            request.add(new MultiGetRequest.Item(indexName, typeName, docId));
        }

        // Synchronous execution.
        MultiGetResponse responses = client.mget(request, RequestOptions.DEFAULT);

        for (MultiGetItemResponse item : responses.getResponses()) {
            if (item.isFailed()) {
                // A failed item carries no GetResponse; report the failure instead of dereferencing null.
                System.err.println("multi-get failed for id " + item.getId() + ": "
                        + item.getFailure().getMessage());
                continue;
            }
            GetResponse response = item.getResponse();
            if (response.isExists()) {
                System.out.println("index: " + response.getIndex());
                System.out.println("type: " + response.getType());
                System.out.println("id: " + response.getId());
                System.out.println("version: " + response.getVersion());
                System.out.println(response.getSourceAsString() + "\n");
            } else {
                System.out.println("不存在");
            }
        }
    }

    /**
     * Sends one bulk request that mixes an index, an update and a delete operation
     * against sample documents, then prints the result of every item.
     *
     * <p>Author's note: documents are passed as JSON strings here; other source formats
     * such as maps reportedly caused errors with bulk requests.
     *
     * @throws Exception if the request fails
     */
    public void bulk() throws Exception {
        BulkRequest request = new BulkRequest();
        String jsonString="{\n" +
                " \"name\":\"dior chengyi\",\n" +
                " \"desc\":\"shishang gaodang\",\n" +
                " \"price\":7000,\n" +
                " \"producer\":\"dior producer\",\n" +
                " \"tags\":[\"shishang\",\"shechi\"]\n" +
                "}";
        request.add(new IndexRequest("lyh_index", "user", "1")
                .source(jsonString, XContentType.JSON));
        String updateJson="{\n" +
                " \"other\":\"test1\"\n" +
                "}";
        request.add(new UpdateRequest("lyh_index", "user", "2")
                .doc(updateJson, XContentType.JSON));
        request.add(new DeleteRequest("lyh_index", "user", "3"));

        BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
        for (BulkItemResponse item : bulkResponse) {
            if (item.isFailed()) {
                // Failed items have no response object (e.g. updating a missing document).
                System.err.println("bulk " + item.getOpType() + " failed for id " + item.getId() + ": "
                        + item.getFailureMessage());
                continue;
            }
            DocWriteResponse response = item.getResponse();
            // Report what actually happened (CREATED, UPDATED, DELETED, NOT_FOUND, NOOP)
            // rather than a label derived from the operation type.
            printWriteResponse(response, response.getResult().name() + "\n");
        }
    }

    /*------------------------------------------------ document Api end ----------------------------------------------*/
    /*------------------------------------------------ search Api (multi-condition query) start ----------------------*/

    /**
     * Runs an inline search template and prints the hits plus, when present, the
     * buckets of the {@code group_by_tags} terms aggregation.
     *
     * @param indexName    index to search
     * @param JsonStr      inline template source
     * @param scriptParams values substituted into the template placeholders
     * @throws Exception if the request fails
     */
    public void searchTemplate(String indexName, String JsonStr, Map<String, Object> scriptParams) throws Exception {
        // Inline template.
        SearchTemplateRequest request = new SearchTemplateRequest();
        request.setRequest(new SearchRequest(indexName));
        request.setScriptType(ScriptType.INLINE);
        request.setScript(JsonStr);
        request.setScriptParams(scriptParams);

        // Synchronous execution.
        SearchTemplateResponse templateResponse = client.searchTemplate(request, RequestOptions.DEFAULT);
        SearchResponse searchResponse = templateResponse.getResponse();

        // Hits.
        SearchHits hits = searchResponse.getHits();
        System.out.println("totalHits: " + hits.getTotalHits());
        System.out.println("maxScore: " + hits.getMaxScore());
        System.out.println("------------------------------------------");
        for (SearchHit hit : hits.getHits()) {
            System.out.println("index: " + hit.getIndex());
            System.out.println("type: " + hit.getType());
            System.out.println("id: " + hit.getId());
            System.out.println("score: " + hit.getScore());
            System.out.println(hit.getSourceAsString());
            System.out.println("------------------------------------------");
        }

        // Aggregations.
        Aggregations aggregations = searchResponse.getAggregations();
        if (aggregations == null) {
            return;
        }
        Map<String, Aggregation> aggregationMap = aggregations.getAsMap();
        Aggregation tagsAggregation = aggregationMap.get(TAGS_AGGREGATION);
        if (!(tagsAggregation instanceof Terms)) {
            // The template did not define this aggregation (or it is not a terms aggregation).
            return;
        }
        for (Terms.Bucket bucket : ((Terms) tagsAggregation).getBuckets()) {
            System.out.println("key: " + bucket.getKey().toString());
            System.out.println("doc_count: " + bucket.getDocCount());
        }
    }

    /** Prints the coordinates and version of a write response followed by the given status label. */
    private static void printWriteResponse(DocWriteResponse response, String status) {
        System.out.println("index: " + response.getIndex());
        System.out.println("type: " + response.getType());
        System.out.println("id: " + response.getId());
        System.out.println("version: " + response.getVersion());
        System.out.println("status: " + status);
    }

    /** Prints the reason of every shard-level failure recorded for a write operation. */
    private static void reportShardFailures(ReplicationResponse.ShardInfo shardInfo) {
        if (shardInfo.getFailed() > 0) {
            for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                System.err.println("shard failure: " + failure.reason());
            }
        }
    }
}
测试类:
import java.util.HashMap;
import java.util.Map;
/**
 * Manual demo for {@code RestClientUtils}.
 *
 * <p>Only the search-template example is active; the other examples are kept as
 * commented-out snippets that can be enabled one at a time.
 */
public class ElasticSearchDemo {

    /**
     * Runs the active example and always closes the client afterwards.
     *
     * @param args unused
     * @throws Exception if the executed request fails
     */
    public static void main(String[] args) throws Exception {
        RestClientUtils client = new RestClientUtils();
        try {
            /*String jsonString="{\n" +
            " \"name\":\"dior chengyi\",\n" +
            " \"desc\":\"shishang gaodang\",\n" +
            " \"price\":7000,\n" +
            " \"producer\":\"dior producer\",\n" +
            " \"tags\":[\"shishang\",\"shechi\"]\n" +
            "}";
            String jsonString1="{\n" +
            " \"name\":\"hailanzhijia chengyi\",\n" +
            " \"desc\":\"shangwu xiuxian\",\n" +
            " \"price\":200,\n" +
            " \"producer\":\"hailanzhijia producer\",\n" +
            " \"tags\":[\"xiuxian\"]\n" +
            "}";
            String jsonString2="{\n" +
            " \"name\":\"kama chengyi\",\n" +
            " \"desc\":\"shangwu xiuxian\",\n" +
            " \"price\":300,\n" +
            " \"producer\":\"kama producer\",\n" +
            " \"tags\":[\"shishang\"]\n" +
            "}";
            client.index("lyh_index","user","1",jsonString);
            client.index("lyh_index","user","2",jsonString1);
            client.index("lyh_index","user","3",jsonString2);*/
            //client.get("lyh_index","user","3");
            /*boolean exists = client.exists("lyh_index", "user", "1");
            boolean exists1 = client.exists("lyh_index", "user", "4");
            System.out.println(exists+"\t"+exists1);*/
            //client.delete("lyh_index", "user", "3");
            /*String updateJson="{\"price2\":50000}";
            client.update("lyh_index", "user", "2",updateJson);*/
            //client.multiGet("lyh_index", "user","1","2","3");

            // Search template: every {{placeholder}} is replaced by the matching entry of the params map.
            String json="{\n" +
                    " \"query\": {\n" +
                    " \"match\": {\n" +
                    " \"{{name}}\": \"{{chengyi}}\"\n" +
                    " }\n" +
                    " }, \n" +
                    " \"aggs\": {\n" +
                    " \"{{group_by_tags}}\": {\n" +
                    " \"terms\": {\n" +
                    " \"field\": \"{{tags}}\"\n" +
                    " }\n" +
                    " }\n" +
                    " }\n" +
                    "}";
            Map<String, Object> map = new HashMap<>();
            map.put("name","name");
            map.put("chengyi","chengyi");
            map.put("group_by_tags","group_by_tags");
            map.put("tags","tags");
            client.searchTemplate("lyh_index",json,map);
            //client.bulk();
        } finally {
            // Release the client's connections even when the request above throws.
            client.closeClient();
        }
    }
}