elasticsearch 5.4 JAVA API 使用

JAVA API

根据官网API进行的整合,提供了增删改查、分组的demo
前置条件 :

  • JDK1.8
  • elasticsearch 5.4
  • maven 项目

1.新建maven项目,添加依赖

#添加more-core依赖,或者自行添加sping依赖代替(私服依赖)
<dependency>
    <groupId>com.umpay</groupId>
    <artifactId>mars-core</artifactId>
    <version>2.0.0-SNAPSHOT</version>
</dependency>
#client依赖(必须的)
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>5.4.1</version>
</dependency>
#使用了LifeCycle的生命周期类(非必须),可使用Spring的生命周期或者其他来实现(私服依赖)
<dependency>
    <groupId>com.umpay</groupId>
    <artifactId>typhos-kernel</artifactId>
</dependency>

关于日志
若使用log4j 2来记录日志,增加如下依赖

<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.8.2</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.8.2</version>
</dependency>

若使用其他日志,如logback,需添加slf4j桥转接

<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-to-slf4j</artifactId>
    <version>2.8.2</version>
</dependency>

2.客户端实现
ElasticsearchClient.java
①设置变量,set方法自己生成

private String clusterName = "elasticsearch";//设置集群名称
private String clusterNodes;//节点的ip:port
private boolean sniff = true;// 是否自动检测变化节点
private String pingTimeout = "5s";// 等待来自节点的ping响应的时间。默认为5 s
private boolean ignoreClusterName = false;//是否忽略集群名称
private String nodesSamplerInterval = "5s";// 对列出和连接的节点进行采样的频率。默认为5 s
private TransportClient client;

②启动方法

@Override
protected void doStart( ) throws Exception {
  buildClient();
  logger.info( "elasticSearch client is connected");
  super.doStart( );
}

protected void buildClient( ) throws Exception {
  client = new PreBuiltTransportClient( settings( ) );
  Assert.hasText( clusterNodes, "[Assertion failed] clusterNodes settings missing." );
  for( String clusterNode : clusterNodes.split( ";" )) {
       String hostName = clusterNode.substring( 0,clusterNode.indexOf( ":" ) );
       String port = clusterNode.substring( clusterNode.indexOf( ":" )+1,clusterNode.length( ) );
       Assert.hasText( hostName, "[Assertion failed] missing host name in 'clusterNodes'" );
       Assert.hasText( port, "[Assertion failed] missing port in 'clusterNodes'" );
       logger.info( "adding transport node : " + clusterNode );
       client.addTransportAddress( new InetSocketTransportAddress( InetAddress.getByName( hostName ), Integer.valueOf( port ) ) );
   }
  client.connectedNodes( );
}

private Settings settings( ) {
  return Settings.builder( ).put( "cluster.name", clusterName ).put( "client.transport.sniff", sniff )
    .put( "client.transport.ignore_cluster_name", ignoreClusterName )
    .put( "client.transport.ping_timeout", pingTimeout )
    .put( "client.transport.nodes_sampler_interval", nodesSamplerInterval ).build( );
}

③停止方法

@Override
protected void doStop( ) throws Exception {
    try {
        logger.info( "Closing elasticSearch  client" );
        if( client != null ) {
            client.close( );
        }
    } catch( final Exception e ) {
        logger.error( "Error closing ElasticSearch client: ", e );
    }
    super.doStop( );
}

④设置单例

private ElasticsearchClient(){}

public static ElasticsearchClient getInstance(){
    return InstanceHolder.instance;
}

private static final class InstanceHolder {
    public static final ElasticsearchClient instance = new ElasticsearchClient( );
}

⑤ 提供getClient方法

public TransportClient getClient(){
    return client;
}

3.sping 配置

<bean id="esClient" class="com.umpay.mars.simu.elasticsearch.client.ElasticsearchClient" factory-method="getInstance" 
    init-method="start" destroy-method="stop">
    <property name="clusterName" value="es5.4"/>
    <property name="clusterNodes" value="192.168.181.130:9300"/>
    <property name="sniff" value="true"/>
    <property name="pingTimeout" value="50s"/>
    <property name="nodesSamplerInterval" value="5s"/>
    <property name="ignoreClusterName" value="false"/>
</bean>

4.启动

public static void main( String[] args ) throws Exception {
    App.getInstance( ).start( );//可替换成其他加载spring的方式     
}

5.数据的增删改
① 单条数据插入

