在非结构化数据领域,技术带来了前所未有的爆炸性变化。移动设备、Web站点、社交媒体、科学仪器、卫星、IoT设备以及监控摄像头这样的数据源每秒钟都会产生大量的图片和视频。
核心要点
- 为了可靠且高效地处理大规模的视频流数据,需要有一个可扩展、能容错、松耦合的分布式系统;
- 本文中的示例应用使用开源的技术来构建这样的系统,这些技术包括OpenCV、Kafka和Spark。另外,还可以使用Amazon S3或HDFS进行存储;
- 该系统包含了三个主要的组件:视频流收集器(Video Stream Collector)、流数据缓冲(Stream Data Buffer)以及视频流处理器(Video Stream Processor);
- 视频流收集器需要与一个网络摄像机(IP camera)集群协同工作,这些摄像机提供视频内容的实时流数据,并且还会使用OpenCV视频处理库把视频流转换为帧,将数据以JSON的格式传递给Kafka Broker,供流数据缓冲组件使用;
- 视频流处理组件基于Apache Spark构建,同样会使用OpenCV进行视频流数据的处理。
在非结构化数据领域,技术带来了前所未有的爆炸性变化。移动设备、Web站点、社交媒体、科学仪器、卫星、IoT设备以及监控摄像头这样的数据源每秒钟都会产生大量的图片和视频。
管理和有效分析这些数据是一个很大的挑战,我们可以考虑一下某个城市的视频监控网络。试图监控每个摄像头的视频流来发现感兴趣的对象和事件是不现实且低效的。相反,计算机视觉(computer vision,CV)库能够处理这些视频流并提供智能的视频分析和对象探测结果。
但是,传统的CV系统有一定的局限性。在传统的视频分析系统中,带有CV库的服务器会同时收集和处理数据,所以服务器的故障将会丢失视频流数据。探测节点故障并将处理进程转移到其他节点上可能会导致碎片化的数据。
有很多的实际任务将大数据相关的技术推进到了视频流分析领域:并行且按需处理大规模的视频流、从视频帧中抽取不同的信息集、使用不同的机器学习库分析数据、将分析得到的数据以流的方式发送到应用的不同组件中以便于后续处理、将处理后的数据以不同的格式进行输出等等。
视频流分析——动作感应
为了可靠且高效地处理大规模的视频流数据,需要有一个可扩展、能容错、松耦合的分布式系统。在本文所讨论的视频流分析中,我们将会讨论这些原则。
视频流分析包括如下的类型:
- 对象跟踪(object tracking)
- 动作感应(motion detection)
- 面部识别(face recognition)
- 手势识别(gesture recognition)
- 增强现实(augmented reality)
- 图像分割(image segmentation)
本文中示例应用的使用场景将会是视频流中的动作感应。
动作感应指的是查找一个物体(通常会是人)相对于其周边环境位置变化的过程。它大多数用于持续监视特定区域的视频监控系统。CV库提供的算法会分析这种摄像机所提供的实时视频并查找所发生的动作。如果感应到动作的话,将会触发一个事件,这个事件可以发送消息给应用或提示用户。
在本文中,用于视频流分析的应用由三个主要的组件组成:
- 视频流收集器(video stream collector)
- 视频数据缓冲(stream data buffer)
- 视频流处理器(video stream processor)
视频流收集器要接受来自网络摄像机集群的视频流数据。这个组件将视频帧序列化为流数据缓冲,这是一个用于视频流数据的可容错数据队列(queue)。视频流处理器消费缓冲中的流数据并进行处理。这个组件将会使用视频处理算法在视频流数据中探测动作。最后,处理过的数据或图片文件会存储到S3 bucket或HDFS目录中。这个视频流处理系统在设计时使用了OpenCV、Apache Kafka以及Apache Spark框架。
OpenCV、Kafka和Spark简介
如下简单介绍了相关的框架。
OpenCV
OpenCV(Open Source Computer Vision Library)是一个基于BSD许可证开源的库。这个库使用C++编写,但是也提供了Java API。OpenCV包含了数百个CV算法,能够用来处理和分析图片及视频文件。请参考该文档了解更多细节。
Kafka
Apache Kafka是一个分布式的流平台,它提供了一个发布和订阅流记录(streams of records)的系统。这些记录能够按照可容错的方式进行存储,消费者可以处理这些数据。关于Kafka的更多信息,请参见该文档。
Spark
Apache Spark是一个快速、通用的集群计算系统。它提供了用于SQL和结构化数据处理的模块、用于机器学习的MLlib、用于图像处理的GraphX以及Spark Streaming。该文档中包含了关于Spark的更多细节。
系统架构
图1展现了视频流分析系统的架构图。
[图片上传中...(image-df0f69-1547015523382-1)]
<small>图1 视频流分析系统的架构图</small>
设计与实现
本示例应用的代码可以通过GitHub获取。
如下的章节介绍了样例中视频流收集器、流数据缓冲以及视频流处理器的设计与实现细节。
视频流收集器
视频流收集器会与一个网络摄像机集群协同工作,这些摄像机会提供实时视频。该组件必须要从每个摄像机读取数据并将视频流转换为一系列的视频帧。为了区分每个网络摄像机,收集器要通过camera.url和camera.id属性维护摄像机ID与URL之间的映射,这两个属性会在stream-collector.properties文件中定义。这些属性在定义时,可以按照逗号分隔的格式定义摄像机URL和ID的列表。不同的摄像机可能会以不同的规格来提供数据,比如编解码器(codec)、分辨率或者每秒的帧数。在通过视频流创建帧的时候,收集器必须要保留这些细节数据。
收集器使用OpenCV视频处理库将视频流转换为帧。每帧都会调整为所需的分辨率(比如640x480)。OpenCV将每帧或每幅图片存储为Mat对象。Mat需要转换为可连续的(字节数组)形式,在这个过程要保留帧的完整信息,比如rows、cols和type。视频流收集器使用如下的JSON信息格式来存储这些细节。
<pre style="margin: 0px; padding: 0px; overflow-wrap: break-word; white-space: pre-wrap; font-family: "courier new", courier, monospace;">
{"cameraId":"cam-01","timestamp":1488627991133,"rows":12,"cols":15,"type":16,"data":"asdfh"}
</pre>
cameraId
是摄像机的唯一ID。timestamp
是帧生成的时间。rows、cols
和type
是OpenCV Mat特定的细节信息。data
是基于base-64编码的字符串,代表了帧的字节数组。
视频流收集器使用Gson库将数据转换为JSON消息,这些消息会被发送至video-stream-event
topic上。它会使用KafkaProducer客户端将JSON消息发送至Kafka broker。KafkaProducer会将每个key发送至相同的分区并保证消息的顺序。
<pre style="margin: 0px; padding: 0px; overflow-wrap: break-word; white-space: pre-wrap; font-family: "courier new", courier, monospace;">
JsonObject obj = new JsonObject(); obj.addProperty("cameraId",cameraId); obj.addProperty("timestamp", timestamp); obj.addProperty("rows", rows); obj.addProperty("cols", cols); obj.addProperty("type", type); obj.addProperty("data", Base64.getEncoder().encodeToString(data)); String json = gson.toJson(obj); producer.send(new ProducerRecord<String, String>(topic,cameraId,json),new EventGeneratorCallback(cameraId));
</pre>
<small>图2 以JSON消息的格式发送图片的代码片段</small>
Kafka的设计场景主要是用来处理较小的文本信息,但是这里的JSON信息中包含了视频帧的字节数组,它会比较大(比如能够达到1.5MB),所以Kafka在处理较大的信息之前,需要进行配置的变更。如下的KafkaProducer属性需要进行调整:
batch.size
max.request.size
compression.type
请参见Kafka文档的Producer Configs章节以及本项目 GitHub上的代码和属性文件了解更多细节。
视频数据缓冲
为了无丢失地处理大量的视频流数据,将这些视频数据保存到临时存储中就是非常必要的了。对于收集器所产生的数据,Kafka broker的作用就像是一个缓冲队列(buffer queue)。Kafka使用文件系统来存储信息,对这些信息的保存时间是可以配置的。
如果在处理之前就将数据保存到存储中,那就能保证它的持久性,同时还能提升系统的整体性能,因为处理器可以根据负载在不同的时间按照不同的速度来处理数据。当数据的生成速度超过数据的处理速度时,这种方式能够提升系统的可靠性。
Kafka能够保证在单个分区中给定topic的消息顺序。如果数据的顺序比较重要的话,在处理这类数据时,该特性就是非常有用的。为了存储较大的信息,在Kafka服务器的server.properties
文件中需要调整如下的配置:
message.max.bytes
replica.fetch.max.bytes
请参见Kafka文档的Broker Configs章节来了解这些属性的详细信息。
视频流处理器
视频流处理器会执行下面的三个步骤:
- 从Kafka broker中以
VideoEventData
dataset的形式读取JSON信息; - 根据摄像机ID对
VideoEventData
dataset进行分组并将其传递给视频流处理器; - 根据JSON数据创建一个Mat对象并处理视频流数据。
视频流收集器是基于Apache Spark构建的。Spark提供了Spark Streaming API,该API能够使用离散的流(discretized stream)或DStream,并且还提供了基于dataset的新Structured Streaming API。本应用中的视频流收集器使用Structured Streaming API来消费和处理来自Kafka的数据。需要注意的是,本应用所处理的格式化数据是JSON消息的形式,视频流处理器所要处理的非结构化视频数据会作为JSON消息的属性。Spark的文档这样写道“Structured Streaming提供了快速、可扩展、容错、端到端且保证仅执行一次的流处理功能,用户无需考虑流的相关事宜”。这也是视频流处理器围绕Spark的 Structured Streaming设计的原因所在。Structured Streaming为结构化的文本数据提供了内置的支持,并且支持聚合查询(aggregation queries)的状态管理。该引擎还提供了一些其他的特性,比如处理非聚合查询以及datasets外部的状态管理(Spark 2.2.0的新特性)。
为了处理较大的信息,如下的Kafka消费者配置必须要传递给Spark引擎:
max.partition.fetch.bytes
max.poll.records
请参见Kafka文档的Consumer Configs章节了解这些属性的更多信息。
该组件的主类是VideoStreamProcessor
。这个类首先创建一个SparkSession
对象,它是与Spark SQL引擎协作的入口点。下一步是定义传入消息的模式(schema),这样的话,Spark就能够使用该模式将消息从字符串格式解析为JSON格式。Spark的bean encoder能够将其转换为Dataset<VideoEventData>
。VideoEventData是一个Java bean类,它会持有JSON消息的数据。
<pre style="margin: 0px; padding: 0px; overflow-wrap: break-word; white-space: pre-wrap; font-family: "courier new", courier, monospace;">
Dataset<VideoEventData> ds = spark.readStream().format("kafka") .option("kafka.bootstrap.servers",prop.getProperty("kafka.bootstrap.servers")) .option("subscribe",prop.getProperty("kafka.topic")) .option("kafka.max.partition.fetch.bytes",prop.getProperty("kafka.max.partition.fetch.bytes")) .option("kafka.max.poll.records", prop.getProperty("kafka.max.poll.records")) .load().selectExpr("CAST(value AS STRING) as message") .select(functions.from_json(functions.col("message"),schema).as("json")) .select("json.*").as(Encoders.bean(VideoEventData.class));
</pre>
<small>3 spark streaming处理kafka消息的代码片段</small>
接下来,groupByKey会根据摄像机的ID对数据集进行分组,得到KeyValueGroupedDataset<String, VideoEventData>。它会使用一个mapGroupsWithState transformation并作用于一组VideoEventData (Iterator<VideoEventData>),这些数据代表了本次批处理的视频帧,会根据摄像机ID进行分组。这个transformation会首先检查上一条已被处理的VideoEventData(视频帧)是否存在,并将其传递给视频处理器用于下一步的处理。在视频处理之后,上一条被处理的VideoEventData(视频帧)会被返回,而状态已更新。为了启动流应用,需要在console sink和update output模式下针对dataset调用writeStream方法。
请阅读GitHub上的属性文件和代码了解更多细节。
技术和工具 Technologies and Tools
如下的表格列出了该视频流分析系统所用到的技术和工具
| <small>技术和工具</small> | <small>版本</small> | <small>下载URL</small> |
| <small>JDK</small> | 1.8 | <small>http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html</small> |
| <small>Maven</small> | <small>3.3.9</small> | <small>https://maven.apache.org/download.cgi</small> |
| <small>ZooKeeper</small> | <small>3.4.8</small> | <small>https://zookeeper.apache.org/releases.html</small> |
| <small>Kafka</small> | <small>2.11-0.10.2.0</small> | <small>http://kafka.apache.org/downloads.html</small> |
| <small>Spark</small> | <small>2.2.0</small> | <small>http://spark.apache.org/downloads.html</small> |
| <small>OpenCV</small> | <small>3.2.0</small> | <small>http://opencv.org/releases.html</small> |
请参考文档了解这些工具的安装和配置。Kafka文档和Spark文档详细介绍了如何搭建环境以及如何以独立模式和集群模式运行应用。要安装OpenCV的话,请参考OpenCV文档。OpenCV也可以通过Anaconda安装。
构建与部署
本节详细介绍了如何构建和运行示例应用的视频流收集器和视频流处理器组件。这个应用既能用来处理离线的视频文件,也能处理实时的摄像机数据,但是在这里我们配置它分析一个离线示例视频文件。请按照下述的步骤构建和运行应用。
1.下载并安装上述表格中所列的工具。确保ZooKeeper和Kafka服务器已处于启动运行的状态;
2. 该应用会使用OpenCV原生库(.dll或.so),所以使用System.loadLibrary()
加载它们。在系统环境变量中设置这些原生库的目录路径或者将路径作为命令行参数传递进来。例如,对于64位的Windows机器,原生库文件(opencv_java320.dll和opencv_ffmpeg320_64.dll)的路径将会是{OpenCV Installation Directory} \build\java\x64。
3.stream-collector.properties
文件会将Kafka topic作为video-stream-event
。在Kafka中创建该topic和分区(partitions)。我们可以使用kafka-topic命令来创建topic和分区;
4. stream-processor.properties
文件有一个processed.output.dir
属性,它指定了处理后图片的保存路径。创建文件并为该属性设置目录路径;
5.stream-collector.properties
文件有一个camera.url
属性,它保存了视频文件或视频源的路径或URL。确保路径或URL的正确性;
6.检查VideoStreamCollector
和VideoStreamProcessor
组件中的log4j.properties
文件,设置stream-collector.log
和stream-processor.log
文件的目录路径。检查应用在这些日志文件中所生成的日志信息,如果应用在运行中出现错误的话,这些日志会有一定的用处;
7. 应用会使用来自OpenCV JAR文件的OpenCV API,但是在Maven中央仓库中并没有包含OpenCV JAR文件。在本应用中打包了OpenCV JAR文件,可以将其安装到本地Maven仓库中。在pom.xml文件中,maven-install-plugin
已经进行了配置,它会与clean阶段(phase)关联来安装这个JAR文件。为了将OpenCV JAR安装到本地Maven仓库中,切换至video-stream-processor文件夹并运行如下命令:
<pre style="margin: 0px; padding: 0px; overflow-wrap: break-word; white-space: pre-wrap; font-family: "courier new", courier, monospace;">
mvn clean
</pre>
8. 为了让应用的逻辑尽可能简单,VideoStreamProcessor
只处理新的消息。VideoStreamProcessor
应该要先于VideoStreamCollector
组件启动并运行。如果要通过Maven运行VideoStreamProcessor
,切换至video-stream-processor文件夹并执行如下命令:
<pre style="margin: 0px; padding: 0px; overflow-wrap: break-word; white-space: pre-wrap; font-family: "courier new", courier, monospace;">
mvn clean package exec:java -Dexec.mainClass="com.iot.video.app.spark.processor.VideoStreamProcessor"
</pre>
9.在VideoStreamProcessor
启动之后,我们接下来就可以启动VideoStreamCollector
组件了。切换至video-stream-collector文件夹并执行如下命令:
<pre style="margin: 0px; padding: 0px; overflow-wrap: break-word; white-space: pre-wrap; font-family: "courier new", courier, monospace;">
mvn clean package exec:java -Dexec.mainClass="com.iot.video.app.kafka.collector.VideoStreamCollector" -Dexec.cleanupDaemonThreads=false
</pre>
GitHub项目上打包了一个sample.mp4视频文件。这个视频文件的URL和ID已经通过camera.url
和camera.id
属性在stream-collector.properties文件中进行了配置。在处理完视频文件之后,图片将会存储到预先配置的目录中(参见第4步)。图4展现了应用的示例输出。
[图片上传中...(image-88112-1547015523380-0)]
<small>图4 动作感应的示例输出</small>
这个应用能够配置并处理多个视频源(包括离线的和实时的)。例如,除了sample.mp4文件之外,假设我们还要添加来自webcam的feed视频,编辑stream-collector.properties文件,在camera.url
属性中添加逗号分隔的整数值(第一个webcam对应0,第二个webcam对应2,以此类推),添加对应的摄像机ID到camera.id
属性中(cam-01,cam-02等等),同样要使用逗号分隔。如下是一个样例:
<pre style="margin: 0px; padding: 0px; overflow-wrap: break-word; white-space: pre-wrap; font-family: "courier new", courier, monospace;">
camera.url=../sample-video/sample.mp4,0
</pre>
<pre style="margin: 0px; padding: 0px; overflow-wrap: break-word; white-space: pre-wrap; font-family: "courier new", courier, monospace;">
camera.id=vid-01,cam-01
</pre>
结论
大规模的视频流分析需要有一个大数据技术作为支撑的健壮系统。像OpenCV、Kafka和Spark这样的开源技术能够用来构建可容错的分布式系统,并基于此来进行视频流分析。我们使用OpenCV和Kafka构建视频流收集组件,它会从不同的源接收视频流,并将其发送至视频流缓冲组件。Kafka作为视频数据的缓冲组件,它为流数据提供了可持久化的存储。视频流处理组件使用OpenCV以及Spark的Structured Streaming进行构建。这个组件会从流数据缓冲中获取流式数据,并对数据进行分析。处理后的文件可以放到预先配置好的HDFS或S3 bucket中。我们使用动作感应作为视频流分析的用例,并提供了一个示例应用。
参考资料
- Online Security Analytics on Large Scale Video Surveillance System (Spark Summit East 2016)
- Large Scale Image Processing in Real-Time Environments with Kafka (CS & IT)
- OpenCV 文档
- Kafka 文档
- Apache Spark 文档
- ZooKeeper 文档
[原文](Video Stream Analytics Using OpenCV, Kafka and Spark Technologies)
基于Spark的实时视频分析系统