推荐系统之LogServer日志系统

1.基于NGINX和thrif rpc的日志采集

1.1 软件包准备

前提环境:Hadoop+Flume+Hbase+zookeeper
安装环境:


链接:https://pan.baidu.com/s/1zikEFn_bLkgj1DwTMM7cxg
提取码:8wde

1.2 thrif rpc安装

任务1:调通单机版的thrift、python版本
step1: 安装thrift(下载、编译)

安装thrift rpc环境:好处
1)安全,数据加密(二进制)
2)数据压缩,节省带宽,提升性能
3)解耦,常用于内部服务之间的互通

解压
]# tar xvzf thrift-0.9.3.tar.gz
完成解压,进入thrift源码的根目录下
step2: 安装依赖库(yum库),以便后面能进行源码编译
]# yum install boost-devel-static libboost-dev libboost-test-dev libboost-program-options-dev libevent-dev automake libtool flex bison pkg-config g++ libssl-dev ant


step3: 接着安装c、c++源码包通常3步:
1)./configure --with-cpp --with-boost --with-python --without-csharp --with-java --without-erlang --without-perl --without-php --without-php_extension --without-ruby --without-haskell --without-go
执行过程如下图,发现有错误,从日志看出原因是缺少c++编译器



安装c++编译器:yum install gcc-c++
继续./configure,还是报错:



解决办法:
安装 openssl openssl-devel (centOS)
]#yum -y install openssl openssl-devel
再次./configure,成功。

2)make
3)make install
此刻完成thrift的安装。

1.3 基于python的thrift的使用

step1: 保证python可支持的模块
]# pip install thrift==0.9.3


这时改用清华镜像的方式安装,安装成功。
[root@master src]# pip install thrift==0.9.3 -i https://pypi.tuna.tsinghua.edu.cn/simple/

step2: 创建接口文件
]# cat RecSys.thrift
service RecSys {
string rec_data(1:string data)
}

step3: 利用接口文件,自动生成py接口代码(python:接口模块,c++:server)
]# thrift --gen py RecSys.thrift


step4: 开发python的server端和client端代码
server.py

#coding=utf=8

import sys
sys.path.append('../schema/gen-py')

from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.server import TServer

from RecSys import RecSys
from RecSys.ttypes import *

class RecSysHandler(RecSys.Iface):
    def rec_data(self, a):
        print "Receive: %s" %(a)
        return "I'm OK !!!"


if __name__ == "__main__":

    # 实例化handler
    handler = RecSysHandler()

    # 设置processor
    processor = RecSys.Processor(handler)

    # 设置端口
    transport = TSocket.TServerSocket('localhost', port=9090)

    # 设置传输层
    tfactory = TTransport.TBufferedTransportFactory()

    # 设置传输协议
    pfactory = TBinaryProtocol.TBinaryProtocolFactory()

    server = TServer.TThreadedServer(processor, transport, tfactory, pfactory)

    print 'Starting the server...'
    server.serve()
    print 'done.'

client.py

#coding=utf=8

import sys
sys.path.append('../schema/gen-py')

from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

from RecSys import RecSys


try:
    # 设置端口
    transport = TSocket.TSocket('localhost', port=9090)

    # 设置传输层
    transport = TTransport.TBufferedTransport(transport)

    # 设置传输协议
    protocol = TBinaryProtocol.TBinaryProtocol(transport)

    client = RecSys.Client(protocol)

    transport.open()

    rst = client.rec_data("are you ok!!!")
    print "receive return data: ", rst

    transport.close()

except Thrift.TException, ex:
    print "%s" % (ex.message)

第29行就是在本地启动一个9090端口的服务
1)先启动server,等待数据
]# python server.py
报错:



经查明,子模块和当前python文件都引用了同一个模块,于是去掉python文件的引用:



再次启动:

2)发送数据client
]# python client.py



实用python3环境执行报错,切换成python2.7再次执行
客户端打印:



服务端打印:

到此,完成了基于python的thrift的使用!
1.4 调通单机版的thrift、c++版本

step1: 产生c++的接口代码:
]# thrift --gen cpp RecSys.thrift


step2:进入目录gen-cpp/,直接编译
]# g++ -I/usr/local/include/thrift/ -lthrift RecSys_server.skeleton.cpp RecSys.cpp RecSys_constants.cpp -o server


生成一个server,file server可以看到server是一个可执行文件。

从RecSys_server.skeleton.cpp文件中查看端口号:vim RecSys_server.skeleton.cpp

可看到默认端口号是9090
step3: 开启服务:./server,发现报错

解决办法:
修改/.bashrc或/.bash_profile或系统级别的/etc/profile

  1. 添加:
    export LD_LIBRARY_PATH=/usr/local/lib
    export PATH=/where/lib/:$LD_LIBRARY_PATH:$PATH
  2. source .bashrc (使生效)
    再次执行./server,没有报错,查看端口号:nc -tunlp|grep 9090



    step4: 实用python client端请求测试
    python client


发现客户端返回是空,因为我们server端没有做任何修改
修改前的RecSys_server.skeleton.cpp文件内容:



接下来做一下修改:
]# vim RecSys_server.skeleton.cpp

 23   void rec_data(std::string& _return, const std::string& data) {
 24     // Your implementation goes here
 25     //printf("rec_data\n");
 26     std::cout << "Receive Data: " << data << std::endl;
 27     
 28     _return = "I'm OK !!!";
 29   } 

修改完毕,需要重新编译:
]# g++ -I/usr/local/include/thrift/ -lthrift RecSys_server.skeleton.cpp RecSys.cpp RecSys_constants.cpp -o server
再次运行./server和python client.py
server端打印如下:



client端响应打印如下:



至此,c++版本的server开发完毕,剩下的就是c++版本的client的开发了。

step4: 接下来,开发client端:
client_demo.cpp

#include "RecSys.h"
#include <iostream>
#include <string>

#include <transport/TSocket.h>
#include <transport/TBufferTransports.h>
#include <protocol/TBinaryProtocol.h>

using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace std;

int main(int argc, char **argv) {

    boost::shared_ptr<TSocket> socket(new TSocket("localhost", 9090));
    boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
    boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));

    transport->open();

    RecSysClient client(protocol);

    string query_str = "are you okokok!!!";
    string recevie_data;
    client.rec_data(recevie_data, query_str);

    std::cout << "Receive data:" << recevie_data << endl;

    transport->close();
}

编译:
]# g++ -I/usr/local/include/thrift/ -lthrift client.cpp RecSys.cpp RecSys_constants.cpp -o client
原来的server没有停止,现在执行client_demo.cpp:./client_demo.cpp
服务端打印:



客户端打印:



我们再来测试一下c++的client端和python的server间的通信
先启动python的server端:python server.py
再启动c++的client端:./client_demo
服务端打印:

客户端打印:



至此,c++的客户端和服务端的thrift开发完毕。
1.5 搭建Nginx服务器

step1: 安装通常3步:
1)./configure --prefix=/usr/local/nginx/
2)make
3)make install
step2: 启动nginx:
在/usr/local/nginx目录下
]# ./sbin/nginx
]# netstat -antup | grep 80


访问:
http://192.168.87.10/
http://master/

1.6 配合cgi完成独立的server

CGI:公共网关协议:在web服务器上开发一个cgi的程序,该程序可以访问计算机上的资源

step1: 下载:fcgi-2.4.1-SNAP-0910052249.tar.gz在本文最上面已经给出
step2: 解压后,修改一个头文件:]# vim include/fcgio.h
第35行



step3: 安装
安装通常3步:
1)./configure
2)make
3)make install
step4: 开始开发一个cgi的demo
test.cpp代码编写如下:

#include <iostream>
#include <string>

#include <stdio.h>
#include <stdlib.h>

#include <fcgi_stdio.h>
#include <fcgiapp.h>

using namespace std;


inline void send_response(
            FCGX_Request& request, const string& resp_str) {

    FCGX_FPrintF(request.out, "Content-type: text/html;charset=utf-8\r\n\r\n");
    FCGX_FPrintF(request.out, "%s", resp_str.c_str());

    FCGX_Finish_r(&request);
}

