参考资料
https://blog.csdn.net/UbuntuTouch/article/details/99702199
需求
最近在做日志收集到es时遇到些问题。我不想用logstash做日志的切分,毕竟很大,想用FileBeat直接推数据到es中。这就涉及到一个问题,这步切分数据在哪做。这个可以使用es的ingest node来做。
解决
ingest node
es在5.0X以后推出了ingest node的功能,可以对数据进行预处理。我们可以指定需要的Pipeline,然后ingest node 将会帮我们按照规定的 processor 顺序来执行对数据的操作和处理。
定义pipeline
要在索引之前预处理文档,我们必须定义pipeline。pipeline定义了一系列processor。每个processor以某种方式转换文档。每个processor按照在 pipeline 中定义的顺序执行。 pipeline由两个主要字段组成description和processor列表。
pipeline 是群集级存储而被保存在每个节点的内存中,并且 pipeline 始终在 ingest node中运行,因此最好在群集中保留需要的 pipeline,而删除那些不需要的 pipeline。
Pipeline 以 cluster 状态存储,并且立即传播到所有 ingest node。 当 ingest node 接收到新 pipeline 时,它们将以内存 pipeline 表示形式更新其节点,并且 pipeline 更改将立即生效。
命令相关
注册或者更新pipeline(根据对应的json注册test-pipeline)
curl -H "Content-Type: application/json" -XPUT 'http://10.130.7.207:9200/_ingest/pipeline/test-pipeline' -d@/home/hadoop/zgh/log/pipeline.json
删除pipeline(下面是删除test-pipeline)
curl -XDELETE '10.130.7.207:9200/_ingest/pipeline/test-pipeline?pretty'
pipeline的json格式
格式如下
{
"description" : "...",
"processors" : [ ... ]
}
processor
我在另外一篇文章也有讲怎么将FileBeat的数据直接传到es,然后做切分。这里只是讲下processor。
ingest node内置了很多processor,grok就是属于processor的一种。processor还有有很多,参考以下网址
内置的processor
https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest-processors.html
我们这里2中常见的grok和drop
grok
grok 适用于正则匹配,用来匹配分割文本并映射到关键字的工具,在grok调试中,由于采用是整个字段匹配,如果一处无法匹配,就会导致整个匹配失败
我们在写grok的时候,需要了解它的自带的类型,以及正则相关的知识,然后进行测试,下面是相关的链接
测试grok的工具
http://grokdebug.herokuapp.com
这里是些常用的
GREEDYDATA 匹配出\n外的所有值
LOGLEVEL 匹配日志级别
TIMESTAMP_ISO8601 匹配时间 毫秒级别
遇到的问题: 我在匹配正则的时候,想匹配全部使用GREEDYDATA 发现失败。因为GREEDYDATA 不匹配\n, 后来使用(.|\n)*这种形式发现是可以的。
"grok": {
"field": "message",
"patterns": ["%{TIMESTAMP_ISO8601:client} %{GREEDYDATA:method} %{LOGLEVEL:request} %{GREEDYDATA:demo} %{ALL_CODE:messageaaa}" ],
"pattern_definitions" : {
"ALL_CODE" : "(.|\n)*"
}
}
remove
remove这里用于删除字段。我这边的需求用到它是因为使用grok切分完,原有字段就将其删除了。
格式为,这个看官网就行,一大片
{
"remove": {
"field" : "message"
}
}