版本
filebeat6.6.0 + kafka2.11 +elk7.3.1
elk 是docker 部署的,其他是本地服务
下载地址:https://mirrors.huaweicloud.com/filebeat/6.6.0/
再次申明,博客真的不靠谱。最好看官方文档
filebeat.yml
详见https://www.elastic.co/guide/en/beats/filebeat/6.6/filebeat-input-log.html
https://www.elastic.co/guide/en/beats/filebeat/6.6/kafka-output.html
#============== Filebeat prospectors ===========
filebeat.inputs: # 6.3以前是 filebeat.prospectors:
- type: log # input类型,默认为log,6.0以前配置是 - input_type: log
paths:
- /usr/local/logs/app-collector.log
multiline.pattern: '^\[' #指定匹配的表达式
multiline.negate: true #是否匹配到
multiline.match: after #如果没有匹配到,就合并到上一行的末尾
multiline.max_lines: 2000 #最大行数
multiline.timeout: 2s #如果在规定时间没有新的日志事件就不等待后面的日志了,开始把数据推送出去
fields:
logbiz: collector
logtopic: app-log-collector #按服务划分用作kafka topic
evn: dev
- type: log
paths:
#app-服务名.log
- /usr/local/logs/error-collector.log
#定义写入ES时的_type 值
document_type: "error-log"
multiline.pattern: '^\[' #指定匹配的表达式
multiline.negate: true #是否匹配到
multiline.match: after #如果没有匹配到,就合并到上一行的末尾
multiline.max_lines: 2000 #最大行数
mulitilne.timeout: 2s #如果在规定时间没有新的日志事件就不等待后面的日志了,开始把数据推送出去
fields:
logbiz: collector
logtopic: error-log-collector #按服务划分用作kafka topic
evn: dev
output.kafka:
hosts: ["192.168.159.128:9092"]
topic: '%{[fields.logtopic]}'
partition.hash:
reachable_only: true
compression: gzip
max_message_bytes: 1000000
#acks=0:生产者在成功写入消息之前不会等待任何来自服务器的响应
#acks=1:只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应
#acks=-1:表示分区leader必须等待消息被成功写入到所有的ISR副本中才认为producer请求成功。
required_acks: 1
logging.to_files: true
docker-compose.yml
version: '3'
services:
elasticsearch: #服务名称(不是容器名)
image: elasticsearch:7.3.1
ports:
- "9200:9200" #暴露的端口信息和docker run -d -p 80:80 一样
- "9300:9300"
restart: "always" #重启策略,能够使服务保持始终运行,生产环境推荐使用
container_name: elasticsearch #容器名称
hostname: elasticsearch
environment:
- "discovery.type=single-node" #配置es启动单节点
- "cluster.name=EsForLog" #配置es集群名称
- "ES_JAVA_OPTS=-Xms512m -Xmx512m" #配置es启动参数
kibana:
image: kibana:7.3.1
restart: "always" #重启策略,能够使服务保持始终运行,生产环境推荐使用
container_name: kibana #容器名称
hostname: kibana
#挂载文件
volumes:
- /mydata/kibana/kibana.yml:/usr/share/kibana/config/kibana.yml
links:
- elasticsearch:es01 #容器关联es01是别名
depends_on:
- elasticsearch #依赖es,将会在es创建成功后才执行
ports:
- "5601:5601" #暴露的端口信息和docker run -d -p 80:80 一样
logstash:
image: logstash:7.3.1
restart: "always" #重启策略,能够使服务保持始终运行,生产环境推荐使用
container_name: logstash #容器名称
hostname: logstash
#挂载文件logstash启动配置文件
volumes:
- /mydata/logstash/logstash-springboot.conf:/usr/share/logstash/pipeline/logstash.conf
links:
- elasticsearch:es01 #容器关联es01是别名
depends_on:
- elasticsearch #依赖es,将会在es创建成功后才执行
ports:
- "5044:5044" #暴露的端口信息和docker run -d -p 80:80 一样
logstash.yml
input {
kafka {
## app-log-服务名称
topics_pattern => "app-log-.*"
bootstrap_servers => "192.168.159.128:9092"
codec => json
consumer_threads => 1 ## 增加consumer的并行消费线程数
decorate_events => true
group_id => "app-log-group"
}
kafka {
topics_pattern => "error-log-.*"
bootstrap_servers => "192.168.159.128:9092"
codec => json
consumer_threads => 1
decorate_events => true
group_id => "error-log-group"
}
}
filter {
#时区转换
ruby{
code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
}
# [fields][logtopic]这串东西 对应的是filebeat的配置文件filebeat.yml里面的fields下的logtopic属性,具体的回头看filebeat的内容
if "app-log" in [fields][logtopic]{
grok{
#这个是匹配日志的格式的,日志的格式可以匹配成功这条数据就不过滤,否则就过滤掉
match => ["message","\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{NOTSPACE:hostName}\] \[%{NOTSPACE:ip}\] \[%{NOTSPACE:applicationName}\] \[%{NOTSPACE:location}\] \[%{NOTSPACE:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
}
}
if "error-log" in [fields][logtopic]{
grok{
match => ["message","\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{NOTSPACE:hostName}\] \[%{NOTSPACE:ip}\] \[%{NOTSPACE:applicationName}\] \[%{NOTSPACE:location}\] \[%{NOTSPACE:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
}
}
}
#输出到控制台
output {
if "app-log" in [fields][logtopic]{
#es插件
elasticsearch{
hosts => ["192.168.159.128:9200"]
#索引名 +号开头的,就会姿容任务后面是时间格式
#javalog-app-service-2019.01.23
index => "app-log-%{[fields][logbiz]}-%{index_time}"
#是否嗅探集群ip:一般设置true
#通过嗅探机制进行es集群负载均衡发日志消息
sniffing => true
#logstash默认值自带一个mapping模板,进行模板覆盖
template_overwrite => true
}
}
if "error-log" in [fields][logtopic]{
elasticsearch{
hosts => ["192.168.159.128:9200"]
index => "app-log-%{[fields][logbiz]}-%{index_time}"
sniffing => true
template_overwrite => true
}
}
stdout {
codec => rubydebug
}
}
kibana.yml
elasticsearch.hosts: http://192.168.159.128:9200 #es01是docker-compose中links的别名
server.host: "0.0.0.0"
server.name: kibana
xpack.monitoring.ui.container.elasticsearch.enabled: true
i18n.locale: zh-CN #中文