int main(int argc, char **argv) {

    FCGX_Init();
    //接收到的请求
    FCGX_Request request;
    FCGX_InitRequest(&request, 0, 0);

    while(FCGX_Accept_r(&request) >= 0) {
        string query_str = FCGX_GetParam("QUERY_STRING", request.envp);

        cout << "query str: " << query_str << endl;

        send_response(request, query_str);
    }

    return 0;
}

编译:]# g++ test.cpp -lfcgi -o test



单独执行这个test没什么用,它只是一小段代码,为了完成整个cgi demo开发,接下来需要安装spawn-cgi并完成服务托管

step4: 下载安装一个托管服务,在本文前面也已经给出,自行下载路径:wget https://github.com/lighttpd/spawn-fcgi/archive/spawn-fcgi-1.6.4.tar.gz

step5: 安装分4步:
1)./autogen.sh
2)./configure
3)make
4)make install
step6: 接下来,用spawn-fcgi工具托管自主开发的cgi demo bin(test)
]# /usr/local/bin/spawn-fcgi -a 127.0.0.1 -p 8099 -f test



发现在当前目录执行报错,切换到其他目录再次执行,成功



检查端口号:]# netstat -antup|grep 8099

接下来怎么把这个NGINX80端口转发到8099,这时就需要改造NGINX
1.7 NGINX配置

step1:
配置nginx的反向代理功能:
/usr/local/nginx/conf目录下
nginx.conf文件中加入:

 48         location ~ /behavior_recsys$ {
 49             fastcgi_pass 127.0.0.1:8099;
 50             include fastcgi_params;
 51         }

step2:
配置完后,重新加载配置:
]# ./sbin/nginx -s reload
step3:
测试,打开浏览器,访问:http://master/behavior_recsys?userid=111&item=222&action=click


查看nginx的log,也是正常。

日志之所以能够直接返回到页面展示,是因为我们之前开发的cgi的demo,test.cpp代码,直接将接受到的请求返回。

1.8 使用google glog日志模块把用户行为信息写入到本地文件(log文件)

glog日志级别 FATAL(高) > ERROR > WARNING > INFO > TRACE > DEBUG(低)
step1: 安装glog:
1)./configure
2)make
3)make install
step2: 开发client,相当于在之前的基础上,把日志写入文件。

#include <stdio.h>
#include <stdlib.h>

#include <fcgi_stdio.h>
#include <fcgiapp.h>

#include <glog/logging.h>

using namespace std;


inline void send_response(
            FCGX_Request& request, const string& resp_str) {

    FCGX_FPrintF(request.out, "Content-type: text/html;charset=utf-8\r\n\r\n");
    FCGX_FPrintF(request.out, "%s", resp_str.c_str());

    FCGX_Finish_r(&request);
}

int main(int argc, char **argv) {

    FCGX_Init();
    FCGX_Request request;
    FCGX_InitRequest(&request, 0, 0);

    FLAGS_log_dir = "/root/7_codes/logserver_test/cgi_demo/logs";
    FLAGS_max_log_size = 100;
    FLAGS_logbufsecs = 0;
    google::InitGoogleLogging(argv[0]);

    while(FCGX_Accept_r(&request) >= 0) {
        string query_str = FCGX_GetParam("QUERY_STRING", request.envp);

        cout << "query str: " << query_str << endl;
        LOG(INFO) << query_str;
        LOG(WARNING) << query_str;
        LOG(ERROR) << query_str;

        send_response(request, query_str);
    }

    return 0;
} 

step3: 在当前目录新建目录logs:mkdir logs
step4: 编译:]# g++ -lglog -lfcgi client.cpp -o client (这里引入了glog)
step5: 关掉之前的cgi进程


step6: 加入托管服务:]# /usr/local/bin/spawn-fcgi -a 127.0.0.1 -p 8099 -f /usr/local/src/logserver_base/cgi_demo/client

step7: 验证:在浏览器访问:http://192.168.36.101/behavior_recsys?userid=111&item=222&action=click
此时可以观察日志输出:

1.9 将thrift和cgi联合打通,完整日志服务

thrif和cgi的区别:
rpc好处
1.安全,数据加密(二进制)
2.数据压缩,因为是二进制方式,比字符串方式规模要小很多,所以节省带宽,提升性能
3.常用于内部服务之间的互通(解耦),而http更倾向于web
thrift——内部服务,解耦,安全(数据加密)
cgi——web服务
问题:nginx本身就是日志服务,为什么要引入cgi?
如果只是定位一个日志服务器的话,可以不需要,但如果要开发推荐引擎或搜索引擎的话,就必须这样的架构来做。

step1: 启动server



server代码:

// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.

#include "RecSys.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;

using boost::shared_ptr;

class RecSysHandler : virtual public RecSysIf {
 public:
  RecSysHandler() {
    // Your initialization goes here
  }

  void rec_data(std::string& _return, const std::string& data) {
    // Your implementation goes here
   // printf("rec_data\n");
      std::cout << "Receive Data: " << data << std::endl;
      _return = "I'm OK !!!";
  }

};

int main(int argc, char **argv) {
  int port = 9090;
  shared_ptr<RecSysHandler> handler(new RecSysHandler());
  shared_ptr<TProcessor> processor(new RecSysProcessor(handler));
  shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
  shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
  shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

  TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
  server.serve();
  return 0;
}

step2: 开发client.cpp

#include "RecSys.h"

#include <iostream>
#include <string>

#include <stdio.h>
#include <stdlib.h>

#include <fcgi_stdio.h>
#include <fcgiapp.h>

#include <transport/TSocket.h>
#include <transport/TBufferTransports.h>
#include <protocol/TBinaryProtocol.h>

#include <glog/logging.h>

using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace std;

inline void send_response(
            FCGX_Request& request, const string& resp_str) {

    FCGX_FPrintF(request.out, "Content-type: text/html;charset=utf-8\r\n\r\n");
    FCGX_FPrintF(request.out, "%s", resp_str.c_str());

    FCGX_Finish_r(&request);
}

int main(int argc, char **argv) {

    // step 1. init fcgi
    FCGX_Init();
    FCGX_Request request;
    FCGX_InitRequest(&request, 0, 0);

    // step 2. init glog
    FLAGS_log_dir = "/usr/local/src/logserver_base/cgi_demo/logs";
    FLAGS_max_log_size = 100;
    FLAGS_logbufsecs = 0;
    google::InitGoogleLogging(argv[0]);

    // step 3. init thrift
    boost::shared_ptr<TSocket> socket(new TSocket("localhost", 9090));
    boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
    boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));

    transport->open();

    RecSysClient client(protocol);

    while(FCGX_Accept_r(&request) >= 0) {
        string query_str = FCGX_GetParam("QUERY_STRING", request.envp);

        cout << "query str: " << query_str << endl;
        LOG(ERROR) << query_str;

        // request to thrift server
        string recevie_data;
        client.rec_data(recevie_data, query_str);

        // return info back http
        send_response(request, recevie_data);
    }

    return 0;
}

step3: 编译
g++ -I/usr/local/include/thrift -lthrift -lglog -lfcgi RecSys.cpp RecSys_constants.cpp client.cpp -o client
step4:托管


step5:测试
先删除logs下的日志,浏览器访问:http://192.168.36.101/behavior_recsys?userid=111&item=222&action=click
日志成功写入:


网页上和客户端也成功返回:

2.0压力测试

step1: 安装压测包:]# yum install httpd-tools
step2:压测命令:ab -c 20 -n 5000 http://192.168.36.101/behavior_recsys?userid=111&item=222&action=click
http://192.168.87.10:9088/
一次性发5000个请求,请求情况:


可以看到,平均每次请求时间为0.232毫秒
日志行数正确:

step3: 接下来模拟真实请求,积累多样性的日志:
]# cat request.py

import os
import sys
import random

base_url='http://[http://192.168.36.101](http://192.168.36.101/)
/behavior_recsys?'

action_type_list = ['show', 'click', 'collect', 'pay']
ip_addr_list = ['10.1.1.3', '10.5.2.76', '10.1.2.90', '10.0.2.188', '10.10.1.19']

url_list = []
for i in range(1000):
    userid = "userid=" + str(int(random.random() * 50))
    itemid = "itemid=" + str(random.randint(30001, 30050))
    action_type = "type=" + random.sample(action_type_list, 1)[0]
    ip = "ip=" + random.sample(ip_addr_list, 1)[0]

    url = "\"" + base_url +'&'.join([userid, itemid, action_type, ip]) + "\""
    url_list.append(url)

for i in url_list:
    #os.system('ab -c 20 -n5000 ' + url)
    os.system('curl ' + i)

step4: 和python的web服务性能比较
python服务代码:

import web

urls = (
    '/(.*)', 'hello'
)
app = web.application(urls, globals())

class hello:
    def GET(self, name):
        return 'ok!!!'

if __name__ == "__main__":
    app.run()

启动:]# python pyserver.py 9999



压测:
ab -c 20 -n 5000 http://192.168.36.101:9999/
压测结果:



我们发现,python服务请求,业务很简单但是平均每次请求时间为2.19毫秒。而c++虽然做了很多层业务调用,但是比起简单业务的python性能上还是高很多,要高出2.19/0.232,约为10倍!

2. 实时对接用户行为log,Flume进行实时流的打通

2.1FlumeHbaseEventSerializer开发

利用IDE,完成FlumeHbaseEventSerializer的开发,编译jar包
将生成的/root/IdeaProjects/FlumeTest/target/FlumeTest-1.0-SNAPSHOT.jar文件
拷贝到/usr/local/src/apache-flume-1.6.0-bin/lib目录下

step1: pom配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>FlumeHbase</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>1.3.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.6.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flume.flume-ng-sinks</groupId>
            <artifactId>flume-ng-hbase-sink</artifactId>
            <version>1.6.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.5.3</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>

    </build>


</project>

step2. FlumeHbaseEventSerializer实现:

import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.HbaseEventSerializer;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;

public class FlumeHbaseEventSerializer implements HbaseEventSerializer {
    public static final String REGEX_CONFIG = "regex";
    public static final String REGEX_DEFAULT = " ";
    public static final String INGNORE_CASE_CONFIG = "regexIgnoreCase";
    public static final boolean INGNORE_CASE_DEFAULT = false;
    public static final String COL_NAME_CONFIG = "colNames";
    public static final String COL_NAME_DEFAULT = "ip";
    public static final String ROW_KEY_INDEX_CONFIG = "rowKeyIndex";
    public static final String ROW_KEY_NAME = "ROW_KEY";
    public static final String DEPOSIT_HEADERS_CONFIG = "depositHeaders";
    public static final boolean DEPOSIT_HEADERS_DEFAULT = false;
    public static final String CHARSET_CONFIG = "charset";
    public static final String CHARSET_DEFAULT = "utf-8";

    protected static final AtomicInteger nonce = new AtomicInteger(0);
    protected static String randomKey = RandomStringUtils.randomAlphanumeric(10);
    protected byte[]cf;
    private byte[]payload;
    private List<byte[]> colNames = Lists.newArrayList();
    private boolean regexIngnoreCase;
    private Charset charset;

    public void initialize(Event event, byte[] columnFamily) {
        event.getHeaders();
        this.payload = event.getBody();
        this.cf = columnFamily;
    }


