elasticsearch http 客户端 Jest 的使用

jest github 地址:https://github.com/searchbox-io/Jest
引入 jar 包:

    <dependency>
        <groupId>io.searchbox</groupId>
        <artifactId>jest</artifactId>
        <version>5.3.3</version>
    </dependency>

初始化

JestClient client;

JestClientFactory factory = new JestClientFactory();
    factory.setHttpClientConfig(new HttpClientConfig
            .Builder(Arrays.asList(urls))
            .multiThreaded(true)
            .defaultMaxTotalConnectionPerRoute(Integer.valueOf(maxTotal))
            .maxTotalConnection(Integer.valueOf(perTotal))
            .build());
    this.client = factory.getObject();

基本操作

//创建索引
client.execute(new CreateIndex.Builder(index).build());

//创建 type
PutMapping.Builder builder = new PutMapping.Builder(index, type, mapping);
JestResult jestResult = client.execute(builder.build());
if (!jestResult.isSucceeded()) {
    //失败
}

//删除索引
client.execute(new DeleteIndex.Builder(index).build());

操作

    /**
     * 获取对象
     * 可以加路由:setParameter(Parameters.ROUTING, "")
     */
    public <T> T getData(String index, String type, String _id, Class<T> clazz) {
        Get get = new Get.Builder(index, _id).type(type).build();
        
        JestResult result = client.execute(get);
        if (result.isSucceeded()) {
            return result.getSourceAsObject(clazz);
        }
    }
    
    /**
     * 写入数据
     * 对象里要有 id
     */
    public <T> void insertData(String index, String type, List<T> list) {
        Index action = new Index.Builder(list).index(index).type(type).build();
        try {
            client.execute(action);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 功能描述:插入数据
     * @entity 里面最好有@JestId id,要不然会自动生成一个
     */
    public <T> void insertData(String index, String type, T entity) {
        Index action = new Index.Builder(entity).index(index).type(type).build();
        try {
            client.execute(action);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 功能描述:批量插入数据
     * @param index    索引名
     * @param type     类型
     * @param dataList 批量数据
     */
    public <T> void bulkInsertData(String index, String type, List<T> dataList) throws Exception{

        List<Index> actions = new ArrayList<>();
        assert dataList != null;
        dataList.forEach(item -> {
            actions.add(new Index.Builder(item).build());
        });

        Bulk bulk = new Bulk.Builder()
                .defaultIndex(index)
                .defaultType(type)
                .addAction(actions)
                .build();

        client.execute(bulk);
    }

异步操作

    //异步写入
    /**
     * 功能描述:插入数据
     *
     * @param index 索引名
     * @param type  类型
     * @entity 里面最好有 id,要不然会自动生成一个
     */
    public <T> void insertDataAsync(String index, String type, T entity) {
        Index action = new Index.Builder(entity).index(index).type(type).build();

        client.executeAsync(action, new JestResultHandler<JestResult>() {
            @Override
            public void completed(JestResult result) {
                logger.debug("insert success");
            }

            @Override
            public void failed(Exception e) {
                e.printStackTrace();
            }
        });
    }
    
    /**
     * 功能描述:异步更新数据
     */
    public <T> void updateDataAsync(String index, String type, String _id, T entity) {

        client.executeAsync(new Update.Builder(entity).id(_id)
                .index(index)
                .type(type)
                .build(), new JestResultHandler<JestResult>() {
            @Override
            public void completed(JestResult result) {
                logger.debug("insert success");
            }

            @Override
            public void failed(Exception e) {
                e.printStackTrace();
            }
        });
    }
    
    /**
     * 功能描述:异步批量插入数据
     */
    public <T> void bulkInsertData(String index, String type, List<T> dataList) throws Exception{

        List<Index> actions = new ArrayList<>();
        assert dataList != null;
        dataList.forEach(item -> {
            actions.add(new Index.Builder(item).build());
        });

        Bulk bulk = new Bulk.Builder()
                .defaultIndex(index)
                .defaultType(type)
                .addAction(actions)
                .build();

        client.execute(bulk);
    }
    
    

搜索

如果使用 sourcebuilder,需要引入 es 的包

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>5.5.3</version>
    <scope>compile</scope>
</dependency>

搜索代码:

    /**
     * 功能描述:搜索
     * @param index       索引名
     * @param type        类型
     * @param constructor 查询构造
     */
    public <T> Page<T> search(String index, String type, Class<T> clazz, ESQueryBuilderConstructor constructor) {
        Page<T> page = new Page<>();
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        //sourceBuilder.query(QueryBuilders.matchAllQuery());
        sourceBuilder.query(constructor.listBuilders());

        sourceBuilder.from((constructor.getFrom()));
        sourceBuilder.size(constructor.getSize() > MAX_SIZE ? MAX_SIZE : constructor.getSize());

        sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));

        //增加多个值排序
        if (constructor.getSorts() != null) {
            constructor.getSorts().forEach((key, value) -> {
                sourceBuilder.sort(SortBuilders.fieldSort(key).order(value));
            });
        }

        //设置需要返回的属性
        if (constructor.getIncludeFields() != null || constructor.getExcludeFields() != null) {
            sourceBuilder.fetchSource(constructor.getIncludeFields(), constructor.getExcludeFields());
        }

        logger.debug("查询条件:{}", sourceBuilder.toString());
        //System.out.println("查询条件:" + sourceBuilder.toString());

        Search search = new Search.Builder(sourceBuilder.toString())
                .addIndex(index)
                .addType(type).setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
                .build();

        SearchResult result = null;
        try {
            result = client.execute(search);
        } catch (IOException e) {
            e.printStackTrace();
        }
        logger.debug("查询结果:{}", result.getJsonString());
        List<T> list = new ArrayList<>();

        result.getHits(clazz).forEach(item -> {
            list.add(item.source);
        });

        page.setList(list).setCount(result.getTotal());

        return page;
    }

统计

public <T> Map<Object, Object> statSearch(String index, String type, ESQueryBuilderConstructor constructor, String groupBy) {
    Map<Object, Object> map = new HashedMap();
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    if (constructor != null) {
        sourceBuilder.query(constructor.listBuilders());
    } else {
        sourceBuilder.query(QueryBuilders.matchAllQuery());
    }

    sourceBuilder.from((constructor.getFrom()));
    sourceBuilder.size(constructor.getSize() > MAX_SIZE ? MAX_SIZE : constructor.getSize());

    sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));

    sourceBuilder.aggregation(AggregationBuilders.terms("agg").field(groupBy));

    //不需要 source
    sourceBuilder.fetchSource(false);

    Search search = new Search.Builder(sourceBuilder.toString())
            .addIndex(index)
            .addType(type).setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
            .build();
    SearchResult result = null;
    try {
        result = client.execute(search);
    } catch (IOException e) {
        e.printStackTrace();
    }
    System.out.println("返回的聚合:" + result.getJsonString());

    result.getAggregations().getTermsAggregation("agg").getBuckets().forEach(item -> {
        map.put(item.getKey(), item.getCount());
    });

    return map;
}

