ElasticSearch是一款非常优秀的分布式搜索程序,提供RESTful API,底层基于Lucene,采用多shared方式保证数据安全,提供自动resharding功能。
为什么要使用ElasticSearch呢?最开始时使用MySQL进行简单搜索,不能索引LIKE语句直接拉低MySQL性能。后来考虑Sphinx,但在数据量级、多台MySQL、搜索服务本身的HA、后续扩容等问题上,Sphinx并不是一个最优的选择。
对于ES项目中使用需解决的问题:
- 索引:对需搜索的数据,如何建立合适的索引,需根据特定语言使用不同的analyzer等。
- 搜索:ES提供非常强大的搜索功能,如何写出高效的搜索语句呢?
- 数据源:MySQL是唯一的数据源,如何将MySQL的数据导入到ES呢?
由于数据都是从MySQL生成,index的field是固定的,主要做的工作就是根据业务场景设计好对应的mapping和search语句,并不断调优。
对于数据源,需工具将MySQL的数据导入ES,由于对搜索实时性要求很高,需将MySQL增量数据实时导入,可通过row based binlog来完成。
ES关键概念
数据层面
- index:ES用来存储数据的逻辑区域,类似于关系型数据库中的db的概念。一个index可在一个或多个shard上,同时一个shard也可能会由多个replicas。
- document:ES里存储的实体数据,类似于关系数据中一个table里的一行数据。document由多个field组成,不同的document里同名的field一定具有相同的类型。document里field可重复出现,也就是一个field会由多个值,即multivalued。
- document type:为了查询需要,一个index可能会由多种document,也就是document type。不同document中同名的field一定要是相同类型的。
- mapping:存储field的相关映射信息,不同document type会有不同的mapping。
对比MySQL,需要大概认为index就是db,document就是一行数据,field就是table的column,mapping就是table的定义,document type就是一个table就可以了。
document type是为了更好的查询,例如一个index可能一部分数据我们想使用一种查询方式,而另一部分数据想使用另一种查询方式,于是就由2种type。不过这种情况应该在项目中不会出现,所以通常一个index下面仅仅会由一个type。
服务层面
- node:一个server的实例
- cluster:多个node组成的cluster
- shard:数据分片,一个index可能会存在于多个shards,不同shards可能在不同nodes。
- replica:shard的备份,由一个primary shard,其余的叫做replica shards。
ES之所以能动态resharding,主要在于它最开始就预先分配了多个shards,然后以shards为单位进行数据迁移。这种做法其实在分布式领域非常普遍,codis就是使用1024个slot来进行数据迁移。
RESTful API
ES提供了RESTful API,使用JSON格式,使得非常利于与外部交互。RESTful接口很简单,一个URL表示一个特定的资源,例如/blog/article/1表示index为blog,type为article,id为1的document。
使用HTTP标准method来操作这些资源
- POST 新增
- PUT 更新
- GET 获取
- DELETE 删除
- HEAD 判断是否存在。
HTTPIE
推荐HTTPIE,非常强大的HTTP工具,类似curl,几乎是命令行调试ES的绝配。
# get
$ http GET : 9200/blog/article/1
# create
$ http POST :9200/blog/article/1 title="hello" tags:='["elasticsearch"]'
# update
$ http PUT :9200/blog/article/1 title="hello" tags:='["elasticsearch","world"]'
# delete
$ http DELETE :9200/blog/article/1
# exists
$ http HEAD :9200/blog/article/1
索引和搜索
虽然ES能自动判断field类型并建立合适的索引,推荐自己设置相关的索引规则,这样才能更好的为后续的搜索服务。通过定制mapping的方式来设置不同field的索引规则。对于搜索,ES提供太多的搜索选项。索引和搜索是ES非常重要的两个方面,直接关系到产品的搜索体验。
同步MySQL数据
ES很强大但要建立在足够数据的情况下,如何将MySQL的数据导入ES。虽然现在有一些实现,譬如elasticsearch-river-jdbc或elasticsearch-river-mysql。
elasticsearch-river-jdbc功能很强大,但并没很好支持增量数据更新的问题,需要对应的表只增不减,而这几乎在项目中是不能办到的。
elasticsearch-river-mysql采用了python-mysql-replication来通过binlog获得变更的数据,进行增量更新,貌似处理MySQL dump数据导入的问题。
核心概念
接近实时(NRT)
ES是一个接近实时的搜索平台,这意味着从索引一个文档直到这个文档能搜索到由一个轻微的延迟(1秒)。
集群(Cluster)
一个集群是由一个或多个节点组织在一起,共同持有整个数据,并一起提供索引和搜索功能。一个集群由一个唯一的名字标识,这个名字默认就是 elasticsearch。一个节点只能通过指定的某个集群的名字来加入集群。
节点(Node)
一个节点是集群中一个服务器,作为集群一部分,它存储数据参与集群索引和搜索功能。和集群类似,一个节点也是由一个名字来标识的。默认情况下,这个名字是漫威角色的名字,这个名字会在启动时赋予节点,这个名字对于管理工作很重要,因为在管理过程中,你会去确定网络中那些服务对应于ES集群中那些节点。
一个节点可通过配置集群名称的方式来加入一个指定的集群。默认情况下,每个节点都会被安排加入到一个叫做 elasticsearch的集群中,这意味着若你在网络中启动若干节点,并假定他们能相互发现彼此,它们将会自动地形成并加入到一个叫做 elasticsearch的集群中。
在一个集群中只要你想可拥有任意多个节点,若当前你的网络中没有运行任何ES节点,此时启动一个节点会默认并加入一个叫做 elasticsearch 的集群。
索引(index)
一个索引就是一个 拥有哦积分相似特征的文档的集合,比如说你可以由一个客户数据的索引,另一个产品目录的索引还有一个订单数据的索引。
一个索引由一个名字来标识(必须全部都是小写字母),当我们要对对应于这个索引中的文儿当进行索引、搜索、更新、删除的时候,都要使用这个名字。在一个集群中如你想可定义任意多个索引。
类型(type)
在一个索引中可定义一种或多种类型,一个类型是你的索引的一个逻辑上的分类或分区,其语义完全由你来定。通常会为具有一组共同字段的文档定义一个类型。比如你运营的一个博客平台并将你所有的数据存储到一个索引中,在这个索引中你可为用户数据定义一个类型,为博客数据定义另一个类型,当然也可为评论定义另一个类型。
文档(document)
一个文档是一个可被索引的基础信息单元,比如你可拥有某个客户的文档,某个产品的一个文档,当然也可拥有某个订单的一个文档。文档以JSON格式来表示,而JSON是一个到处存在的互联网交换格式。
在一个index/type里面,只要你想可存储任意多的文档。注意尽管一个文档物理上存在于一个索引之中,文档必须被索引赋予一个索引的type。
一. Elasticsearch简介
ES是一个基于Lucene的搜索服务器,它提供一个分布式多用户能力的全文搜索引擎,基于RESTful web接口,ES是使用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。设计用于云计算中,能达到实时搜索、稳定、可靠、快速、安装使用方便。
Elasticsearch是一个分布式、可扩展、实时的搜索与数据分析引擎。
Elasticsearch被用作全文搜索、结构化搜索、分析以及这三个功能的组合。
Elasticsearch是一个开源的搜索引擎,建立在一个全文搜索引擎库Apache Lucene基础之上。
Elasticsearch使用Java编写内部使用Lucene做索引和搜索,其目的是使全文索引变得简单,通过隐藏Lucene的复杂性,取而代之的提供一套简单一致的RESTful API。
归纳总结
- 一个分布式的实时文档存储,每个字段可被索引和搜索。
- 一个分布式实时分析的搜索引擎
- 能胜任上百个服务节点的扩展,并支持PB级别的结构化或非结构化数据。
二. Elasticsearch安装
操作系统:Ubuntu 16.04 LTS
1. 安装java
# 查看java版本
$ java -version
java version "1.7.0_80"
Java(TM) SE Runtime Environment (build 1.7.0_80-b15)
Java HotSpot(TM) 64-Bit Server VM (build 24.80-b11, mixed mode)
# java编译器
$ javac
Elasticsearch recommends Oracle JDK version 1.8.0_73, but the native Ubuntu OpenJDK native package for the JRE works as well.
推荐安装JDK8
$ sudo add-apt-repository -y ppa:webupd8team/java
$ sudo apt-get update
$ sudo apt-get -y install oracle-java8-installer
$ java -version
2. 安装Elasticsearch
建议采用源码包安装,使用 sudo apt-get install elasticsearch
出现下列错误。
# 安装elasticsearch
$ sudo apt-get update
$ sudo apt-get install elasticsearch
# 安装curl
$ sudo apt-get install curl
$ curl -version
# 查看系统类型
$ ps -p 1
# systemd下启动、关闭、重启elasticsearch
$ sudo systemctl start elasticsearch.service
$ sudo systemctl stop elasticsearch.service
$ sudo systemctl restart elasticsearch.service
$ sudo systemctl status elasticsearch.service
# 查看elasticsearch日志
$ ll /var/log/elasticsearch/
#查看日志
$ sudo journalctl -f
$ sudo journalctl --unit=elasticsearch
## 测试elasticsearch
$ curl 'http://localhost:9200/?pretty'
出现问题
jc@jcos:~$ curl 'http://127.0.0.1:9200/?pretty'
curl: (7) Failed to connect to 127.0.0.1 port 9200: 拒绝连接
root@jcos:~# curl -XGET 'localhost:9200'
curl: (7) Failed to connect to 127.0.0.1 port 9200: 拒绝连接
# 查看9200端口
root@jcos:~# netstat -tulpn | grep 9200
3. 源码包安装
下载最新版本的 elasticsearch
目前最新版本
https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.4.3.deb
# wget 下载
$ wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.4.3.deb
# 解压安装deb
$ sudo dpkg -i elasticsearch-5.4.3.deb
# 安装后elasticsearch位于
$ cd /usr/share/elasticsearch/
# 开启服务
$ system.md start elasticsearch.service
测试:启动elasticsearch节点
$ curl -X GET 'http://localhost"9200'
{
"name" : "orc_HnJ",
"cluster_name" : "elasticsearch",
"cluster_uuid" : "m9RkECrAToWEVAjEdpidMw",
"version" : {
"number" : "5.4.3",
"build_hash" : "eed30a8",
"build_date" : "2017-06-22T00:34:03.743Z",
"build_snapshot" : false,
"lucene_version" : "6.5.1"
},
"tagline" : "You Know, for Search"
}
将ES加入到系统启动文件并启动ES服务
# 加入开启自启动
$ sudo update-rc.d elasticsearch default 95 1
# 启动服务
$ sudo /etc/init.d/elasticsearch start
# CURL测试
$ curl -X GET "http://localhost:9200"
单个节点可作为一个运行中Elasticsearch实例,而一个集群是一组拥有相同cluster.name的节点,他们能一起工作并共享数据,还提供容错和可伸缩性。可在/etc/elasticsearch.yml
配置文件中修改cluster.name
,该节点会在节点启动时加载。
3. 安装Kibana
kibana是一个用于Elasticsearch分析和查询的仪表盘,它最吸引人的应该是NB的图标和表现能力。
下载安装 kibana
https://www.elastic.co/downloads/kibana
Kibana服务
# 启动kibana
$ systemctl start kibana.service
# 关闭kibana
$ systemctl stop kibana.service
# 查看kibana状态
$ systemctl status kibana.service
# 重启 kibana
$ systemctl restart kibana.service
# 使用deb安装kibana位于
$ cd /usr/share/kibana/
地址栏输入 localhost:5601
4. 安装Sense
Sense是一个Kibana应用,它提供交互式控制台,通过浏览器直接向Elasticsearch提交请求。
https://github.com/elastic/sense
HEADS UP: This repo is deprecated. Sense is now included as Console in Kibana 5.0. File issues over at elastic/kibana
目前sense已经集成到Kibana5中,访问 http://localhost:5601
打开左侧菜单栏 Dev Tools
此处就是 sense
。
例如:计算集群中文档的数据量
在Dev Tools中输入
GET /_count
{
“query":{
"match_all":{}
}
}
三. 和Elasticsearch交互
和Elasticsearch交互取决于你是否使用java,ES为Java用户提供两种内置客户端:
节点客户端(node cilent)
节点客户端作为一个非数据节点加入到本地集群中,换句话说它本身不保存任何数据,但是它知道数据在集群中那个节点中,并可把请求转发到正确的节点。传输客户端(transport client)
轻量级的传输客户端可将请求发送到远程集群,它本身不加入集群,但可将请求转发到集群中的一个节点上。
两个java客户端都是通过9300端口并使用本地ES传输协议和集群交互,集群中的节点通过端口9300彼此通信。若端口没有打开,节点将无法形成一个集群。
基于HTTP协议,以JSON为数据交换格式的RESTful API
其他所有语言都可使用RESTful API,通过9200端口与ES通信,可使用你喜欢的web客户端,设置可通过curl命令与ES通信。
四. 面向文档
ES是面向文档的,意味着可存储整个对象或 文档,而且索引每个文档的内容使之可被检索。在ES中,对文档进行索引、检索、排序、过滤而不是对行列数据,这种不同的思考数据的方式,也是 ES 能支持复杂全文检索的原因。Elasticsearch 使用 Javascript Object Notation 或 JSON 作为文档的序列化格式。
案例:创建一个雇员目录的业务需求
- 支持包含多值标签、数值、全文本数据
- 检索任一雇员的完整信息
- 允许结构化搜索,比如查询30岁以上的员工
- 允许简单的全文搜索以及较复杂的短语搜索
- 支持在匹配文档内容中高亮显示搜索片段
- 支持基于数据创建和管理分析仪表盘
1. 索引雇员文档
以雇员文档形式存储,一个文档代表一个雇员。存储数据到 Elasticsearch 的行为叫做索引,但在索引一个文档之前,需确定文档存储位置。
一个 Elasticsearch 集群可包含多个索引,相应每个索引可包含多个类型,不同类型存储多个文档,每个文档有多个属性。
一个索引类似于关系数据库中一个数据库,一个存储关系文档的地方,索引一个文档就是存储一个文档到一个索引(名称)中以便被检索和查询。关系数据库通过增加一个索引到指定列以提升数据检索速度。elasticsearch使用一个叫做倒序索引的结构来达到相同目的。
默认一个文档中每个属性都是被索引的和可搜索的,一个没有倒序索引的属性是不能被搜索的。
对于雇员目录将做如下操作
- 每个雇员索引一个文档,包含雇员的所有信息。
- 每个文档都将是 employee 类型
- employee 类型位于 megacorp内
- 该索引保存在Elasticsearch集群中
在Dev Tool
输入
// 创建一个索引或执行每个属性的数据类型 : megacorp 索引名称 / employee 类型名称 / 1 特定雇员编号
// 请求体,JSON文档中包含员工详细信息
PUT /megacorp/employee/1
{
"firstname":"John",
"lastname":"smith",
"age":25,
"introduce":"i love to go rock climbing",
"interests": ["sports", "music"]
}
返回结果
{
"_index": "megacorp",
"_type": "employee",
"_id": "1",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true
}
继续创建索引
PUT /megacorp/employee/2
{
"firstname":"Jane",
"lastname":"Smith",
"age":32,
"introduce":"i like to collect rock albums",
"interests":["music"]
}
PUT /megacorp/employee/3
{
"firstname":"Douglas",
"lastname":"Fir",
"age":35,
"introduce":"i like to build cabinets",
"interests":["forestry"]
}
2. 检索文档 GET
检索单个雇员的数据,执行一个HTTP GET请求并指定文档地址-索引库、类型和ID,即可返回原始的JSON文档。
GET megacorp/employee/1
返回
{
"_index": "megacorp",
"_type": "employee",
"_id": "1",
"_version": 1,
"found": true,
"_source": {
"firstname": "John",
"lastname": "Smith",
"age": 25,
"introduce": "i love to go rock climing",
"interests": [
"sports",
"music"
]
}
}
GET megacorp/employee/2
GET megacorp/employee/3
HTTP命令
- GET 检索文档
- PUT 新增文档
- DELETE 删除文档
- HEAD 检查文档是否存在
3. 轻量搜索
# 搜索所有雇员
GET megacorp/employee/_search
使用索引库megacorp以及类型employee,返回结果包括3个文档,放在数组hits中。一个搜索默认返回10条结果。
{
"took": 2,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 3,
"max_score": 1,
"hits": [
{
"_index": "megacorp",
"_type": "employee",
"_id": "2",
"_score": 1,
"_source": {
"firstname": "Jane",
"lastname": "Smith",
"age": 32,
"introduce": "i like to collect rock albums",
"interests": [
"music"
]
}
},
{
"_index": "megacorp",
"_type": "employee",
"_id": "1",
"_score": 1,
"_source": {
"firstname": "John",
"lastname": "Smith",
"age": 25,
"introduce": "i love to go rock climing",
"interests": [
"sports",
"music"
]
}
},
{
"_index": "megacorp",
"_type": "employee",
"_id": "3",
"_score": 1,
"_source": {
"firstname": "Douglas",
"lastname": "Fir",
"age": 35,
"introduce": "i like to build cabinets",
"interests": [
"forestry"
]
}
}
]
}
}
需求:搜索姓名为 Smith 的雇员
使用高亮搜索,涉及到查询字符串(query-string_)搜索,通过一个URL参数来传递查询信息给搜索接口。
GET magacorp/employee/_search?q=lastname:Smith
3. 查询表达式搜索
Query-string
搜索通过命令非常方便地进行临时性的即席搜索,但它由自身的局限性。Elasticsearch
提供了丰富灵活的查询语言叫做查询表达式。
GET megacorp/employee/_search
{
"query":{
"match":{
"lastname":"Smith"
}
}
}
不再使用query-string参数,而使用JSON构造的请求体替代,并使用一个match查询。
4. 复杂的搜索
需求:搜索姓氏为Smitch且年龄大于30的雇员
GET megacorp/employee/_search
{
"query":{
"bool":{
"must":{
"match":{"lastname":"smith"}
}
},
"filter":{
"range":{
"age":{"gt":30}
}
}
}
}
使用range过滤器,找到年龄大于30的文档。
添加了一个过滤器用于执行一个范围查询,并复用之前的match查询。
5. 全文搜索(_search)
需求:搜索所有喜欢攀岩(rock climbing)的雇员
GET megacorp/employee/_search
{
"query":{
"match":{
"introduce":"rock climbing"
}
}
}
结果按相关性排序,即每个文档跟查询的匹配程度,第一个最高得分的结果很明显。
{
"took": 0,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 2,
"max_score": 0.26742277,
"hits": [
{
"_index": "megacorp",
"_type": "employee",
"_id": "2",
"_score": 0.26742277,
"_source": {
"firstname": "Jane",
"lastname": "Smith",
"age": 32,
"introduce": "i like to collect rock albums",
"interests": [
"music"
]
}
},
{
"_index": "megacorp",
"_type": "employee",
"_id": "1",
"_score": 0.26742277,
"_source": {
"firstname": "John",
"lastname": "Smith",
"age": 25,
"introduce": "i love to go rock climing",
"interests": [
"sports",
"music"
]
}
}
]
}
}
6. 短语搜索(match_phrase)
需求:仅匹配同时包含rock和albums,且二者以短语 rock albums 形式紧挨着的雇员记录。
GET megacorp/employee/_search
{
"query":{
"match_phrase":{
"introduce":"rock albums"
}
}
}
7. 高亮搜索(highlight)
GET megacorp/employee/_search
{
"query":{
"match_phrase":{
"introduce":"rock albums"
}
},
"highlight": {
"fields": {
"introduce": {}
}
}
}
返回结果中以HTML的em标签封装
GET megacorp/employee/_search
{
"query":{
"match_phrase":{
"introduce":"rock albums"
}
},
"highlight": {
"fields": {
"introduce": {}
}
}
}
8. 分析
支持管理这对雇员目录做分析,Elasticsearch聚合(aggregations)允许我们基于数据生成一些精细的分析结果。集合与SQL中的GROUP BY类似,但更强大。
# 挖掘出雇员中最受欢迎的兴趣爱好
GET megacorp/employee/_search
{
"aggs": {
"all_interests": {
"terms": { "field": "interests" }
}
}
}
# 查询名叫Smith的雇员中最受欢迎的兴趣爱好可直接添加适当的查询来组合
GET /megacorp/employee/_search
{
"query": {
"match": {
"lastname": "smith"
}
},
"aggs": {
"all_interests": {
"terms": {
"field": "interests"
}
}
}
}
# 查询特定兴趣爱好员工的平均年龄,聚合支持分级汇总
GET /megacorp/employee/_search
{
"aggs" : {
"all_interests" : {
"terms" : { "field" : "interests" },
"aggs" : {
"avg_age" : {
"avg" : { "field" : "age" }
}
}
}
}
}
五. 集群内的原理
ElasticSearch 的主旨是随时可用和按需扩容,而扩容可通过购买性能更强大(垂直扩容或纵向扩容)或数量更多的服务器来实现。虽然ES可获益于更强大的硬件设备,但垂直扩容是有限的。真正的扩容能力是来自于水平库扩容 - 为集群添加更多的节点,并将负载压力和稳定分散到节点中。
对于大多数数据库而言,通常需要对应用进行非常大的改动,才能利用上横向扩容和新增资源。ES天生就是分布式的,它知道如何通过管理多节点来提高扩容性和可用性,这也意味着应用无需关注这个问题。
PHP
使用composer安装elasticsearch
# composer.json文件加入
{
“require”:{
"elasticsearch/elasticsearch":"~2.0@beta"
}
}
# composer 安装
$ composer update
测试
require("/vendor/elasticsearch/autoload.php");
$hosts = ["192.168.1.10'];
$client = Elasticsearch\ClientBuilder::create()->setHosts($host)->build();
$client = $this->getElasticClient();
$params = [
"index"=>"website",
"type" => "blog",
"body" => [
"query"=>[
"match"=>["_id"=>1]
]
]
];
$rtn = $client->search($params);
var_dump($rtn);
数据操作 1
索引内的一个文档的建立,类似mysql中添加记录
# SQL插入数据方式
INSERT INTO blog('title','content','created_at') VALUES('ES PHP', 'balabala...','2017-07-20')
ES实例化
require('/vendor/elasticsearch/autoload.php');
$hosts = ["192.168.1.10"];
$client = ElasticSearch\ClientBuilder::create()->setHosts($hosts)->build();
Index添加数据
// 添加数据
$params = [
'index'=>'website',
'type'=>'blog',
'id'=>10,
'body'=>[
'title'=>'elasticsearch & php',
'content'=>'balabala...',
'created_at'=>'2017-08-01 12:02:20'
]
];
$resp = $client->index($params);
var_dump($resp);
Get查询数据
// 查询数据 GET
$params = [
'index' => 'website',
'type' => 'blog',
'id' => 1
];
try{
$resp = $client->get($params);
}catch(Exception $e){
$resp = $e->getMessage();
}
Search查询数据
//查询数据 Search
$params = [
"index" => "website",
"type" => "blog",
"body" => [
"query" => [
"match" => ["title"=>"elasticsearch php"]
],
]
];
try{
$resp = $client->search($params);
}catch(Exception $e){
$resp = $e->getMessage();
}
```
##数据操作2
创建索引
```
$client = new Elasticsearch\Client();
// 设置索引名称
$index = ['index'=>'log', 'type'=>'syslog'];
// 设置分片数量
$data['body']['settings'] = ['number_of_shards'=>5, 'number_of_replicas'=>0];
// 创建索引
$client->indices()->create($index);
```
插入索引数据
```
// 设置索引名称
$index['index'] = 'log';//索引名称
$index['type'] = 'syslog';//类型名称
$index['id'] = 1;//系统自动生成唯一编号
$index['body'] = [
'mac'=>'ajlfadlkajldf',
'customer_id' => 3,
'product_id' => 5,
'version' => 2
];
$client->index($index);
```
查询数据1:match
```
$index['index'] = 'log';//设置索引名称
$index['type'] = 'syslog';//设置类型名
$index['body']['query']['match']['mac'] = 'test';
$index['size'] = 10;
$index['from'] = 200;
$client->search($index);
//相当于SQL语句
SELECT * FROM syslog WHERE mac='test' LIMIT 200,10;
```
查询数据2: must = and
```
$index['index'] = 'log';//设置索引
$index['type'] = 'syslog';//设置类型
$index['body']['query']['bool']['must'] = [
['match'=>['mac'=>'rest']],
['match'=>['product_id'=>20]]
];
$index['size'] = 10;
$index['from'] = 200;
$client->search($index);
//相当于SQL语句
SELECT * FROM syslog WHERE mac='test' AND product_id=20 LIMIT 200,10
```
查询数据3: should = or
```
$index['index'] = 'log';//设置索引
$index['type'] = 'syslog';//设置类型
$index['body']['query']['bool']['should'] = [
['match'=>['mac'=>'test']],
['match'=>['product_id'=>20]]
];
$index['size'] = 10;
$index['from'] = 200;
$client->search($index);
// 相当于sql语句
SELECT * FROM syslog WHERE mac='test' OR product_id=20 LIMIT 200,10
```
查询数据4:must_not
```
$index['index'] = 'log';
$index['type'] = 'syslog';
$index['body']['query']['bool']['must_not'] = [
['match'=>['mac'=>'test']],
['match'=>['product_id'=>20]]
];
$index['size'] = 10;
$index['from'] = 100;
$client->search($index);
//相当于SQL语句
SELECT * FROM syslog WHERE mac='test' AND product_id!=20 LIMIT 200,10
```
查询数据5:range
```
$index['index'] = 'log';
$index['type'] = 'syslog';
$index['body']['query']['range'] = [
'id' => ['gte'=>20, 'lt'=>30]
];
$index['size'] = 10;
$index['from'] = 200;
$client->search($index);
// 相当于sql语句
SELECT * FROM syslog WHERE id>=20 AND id<30 LIMIT 200,10
```
删除文档
```
$index['index'] = 'dbname';//设置索引
$index['type'] = 'tblname';//设置类型
$index['id'] = 2;
$client->delete($index);
```
## 将MySQL数据同步到ES
### 1. 下载安装 Elasticsearch-jdbc
ES-JDBC下载地址
[https://github.com/jprante/elasticsearch-jdbc](https://github.com/jprante/elasticsearch-jdbc)
```
# 查看 ES 版本
$ cd /usr/share/elasticsearch
$ ./elasticsearch -V
Version: 5.4.3, Build: eed30a8/2017-06-22T00:34:03.743Z, JVM: 1.8.0_131
```
由于本机ES5不能使用数据源ES-JDBC,故选择ES5+logstash的logstash-input-jdbc实现mysql数据同步。
## 2. 安装 Logstash
logstash 下载地址
[https://www.elastic.co/cn/products/logstash](https://www.elastic.co/cn/products/logstash)
下载
```
wget https://artifacts.elastic.co/downloads/logstash/logstash-5.4.3.deb
```
Logstash是一款轻量级的日志收集处理框架,可方便的把分散的、多样化的日志搜索起来,并进行自定义处理,然后传输到指定位置,比如某个服务器或文件。
Logstash作为数据分析软件主要目的是分析log日志,整套软件可当做MVC模型,Logstash是controller,Elasticsearch是model层,Kibana是view层。
首先将数据传递给logstash,它将数据进行过滤和格式化为json格式,然后传递给Elasticsearch进行存储、建立搜索的索引,kibana提供前端页面在进行搜索和图标可视化,它调用Elasticsearch的接口返回的数据进行可视化。
logstash-input-jdbc插件使用ruby开发,由于国内网络原因,访问不了亚马逊的服务器。解决办法是修改为国内的ruby仓库镜像,此镜像托管于淘宝的阿里云服务器上。
```
# 查看gem版本
$ gem -v
# 查看gem源
$ gem source
# 删除官网源
$ gem -r https://rubygems.org/
# 添加淘宝源
$ gem -a https://ruby.taobao.org/
```
Logstash的逻辑架构
Logstash经历收集、过滤、输出三个步骤即可简单的筛选与管理日志,这三个步骤经过符合业务规模的设计后可满足各种规模的需求。
Logstash配置
1. 定义数据源
定义的数据源支持从文件、stdin、kafka、twitter等来源,甚至可自己写一个input plugin。若出现新日志拷进来它会自动去扫描。
```
# 定义数据源
input {
# 从文件读取数据
file {
path => "/data/web/logstash/logFile/*/*",
# 从文件开始处读写
start_position => 'begining'
}
#从标准输入读取数据
stdin {}
}
```
2. 定义数据格式
根据日志的格式,用正则表达式进行匹配。
```
filter{
# 定义数据的格式
grok{
match => {"messge"=>"%{DATA:timestamp}\|%{IP:serverIp}\|%{IP:clientIp}\|%{DATA:logSource}\|%{DATA:userId}\|%{DATA:reqUrl}\|%{DATA:refer}\|%{DATA:device}\|%{DATA:textDuring}\|%{DATA:dringTIme:int}\|\|"}
}
}
```
过滤数据格式
```
filter {
# 定义数据格式
grok{}
# 定义时间戳格式
date{
match => ['timestamp', 'yyyy-MM-dd HH:mm:ss'],
locale => 'cn'
}
}
```
指出客户端IP,Logstash会自动抓取该IP的相关位置信息。
```
filter {
# 定义数据格式
grok{...}
# 定义时间戳的格式
date {...}
# 定义客户端IP字段
geoip{
source => 'clientIp'
}
}
```
关于客户端UA,由于UA格式较多,logstash会自动去分析提取操作系统等相关信息。
```
# 定义客户端设备字段
useragent{
source => 'device',
target => 'userDevice'
}
# 需进行转换的字段,将访问时间转换为int再传递给ES。
mutate{
convert => ["duringTime", "integer"]
}
```
输出配置
将过滤后的数据输出到ES
```
output {
# 将输出保存到ES,若无匹配时间就不保存,因为日志里的网址有些带有换行
if [timestamp] = ~/^\d{4}-\d{2}-\d{2}/{
elasticsearch {host=>localhost}
}
# 输出到stdout
stdout {codec => rubydebug }
# 定义访问数据的用户名和密码
user => webService
password => 1q2w3e4r
}
```
Logstash配置格式
```
input{
#读取数据
}
filter{
# 从不规则的日志中提取关注的数据
}
output{
# 输出数据
}
```
从input中的插件读入数据,按行数据,与awk类似。
```
file{
path => "/var/log/maillog",
start_position => "begining"
}
```
在filter中进行数据处理
首先读取一行,把内容传递给message字段,message字段与awk中的$0类似。