数据平台实践①——Flume+Kafka+SparkStreaming(pyspark)

蜻蜓点水


Flume——数据采集

如果说,爬虫是采集外部数据的常用手段的话,那么,Flume就是采集内部数据的常用手段之一(logstash也是这方面的佼佼者)。
下面介绍一下Flume的基本构造。

  • Agent:包含Source、Channel和Sink的主体,它是这3个组件的载体,是组成Flume的数据节点。
  • Event:Flume 数据传输的基本单元。
  • Source: 用来接收Event,并将Event批量传给Channel。
  • Channel:Source和Sink之间的Event缓冲通道,它有个type属性,一般为memory,可以提高传输速度。
  • Sink:负责将数据沉淀到最终存储区,或沉淀给下一个source,形成数据流。
Flume

在大致了解了以上要素之后,通过上图,我们就可以有一个大概的认识。一句话讲,Source接收数据,并转成Event单元,然后导入Channel缓冲通道,最后,经由Sink进行数据沉淀。当然这里的沉淀,有多种选择,除了上图中的HDFS外,还包括HBase、File,或者作为另一个Source的源。在一系列过程,一条有序的数据流就诞生了。

Kafka——数据的发布/订阅

Kafka,作为基于发布/订阅的消息系统,以其分布式性而受到大家的喜爱。
下面介绍一下Kafka的基本构造。

  • Broker(代理): Kafka集群可由一个或多个服务器组成,其中的每个服务节点称作这个集群的一个Broker。
  • Topic(主题): 一个Topic对应一类消息,Topic用作为消息划分类别。
  • Partition(分区): 一个Topic一般含有多个分区。
  • Producer(生产者):消息生产者,负责生产Topic消息。
  • Consumer(消费者): 消息消费者,负责消费Topic消息。


    Kafka

Zookeeper——服务器间协调

这里需要提一下Zookeeper,对于Kafka这样的分布式服务,大多需要多台服务器相互协调工作,且保持一致性。任意一台服务器出现问题,如果不及时处理,都有可能导致整个服务的崩溃,其后果是不堪设想的。ZooKeeper的分布式设计,可用于领导人选举、群组协同工作和配置服务等,保证了服务的一致性和可用性。

Zookeeper

Spark Streaming——Spark核心API

Spark Streaming属于Spark的核心api,它支持高吞吐量、支持容错的实时流数据处理。它可以通过Kafka、HDFS、Flume等多种渠道获取数据,转换数据后利用Spark Engine进行数据处理。现在,包括Python、Java等多种高级语言都对Spark进行支持。本文使用pyspark进行编程。

Spark Streaming

实践出真知


要做什么

nginx日志分析,简单统计了下PV和UV,并做了H5图表实时展示。使用的是我开发的基于ace-admin和react的管理端LogAdmin对数据进行展示。这里提供github,感兴趣的朋友可以看一下。

LogAdmin

下面是我的主要步骤。
①Flume实时读入nginx日志,并将数据导入Kafka中。
②pyspark从Kafka读入数据,做实时处理,并将处理后的数据导出到redis队列中。
③编写脚本从redis中取出数据,存入mysql。
④H5展示。
【版本:Logstash1.7.0,Kafka 2.11(该版本中已集成了Zookeeper),Spark(2.0.2)】

①Flume实时读入nginx日志,并将数据导入Kafka中。

这一步中,只需配置flume.conf,并依次启动flume、zookeeper、kafka即可
flume.conf(配置中命名已较为明确,hdfs部分被注释了)

# agent-my80

# Finally, now that we've defined all of our components, tell
# agent-my80 which ones we want to activate.
#agent-my80.channels = ch1
#agent-my80.sources = avro-source1
#agent-my80.sinks = hdfs-sink1

agent-my80.channels = ch2
agent-my80.sources = exec-source1
agent-my80.sinks = kafka-sink1

# Define a memory channel called ch1 on agent-my80
#agent-my80.channels.ch1.type = memory
agent-my80.channels.ch2.type = memory


