流是什么?
一种处理实时数据的方法。还有一种使用场景是当前要处理的数据很大,无法一次放入内存,使用流能够使用很小的内存完成处理。python中主要是靠
生成器来解决。即在调用时才处理。而不是预先要加载全部数据。
def gen():
yield 1
yield 2
yield 3
只有使用next调用才执行。
在flask中的response是支持流的。
实现视频流需要的格式是
multipart/x-mixed-replace
这是每一帧需要包含的信息。
b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n'
程序的基本结构如下。
index绑定到 / 路由上。返回一个界面。
gen方法使用生成器产生实时数据
video_feed 进行响应
#!/usr/bin/env python
from flask import Flask, render_template, Response
from camera import Camera
app = Flask(__name__)
@app.route('/')
def index():
return render_template('index.html')
def gen(camera):
while True:
frame = camera.get_frame()
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
@app.route('/video_feed')
def video_feed():
return Response(gen(Camera()),
mimetype='multipart/x-mixed-replace; boundary=frame')
if __name__ == '__main__':
app.run(host='0.0.0.0', debug=True,threaded=True)
这是index.html模板,可以看到src指向上面的viedofeed函数,返回流响应。
<html>
<head>
<title>Video Streaming Demonstration</title>
</head>
<body>
<h1>flask Video Streaming</h1>
![]({{ url_for('video_feed') }})
</body></html>
下一步就是获取帧了。首先使用三个图片来替代。
from time import time
class Camera(object):
def __init__(self):
self.frames = [open(f + '.jpg', 'rb').read() for f in ['1', '2', '3']]
def get_frame(self):
return self.frames[int(time()) % 3]
这样实现的效果是三幅图片持续循环。
下一步是获取真正的直播流。
使用opencv的python模块就可以。
def frames():
camera = cv2.VideoCapture(0)
if not camera.isOpened():
raise RuntimeError('Could not start camera.')
while True:
# read current frame
_, img = camera.read()
# encode as a jpeg image and return it
yield cv2.imencode('.jpg', img)[1].tobytes()
一直捕获数据,然后通过生成器产生。将这个函数接入到上面的gen函数是可以直接执行的。
性能提升
这种不做任何处理的方式会消耗大量的cpu资源,其中一个原因是因为后台捕获的线程和向
客户端发送的线程不同步。这就导致后台一直在大量获取数据,但并不是所有的数据都被传送了出去。
为了提高性能将两者进行同步。只把后台获取的原始帧进行发送。
使用线程的threading.event能够实现。
class CameraEvent(object):
# ...
class BaseCamera(object):
# ...
event = CameraEvent()
# ...
def get_frame(self):
"""Return the current camera frame."""
BaseCamera.last_access = time.time()
# 阻塞 等待相机线程来唤醒
BaseCamera.event.wait()
BaseCamera.event.clear()# 将标志位设置为阻塞
return BaseCamera.frame
@classmethod
def _thread(cls):
# ...
for frame in frames_iterator:
BaseCamera.frame = frame
BaseCamera.event.set() # 唤醒上面阻塞的地方
#判断最后查看时间,来决定是否关闭。
if time.time() - BaseCamera.last_access > 10:
frames_iterator.close()
上面的wait方法会阻塞直到标志位变为true,clear方法将标志位重置为false
set方法重置为true 通过这个机制实现同步。首先是等待唤醒,然后再将标志位
变为flase,然后继续进行。
在电脑上开启服务,然后在手机上查看如下: