ElasticSearch java API

Elasticsearch Java API有四类client连接方式

TransportClient
RestClient
Jest
Spring Data Elasticsearch

其中TransportClient和RestClient是Elasticsearch原生的api。TransportClient可以支持2.x,5.x版本,TransportClient将会在Elasticsearch 7.0弃用并在8.0中完成删除,替而代之,我们使用Java High Level REST Client,它使用HTTP请求而不是Java序列化请求。

Jest是Java社区开发的,是Elasticsearch的Java Http Rest客户端;Spring Data Elasticsearch是spring集成的Elasticsearch开发包。

建议:TransportClient将会在后面的版本中弃用,因此不推荐后续使用;而Jest由于是社区维护,所以更新有一定延迟,目前最新版对接ES6.3.1,近一个月只有四个issue,说明整体活跃度较低,因此也不推荐使用;Spring Data Elasticsearch主要是与Spring生态对接,可以在web系统中整合到Spring中使用。目前比较推荐使用官方的高阶、低阶Rest Client,官方维护,比较值得信赖。本文主要介绍高阶Client。

maven的依赖包:

<dependencies>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>6.4.3</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>6.4.3</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>6.4.3</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>6.4.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.11.1</version>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>elastic-lucene-snapshots</id>
            <name>Elastic Lucene Snapshots</name>
            <url>http://s3.amazonaws.com/download.elasticsearch.org/lucenesnapshots/00142c9</url>
            <releases><enabled>true</enabled></releases>
            <snapshots><enabled>false</enabled></snapshots>
        </repository>
    </repositories>

RestHighLevelAPI分为Document APIs、Search APIs、Miscellaneous APIs、Indices APIs、Cluster APIs...等等,这里主要介绍常用的Document APIs和Search APIs,其余的APIs可以参考:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high.html

public class RestClientUtils {

    private RestHighLevelClient client = null;
    
    public RestClientUtils() {
        if (client == null){
            synchronized (RestHighLevelClient.class){
                if (client == null){
                    client = getClient();
                }
            }
        }
    }

    private RestHighLevelClient getClient(){
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("172.24.112.17", 9200, "http"),
                        new HttpHost("172.24.112.17", 9201, "http")));

        return client;
    }

    public void closeClient(){
        try {
            if (client != null){
                client.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /*------------------------------------------------ document Api start --------------------------------------------*/

    /**
     * 增,改数据
     * @param indexName index名字
     * @param typeName type名字
     * @param id id
     * @param jsonStr 增加或修改的数据json字符串格式
     * @throws Exception
     */
    public void index(String indexName,String typeName,String id,String jsonStr) throws Exception{
        IndexRequest request = new IndexRequest(indexName,typeName,id);

        request.source(jsonStr, XContentType.JSON);
        IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);//同步

        String index = indexResponse.getIndex();
        String type = indexResponse.getType();
        String ID = indexResponse.getId();
        long version = indexResponse.getVersion();
        if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
            System.out.println("index: " + index);
            System.out.println("type: " + type);
            System.out.println("id: " + ID);
            System.out.println("version: " + version);
            System.out.println("status: " + "CREATED");
        } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
            System.out.println("index: " + index);
            System.out.println("type: " + type);
            System.out.println("id: " + ID);
            System.out.println("version: " + version);
            System.out.println("status: " + "UPDATED");
        }
        ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
        if (shardInfo.getFailed() > 0) {
            for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                String reason = failure.reason();
            }
        }
    }

    /**
     * 根据 id 获取数据
     * @throws Exception
     */
    public void get(String indexName,String typeName,String id) throws Exception{
        GetRequest request = new GetRequest(indexName, typeName, id);

        //可以自定义要查询的具体key
        /*request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);
        String[] includes = new String[]{"name", "price"};
        String[] excludes = Strings.EMPTY_ARRAY;
        FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
        request.fetchSourceContext(fetchSourceContext);*/

        //Synchronous Execution
        GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);

        //Get Response
        String index = getResponse.getIndex();
        String type = getResponse.getType();
        String ID = getResponse.getId();
        if (getResponse.isExists()) {
            long version = getResponse.getVersion();
            String sourceAsString = getResponse.getSourceAsString();
            System.out.println("index: " + index);
            System.out.println("type: " + type);
            System.out.println("id: " + ID);
            System.out.println("version: " + version);
            System.out.println(sourceAsString);
        } else {
            System.out.println("没有查询到结果");
        }
    }

    /**
     * 存在
     * @throws Exception
     */
    public boolean exists(String indexName,String typeName,String id) throws Exception{
        GetRequest getRequest = new GetRequest(indexName, typeName, id);
        getRequest.fetchSourceContext(new FetchSourceContext(false));
        getRequest.storedFields("_none_");

        //Synchronous Execution
        boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
        return exists;
    }

    public void delete(String indexName,String typeName,String id) throws Exception{
        DeleteRequest request = new DeleteRequest(indexName, typeName, id);

        //Synchronous Execution
        DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);

        // document was not found
        if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
            System.out.println("要删除的数据不存在");
        }else {
            //Delete Response
            String index = deleteResponse.getIndex();
            String type = deleteResponse.getType();
            String ID = deleteResponse.getId();
            long version = deleteResponse.getVersion();
            System.out.println("index: " + index);
            System.out.println("type: " + type);
            System.out.println("id: " + ID);
            System.out.println("version: " + version);
            System.out.println("status: " + "DELETE");
            ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
            if (shardInfo.getFailed() > 0) {
                for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                    String reason = failure.reason();
                }
            }
        }
    }

    public void update(String indexName, String typeName, String id, String jsonStr) throws Exception {
        UpdateRequest request = new UpdateRequest(indexName, typeName, id);

        request.doc(jsonStr, XContentType.JSON);
        try{
            //Synchronous Execution
            UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);

            //update Response
            String index = updateResponse.getIndex();
            String type = updateResponse.getType();
            String ID = updateResponse.getId();
            long version = updateResponse.getVersion();
            if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
                System.out.println("index: " + index);
                System.out.println("type: " + type);
                System.out.println("id: " + ID);
                System.out.println("version: " + version);
                System.out.println("status: " + "CREATED");
            } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
                System.out.println("index: " + index);
                System.out.println("type: " + type);
                System.out.println("id: " + ID);
                System.out.println("version: " + version);
                System.out.println("status: " + "UPDATED");
            } else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
                System.out.println("index: " + index);
                System.out.println("type: " + type);
                System.out.println("id: " + ID);
                System.out.println("version: " + version);
                System.out.println("status: " + "DELETED");
            } else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
                System.out.println("index: " + index);
                System.out.println("type: " + type);
                System.out.println("id: " + ID);
                System.out.println("version: " + version);
                System.out.println("status: " + "NOOP");
            }
        }catch (ElasticsearchException e) {
            if (e.status() == RestStatus.NOT_FOUND) {
                System.out.println("要修改的内容不存在");
            }
        }
    }

    /**
     * 根据id批量获取数据
     * @throws Exception
     */
    public void multiGet(String indexName,String typeName,String ...ids) throws Exception{
        MultiGetRequest request = new MultiGetRequest();

        for(String str:ids){
            request.add(new MultiGetRequest.Item(indexName, typeName, str));
        }

        //Synchronous Execution
        MultiGetResponse responses = client.mget(request, RequestOptions.DEFAULT);

        //Multi Get Response
        MultiGetItemResponse[] Items = responses.getResponses();
        for(MultiGetItemResponse Item:Items){
            GetResponse response = Item.getResponse();
            String index = response.getIndex();
            String type = response.getType();
            String id = response.getId();
            if (response.isExists()) {
                long version = response.getVersion();
                String sourceAsString = response.getSourceAsString();
                System.out.println("index: " + index);
                System.out.println("type: " + type);
                System.out.println("id: " + id);
                System.out.println("version: " + version);
                System.out.println(sourceAsString+"\n");
            } else {
                System.out.println("不存在");
            }
        }
    }

    /**
     * 单次请求批量处理数据,可以同时操作增删改
     * 注意:bulk方法操作增删改时只能用json格式,其他类似map方式会报错
     * @throws Exception
     */
    public void bulk() throws Exception{
        BulkRequest request = new BulkRequest();
        String jsonString="{\n" +
                "  \"name\":\"dior chengyi\",\n" +
                "  \"desc\":\"shishang gaodang\",\n" +
                "  \"price\":7000,\n" +
                "  \"producer\":\"dior producer\",\n" +
                "  \"tags\":[\"shishang\",\"shechi\"]\n" +
                "}";
        request.add(new IndexRequest("lyh_index", "user", "1")
                .source(jsonString,XContentType.JSON));
        String updateJson="{\n" +
                "  \"other\":\"test1\"\n" +
                "}";
        request.add(new UpdateRequest("lyh_index", "user", "2")
                .doc(updateJson,XContentType.JSON));
        request.add(new DeleteRequest("lyh_index", "user", "3"));

        BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
        for (BulkItemResponse bulkItemResponse : bulkResponse) {
            DocWriteResponse itemResponse = bulkItemResponse.getResponse();

            if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
                    || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
                IndexResponse indexResponse = (IndexResponse) itemResponse;
                String index = indexResponse.getIndex();
                String type = indexResponse.getType();
                String id = indexResponse.getId();
                long version = indexResponse.getVersion();
                System.out.println("index: " + index);
                System.out.println("type: " + type);
                System.out.println("id: " + id);
                System.out.println("version: " + version);
                System.out.println("status: " + "CREATED"+"\n");
            } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
                UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                String index = updateResponse.getIndex();
                String type = updateResponse.getType();
                String id = updateResponse.getId();
                long version = updateResponse.getVersion();
                System.out.println("index: " + index);
                System.out.println("type: " + type);
                System.out.println("id: " + id);
                System.out.println("version: " + version);
                System.out.println("status: " + "UPDATE"+"\n");
            } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
                DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                String index = deleteResponse.getIndex();
                String type = deleteResponse.getType();
                String id = deleteResponse.getId();
                long version = deleteResponse.getVersion();
                System.out.println("index: " + index);
                System.out.println("type: " + type);
                System.out.println("id: " + id);
                System.out.println("version: " + version);
                System.out.println("status: " + "DELETE"+"\n");
            }
        }
    }

    /*------------------------------------------------ document Api end ----------------------------------------------*/