# Define an Avro source called avro-source1 on agent-my80 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
#agent-my80.sources.avro-source1.channels = ch1
#agent-my80.sources.avro-source1.type = avro
#agent-my80.sources.avro-source1.bind = 0.0.0.0
#agent-my80.sources.avro-source1.port = 44444
#agent-my80.sources.avro-source1.basenameHeader = true


agent-my80.sources.exec-source1.channels = ch2
agent-my80.sources.exec-source1.type = exec
agent-my80.sources.exec-source1.command = tail -f /home/www/logs/access.log

# # Define a logger sink that simply logs all events it receives
# # and connect it to the other end of the same channel.
#agent-my80.sinks.hdfs-sink1.channel = ch1
#agent-my80.sinks.hdfs-sink1.type = hdfs
#agent-my80.sinks.hdfs-sink1.hdfs.path = hdfs://my80:9000/flume-test
#agent-my80.sinks.hdfs-sink1.hdfs.filePrefix = event-
#agent-my80.sinks.hdfs-sink1.hdfs.filePrefix = %{basename}
#agent-my80.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
#agent-my80.sinks.hdfs-sink1.hdfs.round = true
#agent-my80.sinks.hdfs-sink1.hdfs.roundValue = 10
#agent-my80.sinks.hdfs-sink1.hdfs.fileType = DataStream

agent-my80.sinks.kafka-sink1.channel = ch2
agent-my80.sinks.kafka-sink1.type = org.apache.flume.sink.kafka.KafkaSink 
agent-my80.sinks.kafka-sink1.topic = my80-log
agent-my80.sinks.kafka-sink1.brokerList = localhost:9092
agent-my80.sinks.kafka-sink1.batchSize = 20

flume启动命令

flume-ng agent --conf /usr/local/apache-flume-1.7.0-bin/conf --conf-file /usr/local/apache-flume-1.7.0-bin/conf/flume.conf --name agent-my80 -Dflume.root.logger=INFO,console

zookeeper启动命令

/usr/local/kafka_2.11-0.10.1.0/bin/zookeeper-server-start.sh /usr/local/kafka_2.11-0.10.1.0/config/zookeeper.properties

kafka启动命令

/usr/local/kafka_2.11-0.10.1.0/bin/kafka-server-start.sh /usr/local/kafka_2.11-0.10.1.0/config/server.properties

注意:有些朋友,是用自己的个人服务器做相关实践,那么会遇到内存不足的问题,这时候一般通过,修改Java堆大小来解决。比如我是修改的kafka的kafka-server-start.sh和zookeeper-server-start.sh来解决这个问题。

#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" 
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"

②pyspark从Kafka读入数据,做实时处理,并将处理后的数据导出到redis队列中。

这部分为方便站点解析,我对nginx日志格式做了修改。
该步骤主要是做正则解析+MapReduce+数据导入redis,并分别将请求内容和请求ip放入redis的list和set,这样主要是方便我统计每天的PV和UV。
还要注意一点,nginx日志中包括静态文件,显然这个不能算UV和PV,所以要过滤。

calculate.py

#coding=utf8
from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import re
import redis
import datetime

# 解析日志
def parse(logstring):
    #使用正则解析日志
    # regex = '(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}).*ip=\/(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}).*tbl=([a-zA-Z0-9_]+)'
    regex = 'ip:(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}).*?time:\[(.*?)\].*?request:\"(.*?)\".*?status_code:(\d{1,3}).*?agent:\"(.*?)\"'
    pattern = re.compile(regex)
    m1 = pattern.search(str(logstring))
    if m1 is not None:
        m = m1.groups()
        # print(m
        if len(m)!=5 or not m[2]:
            m= None
        else:
            hd_list=[u".js",u".css",u".jpg",u".png",u".jpeg",u".gif",u".bmp",u".woff"];
            if doStrContainAnyWords(m[2],hd_list):
                m= None
    else:
        m = None
    return m

def doStrContainAnyWords(str,words=[]):
    for word in words:
        if word in str:
            return True;
    return False;

class RedisClient:
    pool = None
    def __init__(self):
        self.getRedisPool()

    def getRedisPool(self):
        redisIp='localhost'
        redisPort=6379
        redisDB=0
        self.pool = redis.ConnectionPool(host=redisIp, port=redisPort, db=redisDB)
        return self.pool

    def addToHashSet(self, key, value):
        if self.pool is None:
            self.pool = self.getRedisPool()
        r = redis.Redis(connection_pool=self.pool)
        hashSetName="my80-log-iphash-"+datetime.datetime.now().strftime("%Y-%m-%d");

        flag=False;
        if r.exists(hashSetName) is False:
            flag=True

        if r.hexists(hashSetName,str(key)):
            r.hincrby(hashSetName, str(key), value)
        else:
            r.hset(hashSetName, str(key), value)

        if flag is True:
            r.expire(hashSetName,3600*24+300);


    def addToList(self,value):
        if self.pool is None:
            self.pool = self.getRedisPool()
        r = redis.Redis(connection_pool=self.pool)
        r.lpush('my80-log-list', value)

if __name__ == '__main__':
    zkQuorum = 'localhost:2181'
    topic = 'my80-log'
    sc = SparkContext("local[2]", appName="kafka_pyspark_redis")
    ssc = StreamingContext(sc, 10)
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "kafka-streaming-redis", {topic: 1})
    #kafka读取返回的数据为tuple,长度为2,tuple[1]为实际的数据,tuple[0]的编码为Unicode
    

    res = kvs.map(lambda x: x[1]).map(lambda x:parse(x)).filter(lambda x:True if x is not None else False)
    items = res.map(lambda item:{"ip":item[0],"time":item[1],"request":item[2],"status_code":item[3],"agent":item[4] } )
    # items = res.map(lambda item:{"ip":item[0],"time":item[1] } )
    # ipcount = res.map(lambda item:(item[0],1)).reduceByKey(lambda a, b: a+b).map(lambda x:{ x[0]:str(x[1]) } )
    ipcount = res.map(lambda item:(item[0],1)).reduceByKey(lambda a, b: a+b)

    r = RedisClient()

    def handleItem(time,rdd):
        if rdd.isEmpty() is False:
            for element in rdd.collect():
                r.addToList(element)
    items.foreachRDD(handleItem)
    

    def ipHandle(time,rdd):
        if rdd.isEmpty() is False:
            # rddstr = "{"+','.join(rdd.collect())+"}"
            for element in rdd.collect():
                r.addToHashSet(element[0], element[1] )
    ipcount.foreachRDD(ipHandle)
  

    ssc.start()
    ssc.awaitTermination()

安装好spark-2.0.2-bin-hadoop2.7,脚本测试ok,最后就需要通过spark streaming提交任务(即提交calculate.py)。任务正常执行的话,数据就会从Kafka导出,经处理后,导入redis。

/usr/local/spark-2.0.2-bin-hadoop2.7/bin/spark-submit --jars /usr/local/spark-2.0.2-bin-hadoop2.7/spark-streaming-kafka-0-8-assembly_2.11-2.0.2.jar /home/dcb/python/pyspark/calculate.py

③编写脚本从redis中取出数据,存入mysql。

这一步相信大家没问题。

④H5图表展示

H5图表实时展示+github,感兴趣的朋友可以看一下。

小结

Flume+Kafka+Spark,是一个相对比较流行且可行的实时计算组合,可定制性较高,如果项目需求比较复杂,建议深入了解后进行定制开发。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,941评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,397评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,345评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,851评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,868评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,688评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,414评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,319评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,775评论 1 315
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,945评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,096评论 1 350
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,789评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,437评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,993评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,107评论 1 271
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,308评论 3 372
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,037评论 2 355

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,659评论 18 139
  • ** 今天看了一下kafka官网,尝试着在自己电脑上安装和配置,然后学一下官方document。** Introd...
    RainChang阅读 5,005评论 1 30
  • kafka的定义:是一个分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Act...
    时待吾阅读 5,324评论 1 15
  • 目前为止,已经讨论了机器学习和批处理模式的数据挖掘。现在审视持续处理流数据,实时检测其中的事实和模式,好像从湖泊来...
    abel_cao阅读 9,008评论 1 20
  • 小时候父母总以表姐作我的范本。而表姐也确实是不容易。 当时舅舅因为在工地上打工从脚手架摔下来,虽然保住了一条命,但...
    蒹葭苍苍123阅读 386评论 2 1