page对象:

public class Page<T> implements Serializable {
    private int pageSize;
    public static final int COUNT_NOT_COUNT = -1;
    public static final int PAGE_SIZE_NOT_PAGING = -1;
    private int last;
    private int first;
    private int next;
    private int prev;
    private int pageNo;
    private int centerNum;
    private static final long serialVersionUID = 1L;
    private String orderBy;
    public static final int COUNT_ONLY_COUNT = -2;
    private List<T> list;
    private long count;

    public List<T> getList() {
        return this.list;
    }

    public int getPageSize() {
        return this.pageSize;
    }

    public Page(int pageNo, int pageSize) {
        this(pageNo, pageSize, 0L);
    }

    public void setOrderBy(String orderBy) {
        this.orderBy = orderBy;
    }

    public Page(int pageNo, int pageSize, long count, List<T> list) {
        this.pageNo = 1;
        this.pageSize = pageSize;
        this.list = list;
        this.centerNum = 5;
        this.pageNo = pageNo;
        this.pageSize = pageSize;
        this.count = count;
        if (list != null) {
            this.list = list;
        }

    }

    public int getFirstResult() {
        int a = (this.getPageNo() - 1) * this.getPageSize();
        if (this.getCount() != -1L && (long)a >= this.getCount()) {
            a = 0;
        }

        return a;
    }