/*------------------------------------------------ search Api 多条件查询 start ----------------------------------------------*/
/**
     * 查询模板
     * @throws Exception
     */
    public void searchTemplate(String indexName,String JsonStr,Map<String, Object> scriptParams) throws Exception{
        //Inline Templates
        SearchTemplateRequest request = new SearchTemplateRequest();
        request.setRequest(new SearchRequest(indexName));
        request.setScriptType(ScriptType.INLINE);
        request.setScript(JsonStr);

        request.setScriptParams(scriptParams);

        //Synchronous Execution
        SearchTemplateResponse response = client.searchTemplate(request, RequestOptions.DEFAULT);

        //SearchTemplate Response
        SearchResponse searchResponse = response.getResponse();
        //Retrieving SearchHits 获取结果数据
        SearchHits hits = searchResponse.getHits();
        long totalHits = hits.getTotalHits();
        float maxScore = hits.getMaxScore();
        System.out.println("totalHits: " + totalHits);
        System.out.println("maxScore: " + maxScore);
        System.out.println("------------------------------------------");
        SearchHit[] searchHits = hits.getHits();
        for (SearchHit hit : searchHits) {
            // do something with the SearchHit
            String index = hit.getIndex();
            String type = hit.getType();
            String id = hit.getId();
            float score = hit.getScore();

            String sourceAsString = hit.getSourceAsString();
            System.out.println("index: " + index);
            System.out.println("type: " + type);
            System.out.println("id: " + id);
            System.out.println("score: " + score);
            System.out.println(sourceAsString);
            System.out.println("------------------------------------------");
        }
        //得到aggregations下内容
        Aggregations aggregations = searchResponse.getAggregations();
        if(aggregations!=null){
            Map<String, Aggregation> aggregationMap = aggregations.getAsMap();
            Terms companyAggregation = (Terms) aggregationMap.get("group_by_tags");
            List<? extends Terms.Bucket> buckets = companyAggregation.getBuckets();
            for(Terms.Bucket bk:buckets){
                Object key = bk.getKey();
                long docCount = bk.getDocCount();
                System.out.println("key: "+key.toString());
                System.out.println("doc_count: "+docCount);
            }
        }
    }
}

