Connecting:
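import java.net.InetAddress;
import java.net.UnknownHostException;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

// CLUSTER_NAME, IP and PORT are constants defined elsewhere in the class.
private static Settings settings = Settings
        .builder()
        .put("cluster.name", CLUSTER_NAME)
        // let the client discover the other nodes of the cluster from the seed address
        .put("client.transport.sniff", true)
        .build();
private static TransportClient client;
static {
    try {
        client = TransportClient.builder().settings(settings).build()
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(IP), PORT));
    } catch (UnknownHostException e) {
        // client stays null if the host name cannot be resolved
        e.printStackTrace();
    }
}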
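Note that plain HTTP requests and the Spark connection both go through the REST interface, so they use port 9200, whereas TransportClient talks to the transport port, which is 9300. Both values are the defaults.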
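To make the port distinction concrete, here is a minimal sketch that builds both kinds of endpoint with nothing but the JDK. The class name `EsEndpoints`, the host `127.0.0.1` and the index name `my_index` are hypothetical placeholders, and the ports are the defaults, so a cluster with customised ports would need different values.

```java
import java.net.InetSocketAddress;
import java.net.URI;

// Illustration only: shows which port each access path targets by default.
class EsEndpoints {
    static final int REST_PORT = 9200;      // default HTTP/REST port (HTTP clients, Spark connection)
    static final int TRANSPORT_PORT = 9300; // default transport port (TransportClient)

    // URL of the REST _search endpoint for a (hypothetical) index.
    static URI restSearchUri(String host, String index) {
        return URI.create("http://" + host + ":" + REST_PORT + "/" + index + "/_search");
    }

    // Socket address a transport-level client would be pointed at.
    static InetSocketAddress transportAddress(String host) {
        return InetSocketAddress.createUnresolved(host, TRANSPORT_PORT);
    }

    public static void main(String[] args) {
        System.out.println(restSearchUri("127.0.0.1", "my_index"));  // http://127.0.0.1:9200/my_index/_search
        System.out.println(transportAddress("127.0.0.1").getPort()); // 9300
    }
}
```

Pointing a TransportClient at 9200, or an HTTP client at 9300, is a common cause of connection errors, which is why the two are kept as separate constants here.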
Query:
// province is "0002" OR "0001": at least one of the should clauses has to match
org.elasticsearch.index.query.BoolQueryBuilder bq1 = QueryBuilders.boolQuery()
        .must(QueryBuilders.boolQuery()
                .should(QueryBuilders.matchPhraseQuery("province", "0002"))
                .should(QueryBuilders.matchPhraseQuery("province", "0001"))
        );
// AND sex is "02401"
bq1.must(QueryBuilders.boolQuery()
        .should(QueryBuilders.matchPhraseQuery("sex", "02401")));
// AND age is "02505"
bq1.must(QueryBuilders.boolQuery()
        .should(QueryBuilders.matchPhraseQuery("age", "02505"))
);
org.elasticsearch.index.query.BoolQueryBuilder qq = QueryBuilders.boolQuery().must(bq1);
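The nesting of must and should can be hard to read, so the following plain-Java sketch spells out the boolean logic that qq expresses. It does not call Elasticsearch at all: `BoolQuerySketch`, `fieldIs` and `doc` are hypothetical helpers, and it assumes each field holds a single code value, so that the phrase match behaves like an equality test.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Illustration of the filter semantics only; no Elasticsearch classes involved.
class BoolQuerySketch {

    // Hypothetical helper: true when the document's field equals the given value.
    static Predicate<Map<String, String>> fieldIs(String field, String value) {
        return doc -> value.equals(doc.get(field));
    }

    // (province = 0002 OR province = 0001) AND sex = 02401 AND age = 02505
    static Predicate<Map<String, String>> filter() {
        Predicate<Map<String, String>> province =
                fieldIs("province", "0002").or(fieldIs("province", "0001")); // the two should clauses
        return province
                .and(fieldIs("sex", "02401"))  // second must clause
                .and(fieldIs("age", "02505")); // third must clause
    }

    // Hypothetical helper that builds a sample document.
    static Map<String, String> doc(String province, String sex, String age) {
        Map<String, String> d = new HashMap<>();
        d.put("province", province);
        d.put("sex", sex);
        d.put("age", age);
        return d;
    }

    public static void main(String[] args) {
        System.out.println(filter().test(doc("0001", "02401", "02505"))); // true
        System.out.println(filter().test(doc("0003", "02401", "02505"))); // false: province not in the list
    }
}
```

A bool query that contains only should clauses requires at least one of them to match, which is why the province group acts as an OR while the outer must clauses act as AND.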
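The query qq built above is the exact-match filter. The aggregation that follows groups the matching documents by media and computes a distinct count of the deviceId field within each group.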
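// continues a search request builder chain (the start of the chain is not shown)
        .setQuery(qq)
        .addAggregation(
                AggregationBuilders
                        .terms("agg1").size(200).field("media") // group by media, up to 200 buckets
                        .subAggregation(AggregationBuilders.cardinality("agg2") // distinct count
                                // 40000 is the maximum supported threshold; the count is still
                                // approximate, but is expected to be close to exact below it
                                .precisionThreshold(40000)
                                .field("deviceId"))
        )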
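To show what this aggregation is meant to compute, here is a small in-memory sketch of "group by media, then count distinct deviceId". It is an exact computation written against the JDK only, whereas the cardinality aggregation returns an approximate count. The class `DistinctCountSketch` and the sample rows are made up for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Illustration only: exact in-memory equivalent of terms("media") + cardinality("deviceId").
class DistinctCountSketch {

    // Each row is {media, deviceId}.
    static Map<String, Integer> distinctDevicesPerMedia(List<String[]> rows) {
        // Step 1: collect the set of device ids seen for each media value (the terms buckets).
        Map<String, Set<String>> devicesByMedia = new TreeMap<>();
        for (String[] row : rows) {
            devicesByMedia.computeIfAbsent(row[0], k -> new HashSet<>()).add(row[1]);
        }
        // Step 2: the distinct count per bucket is the size of its set.
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, Set<String>> e : devicesByMedia.entrySet()) {
            counts.put(e.getKey(), e.getValue().size());
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
                new String[]{"app", "d1"},
                new String[]{"app", "d1"}, // duplicate device, counted once
                new String[]{"app", "d2"},
                new String[]{"web", "d3"});
        System.out.println(distinctDevicesPerMedia(rows)); // {app=2, web=1}
    }
}
```

Keeping one set per bucket is what makes the exact version memory-hungry on large data, which is the trade-off the approximate cardinality aggregation and its precision threshold are designed to manage.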