简单之美 | Kafka+Spark Streaming+Redis实时计算整合实践
http://shiyanjun.cn/archives/1097.html
//
我们的应用场景是分析用户使用手机App的行为,描述如下所示:
手机客户端会收集用户的行为事件(我们以点击事件为例),将数据发送到数据服务器,我们假设这里直接进入到Kafka消息队列
后端的实时服务会【从Kafka消费数据】,将数据读出来并进行【实时分析】,这里选择Spark Streaming,因为Spark Streaming提供了与Kafka整合的内置支持
经过Spark Streaming实时计算程序分析,将【(实时)结果写入Redis】,可以【实时获取用户的行为数据】,并可以【导出进行离线综合统计分析】
在Spark Streaming中,每个DStream包含了一个时间间隔之内的数据项的集合,我们可以理解为指定时间间隔之内的一个batch,每一个batch就构成一个RDD数据集,所以DStream就是一个个batch的有序序列,时间是连续的,按照时间间隔将数据流分割成一个个离散的RDD数据集,如图所示(来自官网):
我们都知道,Spark支持两种类型操作:Transformations和Actions。Transformation从一个已知的RDD数据集经过转换得到一个新的RDD数据集,这些【Transformation操作包括map、filter、flatMap、union、join等】,而且Transformation具有lazy的特性,调用这些操作并没有立刻执行对已知RDD数据集的计算操作,而是在调用了另一类型的Action操作才会真正地执行。Action执行,会真正地对RDD数据集进行操作,返回一个【计算结果给Driver程序】,或者没有返回结果,如将计算结果数据进行持久化,【Action操作包括reduceByKey、count、foreach、collect等】。关于Transformations和Actions更详细内容,可以查看官网文档。
同样、Spark Streaming提供了类似Spark的两种操作类型,分别为Transformations和Output操作,它们的操作对象是DStream,作用也和Spark类似:Transformation从一个已知的DStream经过转换得到一个新的DStream,而且Spark Streaming还额外增加了一类针对【Window的操作,当然它也是Transformation,但是可以更灵活地控制DStream的大小(时间间隔大小、数据元素个数)】,例如window(windowLength, slideInterval)、countByWindow(windowLength, slideInterval)、reduceByWindow(func, windowLength, slideInterval)等。Spark Streaming的Output操作允许我们将DStream数据输出到一个外部的存储系统,如数据库或文件系统等,执行Output操作类似执行Spark的Action操作,使得该操作之前lazy的Transformation操作序列真正地执行。
//
首先,写了一个Kafka Producer模拟程序,用来模拟向Kafka实时写入用户行为的事件数据,数据是JSON格式,示例如下:
1
{"uid":"068b746ed4620d25e26055a9f804385f","event_time":"1430204612405","os_type":"Android","click_count":6}
一个事件包含4个字段:
uid:用户编号
event_time:事件发生时间戳
os_type:手机App操作系统类型
click_count:点击次数
//
【无论是在本地模式、Standalone模式,还是在Mesos或YARN模式下,整个Spark集群的结构都可以用上图抽象表示,只是各个组件的运行环境不同,导致组件可能是分布式的,或本地的,或单个JVM实例的】。如在本地模式,则上图表现为在同一节点上的单个进程之内的多个组件;而在YARN Client模式下,Driver程序是在YARN集群之外的一个节点上提交Spark Application,其他的组件都运行在YARN集群管理的节点上。在Spark集群环境部署Application后,【在进行计算的时候会将作用于RDD数据集上的函数(Functions)发送到集群中Worker上的Executor上(在Spark Streaming中是作用于DStream的操作)】,那么这些函数操作所作用的对象(Elements)必须是可序列化的,通过Scala也可以使用lazy引用来解决,否则这些对象(Elements)在跨节点序列化传输后,无法正确地执行反序列化重构成实际可用的对象。上面代码我们使用lazy引用(Lazy Reference)来实现的,代码如下所示: