Filebeat + Kafka + ELK: a log collection setup in practice
[TOC]
0. Component versions used
elasticsearch-5.6.11
filebeat-6.4.2
kafka_2.12-1.1.1
kibana-5.6.11
logstash-6.4.2
zookeeper-3.4.10
Notes:
All servers already have JDK 1.8 installed.
The Elasticsearch version must match the Kibana version.
Filebeat is picky about the Kafka version; a mismatch will keep logs from being collected correctly.
Do not install Elasticsearch and Logstash on the same server; both are memory hogs.
1. Install Elasticsearch
Download the matching package:
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.11.tar.gz
tar -xvf elasticsearch-5.6.11.tar.gz
Enter the directory and start Elasticsearch:
cd elasticsearch-5.6.11/bin
./elasticsearch
If you start it as root, it fails with:
Exception in thread "main" java.lang.RuntimeException: don't run elasticsearch as root.
at org.elasticsearch.bootstrap.Bootstrap.initializeNatives(Bootstrap.java:93)
at org.elasticsearch.bootstrap.Bootstrap.setup(Bootstrap.java:144)
at org.elasticsearch.bootstrap.Bootstrap.init(Bootstrap.java:285)
at org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:35)
Refer to the log for complete error details.
The message is self-explanatory: for security reasons Elasticsearch refuses to run as root, so we create a dedicated user to run it.
Create a group elsearch and a user elsearch inside it:
groupadd elsearch
useradd elsearch -g elsearch
Set a password for user elsearch:
passwd elsearch
Change the owner of the installation directory so the new user has the required permissions:
chown -R elsearch:elsearch elasticsearch-5.6.11
Switch to (or log in as) user elsearch and start Elasticsearch:
su elsearch
cd elasticsearch-5.6.11/bin
./elasticsearch
To run Elasticsearch in the background (daemon mode):
./bin/elasticsearch -d
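If startup later fails its bootstrap checks once the node binds to a non-loopback address (the log mentions vm.max_map_count or max file descriptors), the usual host-level fixes look like this; a sketch to run as root, using the minimums Elasticsearch asks for:
# raise the mmap count limit (Elasticsearch requires at least 262144)
sysctl -w vm.max_map_count=262144
# raise the open-file limit for the elsearch user: add these two lines to /etc/security/limits.conf
elsearch soft nofile 65536
elsearch hard nofile 65536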
Check that Elasticsearch is up; a response means the installation succeeded:
curl http://127.0.0.1:9200
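A healthy node answers with JSON along these lines (field values will differ on your machine):
{
  "name" : "node-1",
  "cluster_name" : "elasticsearch",
  "version" : { "number" : "5.6.11", ... },
  "tagline" : "You Know, for Search"
}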
2. Install Filebeat
Filebeat is the log collection agent.
Download the matching package:
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.4.2-linux-x86_64.tar.gz
tar -zxvf filebeat-6.4.2-linux-x86_64.tar.gz
Enter the directory and edit the config file filebeat.yml:
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /root/logs/xiezhu/*/*.log    # the glob also matches files in subdirectories
  fields:
    log_topics: log
  multiline:                       # merge multi-line entries (e.g. stack traces) into one event
    pattern: '^\['
    negate: true
    match: after

output.kafka:                      # ship events to Kafka
  enabled: true
  hosts: ["192.168.1.182:9092"]
  topic: '%{[fields][log_topics]}' # topic name comes from the custom field above

# shipper name; it shows up in the collected events so the source host is identifiable
name: 192.168.1.126-Ares-server
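Before starting, the configuration and the Kafka connection can be sanity-checked; both subcommands exist in Filebeat 6.x:
./filebeat test config -c filebeat.yml
./filebeat test output -c filebeat.yml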
Start Filebeat:
nohup ./filebeat &
Watch Filebeat's own log:
tail -f logs/filebeat
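Once new lines are appended to the watched files, the events should be visible on the Kafka topic; you can check with the console consumer from section 3:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.182:9092 --topic log --from-beginning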
3. Install Kafka
Kafka depends on ZooKeeper, so make sure the ZooKeeper from section 4 is running before you start the broker. Download the matching package:
wget http://mirrors.shu.edu.cn/apache/kafka/1.1.1/kafka_2.12-1.1.1.tgz
tar -zxvf kafka_2.12-1.1.1.tgz
Enter the directory and edit config/server.properties. Comments in a .properties file must be on their own line, not after a value:
broker.id=0
# listener address and port
listeners=PLAINTEXT://192.168.1.182:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# directory where Kafka stores its data (log segments)
log.dirs=/opt/soft/kafka/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# ZooKeeper connection string
zookeeper.connect=192.168.1.160:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
Start Kafka:
nohup ./bin/kafka-server-start.sh config/server.properties &
Verify the installation
1. Create a topic:
bin/kafka-topics.sh --create --zookeeper 192.168.1.160:2181 --replication-factor 1 --partitions 1 --topic test
2. List the topics just created:
bin/kafka-topics.sh --list --zookeeper 192.168.1.160:2181
test
3. Produce a test message:
bin/kafka-console-producer.sh --broker-list 192.168.1.182:9092 --topic test
this is test    # type the message and press Enter
4. Consume the test message:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.182:9092 --topic test --from-beginning
this is test
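Optionally, inspect the partition and replica layout of the topic:
bin/kafka-topics.sh --describe --zookeeper 192.168.1.160:2181 --topic test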
4. Install ZooKeeper
Download the matching package:
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
tar -zxvf zookeeper-3.4.10.tar.gz
Enter the directory and edit the config file conf/zoo.cfg:
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/opt/soft/zookeeper/zookeeper-3.4.10/data
# the port at which the clients will connect
clientPort=2181
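If the data directory named in zoo.cfg does not exist yet, create it before the first start:
mkdir -p /opt/soft/zookeeper/zookeeper-3.4.10/data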
Start ZooKeeper:
./bin/zkServer.sh start
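Verify that ZooKeeper is serving; ruok is one of the built-in four-letter-word commands in 3.4.x:
./bin/zkServer.sh status
echo ruok | nc 127.0.0.1 2181   # a healthy server replies: imok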
5. Install Logstash
Download the matching package:
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.4.2.tar.gz
tar -zxvf logstash-6.4.2.tar.gz
Enter the directory and create the pipeline file config/logstash.conf:
input {
  kafka {
    bootstrap_servers => "192.168.1.182:9092"
    topics => ["log"]
    codec => json {
      charset => "UTF-8"
    }
  }
  # additional inputs can simply be appended here
}

filter {
  # parse the message field as JSON
  if [type] == "log" {
    json {
      source => "message"
      target => "message"
    }
  }
}

output {
  # write the processed events to Elasticsearch
  elasticsearch {
    hosts => "192.168.1.181:9200"
    # one index per day; a pattern down to the second would create an index per event
    index => "logstash-%{+YYYY.MM.dd}"
  }
}
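The pipeline file can be validated before launching; the flag exists in Logstash 6.x:
./bin/logstash -f config/logstash.conf --config.test_and_exit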
Start Logstash:
nohup ./bin/logstash -f config/logstash.conf &
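After some events have flowed through, confirm that the daily index shows up in Elasticsearch:
curl 'http://192.168.1.181:9200/_cat/indices?v'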
6. Install Kibana
Download the matching package:
wget https://artifacts.elastic.co/downloads/kibana/kibana-5.6.11-linux-x86_64.tar.gz
tar -zxvf kibana-5.6.11-linux-x86_64.tar.gz
Enter the directory and adjust config/kibana.yml:
server.port: 5601
server.host: "localhost"
server.name: "MM-LOG"
elasticsearch.url: "http://192.168.1.181:9200" # address of Elasticsearch
kibana.index: ".kibana"
Start / stop commands:
# start (from the Kibana root directory)
./bin/kibana
# stop
fuser -n tcp 5601   # list the PID listening on the port
kill -9 PID         # kill the PID printed by the previous command
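To keep Kibana running after you log out, start it in the background like the other components:
nohup ./bin/kibana &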
If every step succeeded, log lines written on the application servers now show up in Kibana.
7. Kibana improvements
7.1 Add login authentication
# 1. Install nginx
sudo apt-get install nginx
# 2. Install the Apache password utilities
sudo apt-get install apache2-utils
# 3. Generate the password file
mkdir -p /etc/nginx/passwd
htpasswd -c -b /etc/nginx/passwd/kibana.passwd USERNAME PASSWORD
# 4. Configure nginx
# /etc/nginx/conf.d/default
server {
    listen 192.168.1.182:5601;
    auth_basic "Kibana Auth";
    auth_basic_user_file /etc/nginx/passwd/kibana.passwd;

    location / {
        proxy_pass http://127.0.0.1:5601;
        proxy_redirect off;
    }
}
# 5. Restart Kibana (it keeps listening on localhost only, as configured above)
# 6. Restart nginx
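Concrete commands for step 6, assuming nginx runs as a systemd service:
sudo nginx -t                  # validate the configuration first
sudo systemctl restart nginx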
7.2 Chinese localization
Skip this if you are comfortable with English.
https://github.com/anbai-inc/Kibana_Hanization
A Python runtime is required:
python main.py <Kibana directory>
8. Overall architecture
Log collection and analysis with this stack is non-intrusive to the applications and scales out: Filebeat tails files on each host, Kafka buffers the stream, Logstash parses it into Elasticsearch, and Kibana provides search and dashboards.