Logstash使用配置
运行:bin/logstash -f /home/logstash.conf
logstash.conf
#整个配置文件分为三部分:input,filter,output
input {
#输入
}
filter{
#过滤匹配
}
output {
#输出
}
1、input配置
1.1 file{}(文件读取)
监听文件变化,记录一个叫.sincedb的数据库文件来跟踪被监听的日志文件的当前读取位(也就是时间戳)
格式:
input {
#file可以多次使用,也可以只写一个file而设置它的path属性配置多个文件实现多文件监控
file {
path =>["/var/log/access.log", "/var/log/message"]#监听文件路径
type => "system_log"#定义事件类型
start_position =>"beginning"#检查时间戳
}
}
参数说明:
exclude:排除掉不想被监听的文件
stat_interval:logstash每隔多久检查一次被监听文件状态(是否有更新),默认是1秒。
start_position:logstash默认是从结束位置开始读取文件数据,也就是说logstash进程会以类似tail -f的形式运行。如果你是要导入原有数据,把这个设定改成“beginning”,logstash进程就按时间戳记录的地方开始读取,如果没有时间戳则从头开始读取,有点类似cat,但是读到最后一行不会终止,而是继续变成tail -f。
1.2 codec(定义编码类型)
优化建议:直接输入预定义好的JSON数据,这样就可以省略掉filter/grok配置,从而减轻过滤器logstash的CPU负载消耗;
配置文件示例:
input {
file {
type =>"access_u_ex"
#添加自定义字段
add_field => {"fromhost"=>"EIR"}
#监听文件的路径
path =>"D:/java/logs/test/test1*"
#排除不想监听的文件
exclude =>"test13.log"
#监听文件的起始位置
start_position =>"beginning"
#增加标签
#tags =>
"tag1"
#设置多长时间扫描目录,发现新文件
#discover_interval=> 15
#设置多长时间检测文件是否修改
#stat_interval
=> 1
codec => multiline {
charset =>"GB2312"
patterns_dir => ["d:/java/logstash-2.3.4/bin/patterns"]
pattern =>"^%{SECOND_TIME}"
negate =>true
what =>"previous"
}
}
}
从上到下,解释下配置参数的含义。
file :读取文件。
path :日志文件绝对路径(只支持文件的绝对路径)。
type :定义类型,以后的filter、output都会用到这个属性。
add_field:添加字段,尽量不要与现有的字段重名。重名的话可能会有不同错误出现。
start_position:从什么位置开始读取文件,默认是结束位置。如果要导入原有数据,把这个设定改为beginning .
discover_interval:设置多长时间检查一个被监听的path下面是否有新文件。默认时间是15秒.
exclude:不想监听的文件排除出去。
stat_interval:设置多久时间检查一次被监听的文件(是否有更新),默认是1秒。
说明:
a、有时候path可能需要配置多个路径:
path => ["D:/java/logs/test/test1*","D:/java/logs/test/test2*"]
b、add_field添加字段时,想判断要添加的字段是否存在,不存在时添加
if![field1]{
mutate{
add_field=> {"field1" => "test1"}
}
}
codec编码插件
logstash实际上是一个input | codec | filter | codec |
output的数据流。codec是用来decode、encode事件的。
Multiline合并多行数据
Multiline有三个设置比较重要:pattern、negate、what .
pattern:必须设置,String类型,没有默认值。匹配的是正则表达式。上面配置中的含义是:通过SECOND_TIME字段与日志文件匹配,判断文件开始位置。
negate:boolean类型,默认为false .如果没有匹配,否定正则表达式。
what:必须设置,可以设置为previous或next .(ps:还没弄清楚两者之间的关系,目前我用的都是previous )。
根据上面的配置,还有两个设置:charset、patterns_dir。
charset :一般都知道是编码,这个就不多说了。
patterns_dir:从配置中看到的是一个文件路径。
2、filter过滤器配置
2.1 data(时间处理)
用来转换日志记录中的时间字符串,变成LogStash::Timestamp对象,然后转存到@timestamp字段里。
注意:因为在稍后的outputs/elasticsearch中index常用的%{+YYYY.MM.dd}这种写法必须读取@timestamp数据,所以一定不要直接删掉这个字段保留自己的时间字段,而是应该用filters/date转换后删除自己的字段!至于elasticsearch中index使用%{+YYYY.MM.dd}这种写法的原因后面会说明。
格式:
filter{
grok {
match =>["message", "%{HTTPDATE:logdate}"]
}
date {
match=> ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"]
}
}
2.2 grok(正则匹配)
语法格式如下:
%{正则表达式或正则子句:你要赋值的变量名}
filter{
grok {
match => [ "message","\s+(?\d+?)\s+" ]#跟python的正则有点差别
}
}
%{语法:语义}
“语法”指的就是匹配的模式,例如使用NUMBER模式可以匹配出数字,IP则会匹配出127.0.0.1这样的IP地址:
%{NUMBER:lasttime}%{IP:client}
默认情况下,所有“语义”都被保存成字符串,你也可以添加转换到的数据类型
%{NUMBER:lasttime:int}%{IP:client}
目前转换类型只支持int和float
优化建议:如果把“message”里所有的信息都grok到不同的字段了,数据实质上就相当于是重复存储了两份。所以可以用remove_field参数来删除掉message字段,或者用overwrite参数来重写默认的message字段,只保留最重要的部分。
filter {
grok {
patterns_dir =>"/path/to/your/own/patterns"
match => {
"message" =>"%{SYSLOGBASE} %{DATA:message}"
}
overwrite => ["message"]
}
}
filter {
grok {
match => ["message", "%{HTTPDATE:logdate}"]
remove_field =>["logdate"]
}
}
Logstash内置正则文件:
/home/logstash-5.2.1/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-4.0.2/patterns/grok-patterns
可以在此文件中进行添加项;
覆盖-- overwrite
使用Grok的overwrite参数也可以覆盖日志中的信息
filter {
grok {
match =>{ "message" => "%{SYSLOGBASE} %{DATA:message}" }
overwrite=> [ "message" ]
}
}
日志中的message字段将会被覆盖
2.3kv
将数据源转换成键值对,并创建相对的field。比如传入“a=111&b=2222&c=3333”,输出的时候,a,b,c会被创建成三个field,这样做的好处是,当需要查询某一个参数的时候可直接查询,而不是把一个字符串搜索出来再做解析。
kv {
source => "field_name"
field_split => "&?"
}
2.4 IP位置插件GeoIP(地址查询归类)--(态势感知项目中攻击数据IP转换可用)
GeoIP是最常见的免费IP地址归类查询库,同时也有收费版可以采购。GeoIP库可以根据IP地址提供对应的地域信息,包括国别,省市,经纬度等,对于可视化地图和区域统计非常有用。
使用方法:
filter {
geoip{
source =>"clientip"
database =>"/etc/logstash/GeoLiteCity.dat"#需去官网下载ip库放到本地
}
}
示例:
filter {
geoip {
source =>"message"#如果能联网可查询在线ip库
}
}
运行结果:
{
"message"=> "183.60.92.253",
"@version"=> "1",
"@timestamp"=> "2014-08-07T10:32:55.610Z",
"host"=> "raochenlindeMacBook-Air.local",
"geoip"=> {
"ip"=> "183.60.92.253",
"country_code2"=> "CN",
"country_code3"=> "CHN",
"country_name"=> "China",
"continent_code"=> "AS",
"region_name"=> "30",
"city_name"=> "Guangzhou",
"latitude"=> 23.11670000000001,
"longitude"=> 113.25,
"timezone"=> "Asia/Chongqing",
"real_region_name"=> "Guangdong",
"location"=> [
[0]113.25,
[1]23.11670000000001
]
}
}
注:geoip插件的“source”字段可以是任一处理后的字段,比如“clientip”,但是字段内容却需要小心!geoip库内只存有公共网络上的IP信息,查询不到结果的,会直接返回null,而logstash的geoip插件对null结果的处理是:不生成对应的geoip.字段。
所以在测试时,如果使用了诸如127.0.0.1, 172.16.0.1,
182.168.0.1, 10.0.0.1等内网地址,会发现没有对应输出!
GeoIP库数据较多,如果不需要这么多内容,可以通过fields选项指定自己所需要的。下例为全部可选内容:
filter {
geoip {
fields => ["city_name","continent_code","country_code2","country_code3","country_name","dma_code","ip","latitude","longitude","postal_code","region_name","timezone"]
}
}
选项
上面我们看到了source和fields两个选项,geoip还提供了下列选项:
2.5 drop
drop可以跳过某些不想统计的日志信息,当某条日志信息符合if规则时,该条信息则不会在out中出现,logstash将直接进行下一条日志的解析。
if [field_name] == "value" {
drop {}
}
2.6重命名-- rename
对于已经存在的字段,重命名其字段名称
filter {
mutate {
rename => ["syslog_host", "host"]
}
}
2.7更新字段内容– update
更新字段内容,如果字段不存在,不会新建
filter{
mutate {
update =>{ "sample" => "My new message" }
}
}
2.8替换字段内容– replace
与update功能相同,区别在于如果字段不存在则会新建字段
filter {
mutate {
replace => { "message" =>"%{source_host}: My new message" }
}
}
2.9数据类型转换– convert
filter{
mutate {
convert =>["request_time", "float"]
}
}
2.10文本替换– gsub
gsub提供了通过正则表达式实现文本替换的功能
filter {
mutate {
gsub => [
# replace all forward slashes with underscore
"fieldname","/", "_",
# replace backslashes, question marks, hashes, andminuses
#with a dot "."
"fieldname2","[\\?#-]", "."
]
}
}
2.11大小写转换-- uppercase、lowercase
filter {
mutate {
uppercase => ["fieldname" ]
}
}
2.12去除空白字符-- strip
类似php中的trim,只去除首尾的空白字符
filter {
mutate{
strip=> ["field1", "field2"]
}
}
2.13删除字段-- remove、remove_field
remove不推荐使用,推荐使用remove_field
filter {
mutate{
remove_field=> [ "foo_%{somefield}" ]
}
}
2.14分割字段– split
将提取到的某个字段按照某个字符分割
filter {
mutate {
split => ["message","|"]
}
}
针对字符串"123|321|adfd|dfjld*=123",可以看到输出结果:
{
"message" =>[
[0] "123",
[1] "321",
[2] "adfd",
[3]"dfjld*=123"
],
"@version" =>"1",
"@timestamp"=> "2014-08-20T15:58:23.120Z",
"host" =>"raochenlindeMacBook-Air.local"
}
2.15聚合数组– join
将类型为array的字段中的array元素使用指定字符为分隔符聚合成一个字符串
如我们可以将split分割的结果再重新聚合起来:
filter {
mutate {
split =>["message", "|"]
}
mutate {
join =>["message", ","]
}
}
输出:
{
"message" =>"123,321,adfd,dfjld*=123",
"@version" =>"1",
"@timestamp"=> "2014-08-20T16:01:33.972Z",
"host" =>"raochenlindeMacBook-Air.local"
}
2.16合并数组– merge
对于几个类型为array或hash或string的字段,我们可以使用merge合并
filter {
mutate {
merge => ["dest_field", "added_field" ]
}
}
需要注意的是,array和hash两个字段是不能merge的
2.17时间分割– split
mutiline让logstash将多行数据变成一个事件,当然了,logstash同样支持将一行数据变成多个事件
logstash提供了split插件,用来把一行数据拆分成多个事件
示例:
filter {
split {
field => "message"
terminator => "#"
}
}
运行结果:
对于"test1#test2",上述logstash配置将其变成了下面两个事件:
{
"@version":"1",
"@timestamp":"2014-11-18T08:11:33.000Z",
"host":"web121.mweibo.tc.sinanode.com",
"message":"test1"
}
{
"@version":"1",
"@timestamp":"2014-11-18T08:11:33.000Z",
"host":"web121.mweibo.tc.sinanode.com",
"message":"test2"
}
需要注意的是,当split插件执行结束后,会直接进入output阶段,其后的所有filter都将不会被执行
2.18 json
对于json格式的log,可以通过codec的json编码进行解析,但是如果记录中只有一部分是json,这时候就需要在filter中使用json解码插件
示例:
filter {
json{
source=> "message"
target=> "jsoncontent"
}
}
运行结果:
{
"@version":"1",
"@timestamp":"2014-11-18T08:11:33.000Z",
"host":"web121.mweibo.tc.sinanode.com",
"message":"{\"uid\":3081609001,\"type\":\"signal\"}",
"jsoncontent": {
"uid":3081609001,
"type":"signal"
}
}
上面的例子中,解析结果被放到了target所指向的节点下,如果希望将解析结果与log中其他字段保持在同一层级输出,那么只需要去掉target即可:
{
"@version":"1",
"@timestamp":"2014-11-18T08:11:33.000Z",
"host":"web121.mweibo.tc.sinanode.com",
"message":"{\"uid\":3081609001,\"type\":\"signal\"}",
"uid":3081609001,
"type":"signal"
}
3、output
output {
stdout{ codec=>rubydebug}#直接输出,调试用起来方便
#输出到redis
redis {
host => '10.120.20.208'
data_type => 'list'
key => '10.99.201.34:access_log_2016-04'
}
#输出到ES
elasticsearch {
hosts =>"192.168.0.15:9200"
index => "%{sysid}_%{type}"
document_type => "%{daytag}"
}
}
附:
1、Grok正则在线验证http://grokdebug.herokuapp.com/
2、Logstash配置https://my.oschina.net/shawnplaying/blog/670217
3、Logstash最佳实践http://udn.yyuap.com/doc/logstash-best-practice-cn/index.html