测试类:

import java.util.HashMap;
import java.util.Map;

public class ElasticSearchDemo {

    public static void main(String[] args) throws Exception{
        RestClientUtils client = new RestClientUtils();
        /*String jsonString="{\n" +
                "  \"name\":\"dior chengyi\",\n" +
                "  \"desc\":\"shishang gaodang\",\n" +
                "  \"price\":7000,\n" +
                "  \"producer\":\"dior producer\",\n" +
                "  \"tags\":[\"shishang\",\"shechi\"]\n" +
                "}";
        String jsonString1="{\n" +
                "  \"name\":\"hailanzhijia chengyi\",\n" +
                "  \"desc\":\"shangwu xiuxian\",\n" +
                "  \"price\":200,\n" +
                "  \"producer\":\"hailanzhijia producer\",\n" +
                "  \"tags\":[\"xiuxian\"]\n" +
                "}";
        String jsonString2="{\n" +
                "  \"name\":\"kama chengyi\",\n" +
                "  \"desc\":\"shangwu xiuxian\",\n" +
                "  \"price\":300,\n" +
                "  \"producer\":\"kama producer\",\n" +
                "  \"tags\":[\"shishang\"]\n" +
                "}";
        client.index("lyh_index","user","1",jsonString);
        client.index("lyh_index","user","2",jsonString1);
        client.index("lyh_index","user","3",jsonString2);*/

        //client.get("lyh_index","user","3");

        /*boolean exists = client.exists("lyh_index", "user", "1");
        boolean exists1 = client.exists("lyh_index", "user", "4");
        System.out.println(exists+"\t"+exists1);*/

        //client.delete("lyh_index", "user", "3");

        /*String updateJson="{\"price2\":50000}";
        client.update("lyh_index", "user", "2",updateJson);*/

        //client.multiGet("lyh_index", "user","1","2","3");

        String json="{\n" +
                "  \"query\": {\n" +
                "    \"match\": {\n" +
                "      \"{{name}}\": \"{{chengyi}}\"\n" +
                "    }\n" +
                "  }, \n" +
                "  \"aggs\": {\n" +
                "    \"{{group_by_tags}}\": {\n" +
                "      \"terms\": {\n" +
                "        \"field\": \"{{tags}}\"\n" +
                "      }\n" +
                "    }\n" +
                "  }\n" +
                "}";
        Map<String, Object> map = new HashMap<>();
        map.put("name","name");
        map.put("chengyi","chengyi");
        map.put("group_by_tags","group_by_tags");
        map.put("tags","tags");

        client.searchTemplate("lyh_index",json,map);

        //client.bulk();
        client.closeClient();
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,776评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,527评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,361评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,430评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,511评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,544评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,561评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,315评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,763评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,070评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,235评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,911评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,554评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,173评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,424评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,106评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,103评论 2 352