public static void insert( ) {
    TransportClient client = ElasticsearchClient.getInstance( ).getClient( );
    //bill-->索引,submit-->类型
    IndexResponse response =client.prepareIndex( "bill", "submit" ).setSource( "{\"mobile\":\"14445424" + i + "\",\"status\":\"1\",\"orgId\":\"553\",\"apId\":\"ap01\",\"bizCode\":\"001\"}" ).get( );
    System.out.println( response.getIndex( ) + "-" + response.getId( ) + "-" +esponse.getType( ) );
}

②批量插入、删除
bulk

public static void bluk( ) {
    TransportClient client = ElasticsearchClient.getInstance( ).getClient( );
    BulkRequestBuilder bulkRequest = client.prepareBulk( );
    for( int i = 0; i < 1000; i++ ) {
        //可add index/delete 请求
        bulkRequest.add( client.prepareIndex( "bill", "submit" ).setSource("{\"mobile\":\"144dfdf5424" + i + "\",\"status\":\"5\",\"orgId\":\"5ds5\",\"apId\":\"ap03\",\"bizCode\":\"002\"}" ) );
        bulkRequest.add(client.prepareDelete( "bill", "submit" ,"1" ));//index,type,id
    }
    //此时发送处理请求
    BulkResponse bulkResponse = bulkRequest.get( );
    if( bulkResponse.hasFailures( ) ) {
        //失败处理
    }
}

③ BulkProcessor--它提供了一个简单的接口,可以根据请求的数量或大小,或在给定的时间段后自动执行批量操作。

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,  <1>
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { ... }    <2>

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... }    <3>

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... }    <4>
        })
        .setBulkActions(10000)   <5>
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))    <6>
        .setFlushInterval(TimeValue.timeValueSeconds(5))      <7>
        .setConcurrentRequests(1)      <8>
        .setBackoffPolicy(
            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))     <9>
        .build();

说明:

<1> elasticsearch客户端
<2> 每个批量请求前调用,例如可以查询numberOfActions->request.numberOfActions()
<3> 每个批量执行后调用此方法。例如可以检查是否有一些失败的请求->response.hasFailures()
<4> 每个批量执行后,抛出了异常调用此方法,大部分失败了
<5> 设置批量大小,比如每到10000个批次就处理
<6> 设置数据量大小,比如每5M处理一次
<7> 设置定时处理间隔,比如每5s处理一次
<8> 设置并发请求的数量。值为0意味着只有一个单一的请求被允许执行。值为1时表示1个并发请求,请求是累计的批次请求
<9> 设置一个补偿政策,当一次批量请求失败,并抛出EsRejectedExecutionException (表明请求处理不过来)的异常时,初次等待100ms重试,重试3次,重试等待时间呈指数增长,禁用补偿政策,通过设置BackoffPolicy.noBackoff().

默认值:

bulkActions = 1000
bulkSize = 5mb
不设置 flushInterval
concurrentRequests = 1
backoffPolicy设置次8重试和开始50毫秒的延迟。总等待时间大约是5.1秒。

添加的请求
IndexRequest、DeleteRequest、UpdateRequest、还有upsert的请求

bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
bulkProcessor.add(new UpdateRequest( "report", "test", "2" ).doc( "{\"orgId\":\"1\"}" ))
//id存在则更新否则就插入
bulkProcessor.add(new UpdateRequest( "report", "test", "2" ).doc( "{\"orgId\":\"1\"}" ).upsert( /* your doc here */))

BulkProcessor关闭
当文档加载到 BulkProcessor后,通过使用awaitClose或 close方法将其关闭:

bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
或者
bulkProcessor.close();

如果被设置flushInterval,这两种方法都可以flush任何剩余的文档,并禁用所有其他预定的flush政策。如果启用了并发请求,在等待的超时时间内完成了所有的bulk请求,awaitClose方法将会返回true ,否则会返回false。close方法不等待任何剩余的批量请求完成并立即退出。
④update --当id不存在时将会抛出异常

public static void update( ) throws InterruptedException, ExecutionException  {
    TransportClient client = ElasticsearchClient.getInstance( ).getClient( );
    UpdateRequest updateRequest = new UpdateRequest( "report", "test", "338799bd8c40e1963fd56557fb161c" ).doc( "{\"orgId\":\"333\"}" );
    client.update( updateRequest ).get( );
}

⑤upsert--id不存在时就插入

