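Why we built in-house replacements
Kibana
- Kibana is flexible, but it requires learning Lucene query syntax and is cumbersome to operate.
- Kibana is hard to integrate with our existing DevOps platform.
- Kibana makes it hard to set up tiered access permissions.
Logstash
- Performance is not Logstash's strong point.
- Although it is built on grok regular expressions, the configuration changes with every release, so the learning cost is too high.
- Above all, the JVM runs out of memory from time to time and the process goes down.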
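Kibana replacement: before and after
Kibana
k.png
In-house
k2.png
Replacing Logstash
The applications log through logback. Previously they used logstash-logback-encoder to stream log data in real time over a UDP port.
The fix: write our own UdpAppender to replace logstash-logback-encoder.
Code: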
package com.dmw.server;

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;

import ch.qos.logback.classic.encoder.PatternLayoutEncoder;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;

/** Logback appender that sends each formatted log event as one UDP datagram. */
public class UdpAppender extends AppenderBase<ILoggingEvent> {

    PatternLayoutEncoder encoder; // formats the event; set from <encoder> in logback.xml
    int port;                     // destination UDP port; set from <port>
    String ip;                    // destination host; set from <ip>
    DatagramSocket socket;

    public PatternLayoutEncoder getEncoder() {
        return encoder;
    }

    public void setEncoder(PatternLayoutEncoder encoder) {
        this.encoder = encoder;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public DatagramSocket getSocket() {
        return socket;
    }

    public void setSocket(DatagramSocket socket) {
        this.socket = socket;
    }

    public UdpAppender() {
        // ip and port are injected by logback from the XML configuration.
        // this.ip="10.11.199.154";
        // this.port=5070;
    }

    @Override
    public void start() {
        if (this.encoder == null) {
            addError("no encoder set for udp appender");
            return;
        }
        if (socket == null) {
            try {
                // Bind to an ephemeral local port. Binding to the destination
                // port would clash with a receiver running on the same host.
                socket = new DatagramSocket();
            } catch (SocketException e) {
                // Report through logback's status system and stay stopped.
                addError("could not create udp socket", e);
                return;
            }
        }
        super.start();
    }

    @Override
    protected void append(ILoggingEvent event) {
        // Encode explicitly as UTF-8 instead of the platform default charset.
        byte[] buf = encoder.getLayout().doLayout(event).trim().getBytes(StandardCharsets.UTF_8);
        try {
            InetAddress address = InetAddress.getByName(ip);
            DatagramPacket p = new DatagramPacket(buf, buf.length, address, port);
            socket.send(p);
        } catch (Exception e) {
            addError("failed to send log event over udp", e);
        }
    }

    @Override
    public void stop() {
        // socket is null if start() failed before creating it.
        if (socket != null && !socket.isClosed()) {
            socket.close();
        }
        super.stop();
    }
}
Add the following to logback.xml
l1.png
Full configuration
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true">
    <contextName>MovieBooking</contextName>
    <timestamp key="TIMESTAMP" datePattern="yyyy-MM-dd" />
    <property name="LOGPATH" value="log" />
    <!-- Console output -->
    <appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
        <layout class="ch.qos.logback.classic.PatternLayout">
            <pattern>
                %d{HH:mm:ss.SSS} [%thread] %-5level %logger{40} - %msg%n
            </pattern>
        </layout>
    </appender>
    <!-- File output -->
    <appender name="fileLog"
        class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOGPATH}${file.separator}${TIMESTAMP}.log</file>
        <append>true</append>
        <encoder>
            <pattern>
                %d{HH:mm:ss.SSS} [%thread] %-5level %logger{40} - %msg%n
            </pattern>
        </encoder>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${LOGPATH}${file.separator}all${file.separator}%d{yyyy-MM-dd}.log</fileNamePattern>
            <maxHistory>30</maxHistory>
        </rollingPolicy>
        <triggeringPolicy
            class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
            <MaxFileSize>10MB</MaxFileSize>
        </triggeringPolicy>
    </appender>
    <!--<appender name="SOCKET" class="ch.qos.logback.classic.net.SocketAppender">-->
    <!--<remoteHost>127.0.0.1</remoteHost>-->
    <!--<port>5454</port>-->
    <!--<reconnectionDelay>10000</reconnectionDelay>-->
    <!--<includeCallerData>true</includeCallerData>-->
    <!--</appender>-->
    <appender name="udp" class="com.dmw.server.UdpAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
        <ip>127.0.0.1</ip>
        <port>5454</port>
    </appender>
    <root level="INFO">
        <appender-ref ref="fileLog" />
        <appender-ref ref="stdout" />
        <appender-ref ref="udp" />
    </root>
</configuration>
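The pattern on the udp appender fixes the shape of every datagram: time, thread in square brackets, level padded to five characters, logger name, a dash, then the message. A receiver could split that line into fields with one regular expression. The following is only a sketch, assuming the pattern stays exactly as configured above; LINE_RE and parse_line are hypothetical names used for illustration, not part of logback or of the original code.

```python
import re

# Mirrors "%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg".
# Assumption: the appender pattern is unchanged; adjust the regex if it is edited.
LINE_RE = re.compile(
    r"^(?P<time>\d{2}:\d{2}:\d{2}\.\d{3}) "
    r"\[(?P<thread>.*?)\] "
    r"(?P<level>[A-Z]+)\s+"      # %-5level pads short levels with spaces
    r"(?P<logger>\S+) - "
    r"(?P<message>.*)$",
    re.DOTALL,                   # a message may span lines (stack traces)
)


def parse_line(line):
    """Return the fields of one log line as a dict, or None if it does not match."""
    match = LINE_RE.match(line)
    return match.groupdict() if match else None
```

Lines that do not match return None, so the caller can decide whether to drop them or to store them unparsed.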
Receiving the data on a UDP socket
Coroutine version
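import socket
import asyncio
import uvloop


async def socket_server():
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_add = ('0.0.0.0', 5454)
    s.bind(server_add)
    print(server_add)
    try:
        while True:
            # 65535 bytes holds any UDP datagram; a 1024-byte buffer would
            # silently truncate longer log lines.
            # Note: recvfrom() blocks, so no other coroutine can run meanwhile.
            data, addr = s.recvfrom(65535)
            print(data.decode())
    finally:
        # Reached on Ctrl-C or any error, unlike a close() placed after the loop.
        s.close()


if __name__ == "__main__":
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    loop = uvloop.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(socket_server())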
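The receiver above calls a blocking recvfrom() inside a coroutine, which is fine for a demo but stalls the event loop once other tasks are added. One possible non-blocking variant uses asyncio's datagram endpoint instead of a raw socket. This is a sketch, not the author's implementation: LogReceiver, serve, and demo are hypothetical names, and it runs on the standard asyncio loop so that it has no third-party dependency.

```python
import asyncio
import socket


class LogReceiver(asyncio.DatagramProtocol):
    """Hands every received datagram to a callback as one decoded log line."""

    def __init__(self, on_line):
        self.on_line = on_line

    def datagram_received(self, data, addr):
        # The event loop calls this once per datagram; nothing blocks here.
        self.on_line(data.decode("utf-8", errors="replace"))


async def serve(host="0.0.0.0", port=5454, on_line=print):
    """Bind a UDP endpoint and return its transport; the caller closes it."""
    loop = asyncio.get_running_loop()
    transport, _protocol = await loop.create_datagram_endpoint(
        lambda: LogReceiver(on_line), local_addr=(host, port)
    )
    return transport


async def demo():
    """Send one datagram over loopback and return the line that arrives."""
    lines = asyncio.Queue()
    transport = await serve("127.0.0.1", 0, lines.put_nowait)  # port 0: any free port
    port = transport.get_extra_info("sockname")[1]
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sender:
        payload = "12:00:00.000 [main] INFO  demo.App - hello".encode()
        sender.sendto(payload, ("127.0.0.1", port))
    try:
        return await asyncio.wait_for(lines.get(), timeout=2)
    finally:
        transport.close()
```

For a long-running receiver, await serve() once and then wait on something that never completes, for example await asyncio.Event().wait(). The uvloop event loop policy from the original script can be installed before starting the loop; uvloop is expected to be a drop-in replacement here, though that is an assumption this sketch does not exercise.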
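Verifying the result
l2.png
Extending and hardening
The groundwork above makes the approach clear and shows that it is feasible, but it is not yet ready for production use.
Extend the receiver to write into a message queue (Kafka, RabbitMQ, Redis, ...).
Then consume messages from the queue, apply the business logic, and store the results in Elasticsearch.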
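The queue-then-store step could look roughly like the sketch below. It is an illustration under stated assumptions, not a finished design: asyncio.Queue stands in for Kafka, RabbitMQ, or Redis, the index name app-logs is made up, and flush is a hypothetical callback where a real system would send the body to Elasticsearch's _bulk endpoint. The only Elasticsearch-specific part is the newline-delimited body format, with one action line followed by one document line per entry.

```python
import asyncio
import json


def to_bulk_body(docs, index="app-logs"):
    """Render docs as the newline-delimited body used by the _bulk endpoint."""
    lines = []
    for doc in docs:
        lines.append(json.dumps({"index": {"_index": index}}))  # action line
        lines.append(json.dumps(doc, ensure_ascii=False))       # document line
    return "\n".join(lines) + "\n"  # the body must end with a newline


async def consume(queue, batch_size, flush):
    """Drain the queue, calling flush(body) per full batch and once at the end."""
    batch = []
    while True:
        doc = await queue.get()
        if doc is None:  # sentinel: the producer has finished
            break
        batch.append(doc)
        if len(batch) >= batch_size:
            flush(to_bulk_body(batch))
            batch = []
    if batch:
        flush(to_bulk_body(batch))


async def demo():
    """Queue three documents and collect the bulk bodies a consumer would send."""
    queue = asyncio.Queue()
    bodies = []
    for i in range(3):
        queue.put_nowait({"level": "INFO", "message": f"event {i}"})
    queue.put_nowait(None)
    await consume(queue, batch_size=2, flush=bodies.append)
    return bodies
```

Batching keeps the number of write requests low, and placing a queue between the receiver and the store lets the UDP listener keep up with bursts while the slower indexing step catches up.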