    public int getMaxResults() {
        return this.getPageSize();
    }

    public long getCount() {
        return this.count;
    }

    public void setPageSize(int pageSize) {
        if (pageSize <= 0) {
            this.pageSize = 20;
        } else {
            this.pageSize = pageSize;
        }

    }

    public int getPageNo() {
        return this.pageNo;
    }

    public void initialize() {
        if (!this.isNotPaging() && !this.isNotCount() && !this.isOnlyCount()) {
            if (this.pageSize <= 0) {
                this.pageSize = 20;
            }

            this.first = 1;
            this.last = (int)(this.count / (long)this.pageSize) + this.first - 1;
            if (this.count % (long)this.pageSize != 0L || this.last == 0) {
                ++this.last;
            }

            if (this.last < this.first) {
                this.last = this.first;
            }

            if (this.pageNo <= this.first) {
                this.pageNo = this.first;
            }

            if (this.pageNo >= this.last) {
                this.pageNo = this.last;
            }

            Page var10000;
            if (this.pageNo > 1) {
                var10000 = this;
                this.prev = this.pageNo - 1;
            } else {
                var10000 = this;
                this.prev = this.first;
            }

            if (var10000.pageNo < this.last - 1) {
                this.next = this.pageNo + 1;
            } else {
                this.next = this.last;
            }
        }

    }

    public boolean isNotPaging() {
        return this.pageSize == -1;
    }

    public Page() {
        this.pageNo = 1;
        this.pageSize = 10;
        this.list = new ArrayList();
        this.centerNum = 5;
    }

    public void setCount(long count) {
        if ((this.count = count) != -1L && (long)this.pageSize >= count) {
            this.pageNo = 1;
        }

    }

    public boolean isOnlyCount() {
        return this.count == -2L;
    }

    public Page(int pageNo, int pageSize, long count) {
        this(pageNo, pageSize, count, (List)null);
    }

    public boolean isNotCount() {
        return this.count == -1L || this.isNotPaging();
    }

    public Page<T> setList(List<T> list) {
        if (list == null) {
            list = new ArrayList();
        }

        this.list = (List)list;
        return this;
    }

    public void setCenterNum(int centerNum) {
        this.centerNum = centerNum;
    }

    public void setPageNo(int pageNo) {
        this.pageNo = pageNo;
    }
}

总结:使用 jest 的 client 还是比较方便的,并且使用@JestId 注解,直接将对象的 id 作为 doc 的 id。性能上也不错,经本人测试 1 秒 5k-6k 数据写入没问题

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,376评论 6 491
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,126评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,966评论 0 347
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,432评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,519评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,792评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,933评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,701评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,143评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,488评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,626评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,292评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,896评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,742评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,977评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,324评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,494评论 2 348

推荐阅读更多精彩内容

  • host Copyright (c) 2014-2017, racaljk. https://github.com...
    JasonStack阅读 3,091评论 0 4
  • 请允许我借鉴前辈们的东西~~~~ 感激不尽~~~~~ 以下为Android 框架排行榜 么么哒~ Android...
    嗯_新阅读 2,018评论 3 32
  • Elasticsearch优势 横向可扩展性:只需要增加台服务器,做一点儿配置,启动一下Elasticsearch...
    AncientJazz阅读 3,399评论 0 8
  • 参考原文:https://github.com/taizilongxu/interview_python Pyth...
    Gasxia阅读 326评论 0 0
  • 密密麻麻的人群中古欣然淡然走出,来至平台前,"把你的手放到水晶球上即可。"古欣然将手放在水晶球上,顿时...
    梦游生阅读 254评论 0 0