Replacing Kibana and Logstash in the ELK Stack with In-House Code: A Hands-On Account

Why we replaced them

Kibana

  1. Kibana is flexible, but it requires learning Lucene query syntax, and everyday operation is cumbersome.
  2. Kibana is hard to integrate with our existing DevOps platform.
  3. Kibana makes fine-grained permission control awkward.

Logstash

  1. Performance is not Logstash's strong suit.
  2. Its grok patterns are regex-based, but they are driven by configuration that changes with every release, so the learning cost stays high.
  3. Most importantly, the JVM overflows from time to time and the process goes down.

Kibana replacement: before and after

Kibana

[screenshot: k.png]

In-house UI

[screenshot: k2.png]

Replacing Logstash

The application logging framework is Logback. We previously integrated logstash-logback-encoder and shipped the log data over a UDP port.

The fix: write our own UdpAppender to replace logstash-logback-encoder.
Code:

package com.dmw.server;

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;

import ch.qos.logback.classic.encoder.PatternLayoutEncoder;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;

/**
 * Formats each logging event with the configured PatternLayoutEncoder and
 * ships it as a single UDP datagram to the configured ip/port.
 */
public class UdpAppender extends AppenderBase<ILoggingEvent> {

    PatternLayoutEncoder encoder;
    int port;
    String ip;
    DatagramSocket socket;

    public PatternLayoutEncoder getEncoder() {
        return encoder;
    }

    public void setEncoder(PatternLayoutEncoder encoder) {
        this.encoder = encoder;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public DatagramSocket getSocket() {
        return socket;
    }

    public void setSocket(DatagramSocket socket) {
        this.socket = socket;
    }

    @Override
    public void start() {
        if (this.encoder == null) {
            addError("No encoder set for UdpAppender");
            return;
        }
        if (socket == null) {
            try {
                // Sending socket: bind to an ephemeral local port. "port" is
                // the *destination* port; binding to it here would collide
                // with the receiver whenever both run on the same host.
                socket = new DatagramSocket();
            } catch (SocketException e) {
                addError("Failed to open UDP socket", e);
                return;
            }
        }
        super.start();
    }

    @Override
    protected void append(ILoggingEvent event) {
        // One formatted log line per datagram, encoded as UTF-8.
        byte[] buf = encoder.getLayout().doLayout(event).trim()
                .getBytes(StandardCharsets.UTF_8);
        try {
            InetAddress address = InetAddress.getByName(ip);
            DatagramPacket p = new DatagramPacket(buf, buf.length, address, port);
            socket.send(p);
        } catch (Exception e) {
            // Report through logback's own status system; never let a
            // logging failure propagate into application code.
            addError("Failed to send log datagram to " + ip + ":" + port, e);
        }
    }

    @Override
    public void stop() {
        if (socket != null && !socket.isClosed()) {
            socket.close();
        }
        super.stop();
    }

}

Add this appender configuration to logback.xml:

[screenshot: l1.png]

The complete configuration:

<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true">
    <contextName>MovieBooking</contextName>
    <timestamp key="TIMESTAMP" datePattern="yyyy-MM-dd" />
    <property name="LOGPATH" value="log" />
    <!-- Console output -->
    <appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
        <layout class="ch.qos.logback.classic.PatternLayout">
            <pattern>
                %d{HH:mm:ss.SSS} [%thread] %-5level %logger{40} - %msg%n
            </pattern>
        </layout>
    </appender>

    <!-- File output -->
    <appender name="fileLog"
              class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOGPATH}${file.separator}${TIMESTAMP}.log</file>
        <append>true</append>
        <encoder>
            <pattern>
                %d{HH:mm:ss.SSS} [%thread] %-5level %logger{40} - %msg%n
            </pattern>
        </encoder>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>${LOGPATH}${file.separator}all${file.separator}%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <maxHistory>30</maxHistory>
            <maxFileSize>10MB</maxFileSize>
        </rollingPolicy>
    </appender>
    <!--<appender name="SOCKET" class="ch.qos.logback.classic.net.SocketAppender">-->
        <!--<remoteHost>127.0.0.1</remoteHost>-->
        <!--<port>5454</port>-->
        <!--<reconnectionDelay>10000</reconnectionDelay>-->
        <!--<includeCallerData>true</includeCallerData>-->
    <!--</appender>-->
    <appender name="udp" class="com.dmw.server.UdpAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
        <ip>127.0.0.1</ip>
        <port>5454</port>
    </appender>
    <root level="INFO">
        <appender-ref ref="fileLog" />
        <appender-ref ref="stdout" />
        <appender-ref ref="udp" />
    </root>
</configuration>

Receiving the data over a socket-based UDP endpoint

Coroutine version:

import asyncio

import uvloop


class LogReceiver(asyncio.DatagramProtocol):
    """Invoked by the event loop for every UDP datagram the appender sends."""

    def datagram_received(self, data, addr):
        print(data.decode())


async def socket_server():
    loop = asyncio.get_event_loop()
    server_add = ('0.0.0.0', 5454)
    # create_datagram_endpoint yields a non-blocking UDP endpoint, so the
    # event loop is never stalled waiting on a read.
    transport, _ = await loop.create_datagram_endpoint(
        LogReceiver, local_addr=server_add)
    print(server_add)
    try:
        await asyncio.Event().wait()  # serve forever; the protocol does the work
    finally:
        transport.close()


if __name__ == "__main__":
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    loop = uvloop.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(socket_server())

Verifying the result

[screenshot: l2.png]
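
If the Java side is not running yet, a single datagram is enough to smoke-test the receiver. This is a hypothetical helper, not part of the original article; it assumes the receiver above is listening on 127.0.0.1:5454.

import socket

# Fake one formatted log line and push it at the receiver (hypothetical test).
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.sendto(b'12:00:00.000 [main] INFO  com.dmw.Demo - hello udp',
         ('127.0.0.1', 5454))
s.close()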

Extending and refining

With the groundwork above, the basic approach is clear and the scheme works, but it is not yet ready for production.

Extend the receiver to write into a message queue (Kafka, RabbitMQ, Redis, ...),
then consume the messages, apply the business logic, and store the results in ES, as sketched below.
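
A minimal sketch of that stage, assuming Redis as the queue and the official redis and elasticsearch Python clients; the queue key log-queue and the index name app-logs are made-up here, and the parsing step is left as a stub.

import redis
from elasticsearch import Elasticsearch

r = redis.Redis(host='localhost', port=6379)
es = Elasticsearch('http://localhost:9200')


def enqueue(raw_line: bytes):
    # Called from datagram_received() in the receiver above: buffering in
    # Redis keeps the UDP loop from ever waiting on downstream I/O.
    r.rpush('log-queue', raw_line)


def consume_forever():
    # Runs in a separate worker process: pop, parse, apply business
    # logic, then index into Elasticsearch.
    while True:
        _, raw = r.blpop('log-queue')
        doc = {'message': raw.decode()}   # real field extraction goes here
        es.index(index='app-logs', document=doc)  # 8.x client; 7.x uses body=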

Perfect. Job done.