    public List<Row> getActions() {
        ArrayList<Row> actions = Lists.newArrayList();
        byte[] rowKey;
        String body = new String(payload,charset);
        String tmp = body.replace("\"","");
        String[] arr = tmp.trim().split(" ");
        int len = arr.length;
        System.out.println("tmp:"+tmp);
        System.out.println("len1:"+len);
        String log_data = arr[len-1];
        String[] param_arr = log_data.split("&");
        System.out.println("===================="+log_data);
        String userid = param_arr[0].split("=")[1];
        String itemid = param_arr[1].split("=")[1];
        String type = param_arr[2].split("=")[1];
        String ip_str = param_arr[3].split("=")[1];
        System.out.println("=====================");
        System.out.println("=====================");
        System.out.println("=====================");
        System.out.println("=====================");
        System.out.println(userid);
        System.out.println(itemid);
        System.out.println(type);
        System.out.println(ip_str);
        System.out.println("=====================");
        System.out.println("=====================");
        System.out.println("=====================");
        System.out.println("=====================");
        try {
            System.out.println("0000=============");
            rowKey = getRowKey();
            System.out.println("1111=============");
            Put put = new Put(rowKey);
            put.add(cf,colNames.get(0),userid.getBytes(Charsets.UTF_8));
            put.add(cf,colNames.get(1),itemid.getBytes(Charsets.UTF_8));
            put.add(cf,colNames.get(2),type.getBytes(Charsets.UTF_8));
            put.add(cf,colNames.get(3),ip_str.getBytes(Charsets.UTF_8));
            actions.add(put);
        } catch (Exception e){
            throw new FlumeException("could not get row key!",e);
        }
        return actions;
    }

    public List<Increment> getIncrements() {
        return Lists.newArrayList();
    }

    public void close() {

    }

    public void configure(Context context) {
        String regex = context.getString(REGEX_CONFIG,REGEX_DEFAULT);
        regexIngnoreCase = context.getBoolean(INGNORE_CASE_CONFIG,INGNORE_CASE_DEFAULT);
        context.getBoolean(DEPOSIT_HEADERS_CONFIG,DEPOSIT_HEADERS_DEFAULT);
        Pattern.compile(regex, Pattern.DOTALL+(regexIngnoreCase? Pattern.CASE_INSENSITIVE: Pattern.UNIX_LINES));
        charset = Charset.forName(context.getString(CHARSET_CONFIG,CHARSET_DEFAULT));

        String cols = new String(context.getString("columns"));
        String colNameStr = "";
        if(cols != null && !"".equals(cols)){
            colNameStr = cols;
        }else {
            colNameStr = context.getString(COL_NAME_CONFIG,COL_NAME_DEFAULT);
        }
        String[]colmnNames = colNameStr.split(",");
        for (String s:colmnNames){
            colNames.add(s.getBytes(charset));
        }
    }

    public void configure(ComponentConfiguration componentConfiguration) {

    }

    protected byte[] getRowKey(Calendar cal){
        String str = new String(payload,charset);
        String tmp = str.replace("\"","");
        String[]arr = tmp.split(" ");
        int len = arr.length;
        System.out.println("tmp:"+tmp);
        System.out.println("len2:"+len);
        String log_data = arr[len-1];
        String[] param_arr = log_data.split("&");
        String userid = param_arr[0];
        String itemid = param_arr[1];
        String type = param_arr[2];
        String ip_str = param_arr[3];
        String rowKey = ip_str + "-"+ nonce.getAndIncrement();
        return rowKey.getBytes(charset);
    }

    protected byte[] getRowKey(){
        return getRowKey(Calendar.getInstance());
    }
}


step3: 打jar包



step4:将jar包拷贝到flume的lib目录中


2.2 Flume服务端和客户端配置

step1 :服务端配置
/usr/local/src/apache-flume-1.6.0-bin/conf/logserver_flume_hbase目录下,指定一下sink方式:
flume server端配置文件:flume-server.properties:

# 定义这个agent中各组件的名字
a1.channels = c1
a1.sources = r1
a1.sinks = k1

# set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.type = avro
a1.sources.r1.bind = master
a1.sources.r1.port = 52020

a1.sinks.k1.type=hbase
a1.sinks.k1.table=user_action_table
a1.sinks.k1.columnFamily=action_log
a1.sinks.k1.serializer=com.badou.FlumeHbaseEventSerializer
a1.sinks.k1.serializer.columns=userid,itemid,type,ip


# 描述和配置source  channel   sink之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

step2 :Flume客户端:(slave1和master的flume作为客户端)
flume client端配置文件:flume-client.properties:

# 定义这个agent中各组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 描述和配置source组件:r1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/logserver_study/cgi_demo/logs/client.ERROR