public static void upsert( ) throws InterruptedException, ExecutionException {
    TransportClient client = ElasticsearchClient.getInstance( ).getClient( );
    UpdateRequest updateRequest = new UpdateRequest( "report", "test", "338799bd8c40e1963fd56557fb161c" ).doc( "{\"orgId\":\"333\"}" ).upsert(/*source*/ );
    client.update( updateRequest ).get( );
}

⑥分组
分组的结果是树形结构的json,使用时需要自行拼装需要的对象

{
    "aggregations": {
        "ap_count": {
            "doc_count_error_upper_bound": 0, 
            "sum_other_doc_count": 0, 
            "buckets": [
                {
                    "key": "ap02", 
                    "doc_count": 304000, 
                    "org_count": {
                        "doc_count_error_upper_bound": 0, 
                        "sum_other_doc_count": 0, 
                        "buckets": [
                            {
                                "key": "5ds5", 
                                "doc_count": 300000
                            }, 
                            {
                                "key": "5ds3", 
                                "doc_count": 2000
                            }, 
                            {
                                "key": "5ds4", 
                                "doc_count": 2000
                            }
                        ]
                    }
                }, 
                {
                    "key": "ap03", 
                    "doc_count": 300000, 
                    "org_count": {
                        "doc_count_error_upper_bound": 0, 
                        "sum_other_doc_count": 0, 
                        "buckets": [
                            {
                                "key": "5ds5", 
                                "doc_count": 300000
                            }
                        ]
                    }
                }
            ]
        }
    }
}

java code

  public static void aggregation( ) {
        SearchRequestBuilder sbuilder = ElasticsearchClient.getInstance( ).getClient( ).prepareSearch( "bill" ).setTypes( "submit" );
        TermsAggregationBuilder apBuilder = AggregationBuilders.terms( "ap_count" ).field( "apId" );
        TermsAggregationBuilder orgBuilder = AggregationBuilders.terms( "org_count" ).field( "orgId" );
        apBuilder.subAggregation( orgBuilder );
        sbuilder.addAggregation( apBuilder );
        SearchResponse response = sbuilder.execute( ).actionGet( );
        Map<String, Aggregation> aggMap = response.getAggregations( ).asMap( );
        StringTerms teamAgg = (StringTerms) aggMap.get( "ap_count" );
        Iterator<Bucket> teamBucketIt = teamAgg.getBuckets( ).iterator( );
        while( teamBucketIt.hasNext( ) ) {
            Bucket buck = teamBucketIt.next( );
            String key = buck.getKeyAsString( );
            long count = buck.getDocCount( );
            System.out.println(key+"--"+count);
            Map subaggmap = buck.getAggregations( ).asMap( );
            StringTerms orgAgg = (StringTerms) subaggmap.get( "org_count" );
            Iterator<Bucket> orgBucketIt = orgAgg.getBuckets( ).iterator( );
            while(orgBucketIt.hasNext( )){
                Bucket orgBuck = orgBucketIt.next( );
                String orgKey=orgBuck.getKeyAsString( );
                long orgCount = orgBuck.getDocCount( );
                System.out.println(orgKey+"--"+orgCount);
            }
        }
  }

6.数据查询
①get--根据index,type,id

GetResponse response = client.prepareGet("twitter", "tweet", "1").get();

可设置是否多个线程处理请求,默认为true

GetResponse response = client.prepareGet("twitter", "tweet", "1")
        .setOperationThreaded(false)
        .get();

②根据条件查询、分页

   public static void queryPage(){
        TransportClient client = ElasticsearchClient.getInstance( ).getClient( );
        SearchResponse response = client.prepareSearch("bill")
        .setTypes("submit")
        .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
        .setQuery(QueryBuilders.termQuery("orgId", "5ds5"))                 // Query
        //.setPostFilter(QueryBuilders.rangeQuery("age").from(12).to(18))     // Filter
        .setFrom(0).setSize(60).setExplain(true)
        .get();
        SearchHits hits = response.getHits();
        for (SearchHit searchHit : hits) {
            Map source = searchHit.getSource();
            for(Object key : source.keySet( )){
                System.out.println(key+"-" +source.get( key ));
            }
        }
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,718评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,683评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,207评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,755评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,862评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,050评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,136评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,882评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,330评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,651评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,789评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,477评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,135评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,864评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,099评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,598评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,697评论 2 351

推荐阅读更多精彩内容