# 描述和配置sink组件:k1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 52020

# 描述和配置source  channel   sink之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2.3 Flume和Hbase打通之前,我们需要准备Hadoop和Hbase

step1: 启动Hbase依赖的zookeeper
step1: 启动Hadoop:start-all.sh
step2:启动Hbase:start-hbase.sh
step3:进程检查:



step4: hbase创建日志表
打开hbase客户端并创建表:


2.4 启动Flume,打通Hbase

step1: master节点启动服务端
]# ./bin/flume-ng agent --conf conf --conf-file ./conf/logserver_flume_hbase/flume-server.properties --name a1 -Dflume.root.logger=INFO,console


step2: master节点启动客户端
]# ./bin/flume-ng agent --conf conf --conf-file ./conf/logserver_flume_hbase/flume-client.properties --name a1 -Dflume.root.logger=INFO,console


step3: slave1节点启动客户端
]# ./bin/flume-ng agent --conf conf --conf-file ./conf/logserver_flume_hbase/flume-client.properties --name a1 -Dflume.root.logger=INFO,console

2.5 日志采集测试
  1. 在浏览器模拟访问:http://192.168.36.101/behavior_recsys?userid=111&item=222&action=click&ip=1.0.0.10

  2. 观察flume服务端控制台:



    发现日志都打印出来了,说明日志能正常采集到,但是报了错:
    Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Put.setWriteToWAL(Z)V
    这个错误说明我的hbase版本和flume版本不兼容,我的flume版本是1.6.0,hbase是0.98.6。
    将Flume版本切换为1.7后,启动Flume Server和Flume Client后,查看server端控制台,发现没有报错了。


  3. 观察hbase,查看user_action_table表数据是否被插入:



    我们看到,hbase的user_action_table表已经有数据了,说明我们的数据采集到数据存储已经成功打通!

2.6 完成Nginx的负载均衡(支撑高并发)

step1: 配置计划:
192.168.87.10 master ——> server1
192.168.87.11 slave1 ——> server2
192.168.87.12 slave2 ——> proxy
step2: nginx配置

  1. master的配置不变:
location ~ /behavior_recsys$ {
            fastcgi_pass 127.0.0.1:8099;
            include fastcgi_params;
        }
  1. slave1的配置
    1.和master一致,前提是和master一样地能提供rpc服务,启动rpc client端:
    ]# g++ -I/usr/local/include/thrift -lthrift -lglog -lfcgi RecSys.cpp RecSys_constants.cpp client.cpp -o client
    /usr/local/bin/spawn-fcgi -a 127.0.0.1 -p 8099 -f /usr/local/src/logserver_base/schema/gen-cpp/client

2.模拟nginx,直接调用master的rpc服务,配置如下:

location ~ /behavior_recsys$ {
            fastcgi_pass 192.168.36.101:8099;
            include fastcgi_params;
        } 
  1. slave2的代理分发配置
 18     upstream recsys {
 19         server 192.168.36.101:80;
 20         server 192.168.36.102:80;
 21     }

 48         location ~ /behavior_recsys$ {
 49             proxy_pass http://recsys;
 50         }

step3 :进入nginx主目录重新加载:./sbin/nginx -s reload

2.7flume并发

step1:将master主机下的flume jar包,flume conf下的flume-server.properties,flume-client.properties文件拷贝到slave1.
step2:进入flume conf目录启动slave1上的client端:
./bin/flume-ng agent --conf conf --conf-file ./conf/logserver_flume_hbase/flume-client.properties --name a1 -Dflume.root.logger=INFO,console

2.8: 测试:直接访问slave2的服务,看它能否转发到master或slave1来进行存储数据,访问:

http://192.168.36.103/behavior_recsys?userid=123&item=111&action=click&ip=1.0.0.10
如下图,我们发现数据插入成功:

至此,我们完成了从NGINX到cglib到server再到flume,最终到hbase的支持高并发的数据采集流程完成。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,922评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,591评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,546评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,467评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,553评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,580评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,588评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,334评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,780评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,092评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,270评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,925评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,573评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,194评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,437评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,154评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容