//使用队列方式调用,如果是使用线程,就是调用 decode_thread_ 的线程方法了,最终都会调用到 HandleEncodedFrame
见Start()方法的
if (!use_task_queue_) {
decode_thread_.Start();
} else {
decode_queue_.PostTask([this] {
RTC_DCHECK_RUN_ON(&decode_queue_);
decoder_stopped_ = false;
StartNextDecode();
});
}
frame_buffer2.cc 里面 frames_ 编码帧数据取出的过程
void VideoReceiveStream::StartNextDecode() // webrtc\src\video\video_receive_stream.cc
{
/**
void FrameBuffer::NextFrame(
int64_t max_wait_time_ms,
bool keyframe_required,
rtc::TaskQueue* callback_queue,
std::function<void(std::unique_ptr<EncodedFrame>, ReturnReason)> handler)
**/
frame_buffer_->NextFrame(...) // webrtc\src\modules\video_coding\frame_buffer2.cc(line:70)
{
.... //输入参数的赋值
StartWaitForNextFrameOnQueue();
{
--
int64_t wait_ms = FindNextFrame(...)
{
---
for (auto frame_it = frames_.begin()) // 遍历 frames_
{
EncodedFrame* frame = frame_it->second.frame.get(); //得到某一帧视频编码视频数据
....
frames_to_decode_ = std::move(current_superframe); //将可解码的帧存入 frames_to_decode_ 队列
}
}
--
callback_task_ = RepeatingTaskHandle::DelayedStart(..) //延迟回调传传入的回调方法,
{
-- GetNextFrame() //获得查找到的帧
{
for (FrameMap::iterator& frame_it : frames_to_decode_) {
frames_to_decode_ // 从find的数据 frames_to_decode_ 里面通过条件去除可用帧返回
}
}
--
// 回调到外面的方法,并且传出frame帧
// void VideoReceiveStream::HandleEncodedFrame() \\webrtc\src\video\video_receive_stream.cc
// decode_queue_.PostTask(DecodeTask{this, std::move(frame)});
stream->HandleEncodedFrame(std::move(frame))
{
{
....
int decode_result = video_receiver_.Decode(frame.get()); // 函数为:int32_t VideoReceiver::Decode(const webrtc::VCMEncodedFrame* frame)
{
int32_t VideoReceiver::Decode(const VCMEncodedFrame& frame) //webrtc\src\modules\video_coding\\video_receiver.cc
{
VCMGenericDecoder* decoder =_codecDataBase.GetDecoder(frame, &_decodedFrameCallback); //获取VCMGenericDecoder对象
VCMGenericDecoder::Decode() //webbrtc\src\modules\video_coding\generic_decoder.cc
{
...
decoder_->Decode()
{
ZKVideoDecoder::Decode()
}
...
}
}
}
....
}
// 递归调用 StartNextDecode
}
}
}
}
}
frame_buffer2.cc 里面 frames_ 编码帧数据存入的过程
void RtpTransport::OnReadPacket(rtc::PacketTransportInternal* transport,
const char* data,
size_t len,
const int64_t& packet_time_us,
int flags) // webrtc\src\pc\rtp_transport.cc RtpTransport(line:241)
{
if (!cricket::IsValidRtpPacketSize(packet_type, len)) //检查rtp 包的可用性
if (packet_type == cricket::RtpPacketType::kRtcp)
OnRtcpPacketReceived(std::move(packet), packet_time_us); //进入 rtcp 包的处理
else
OnRtpPacketReceived(std::move(packet), packet_time_us); // webrtc\src\pc\srtp_transport.cc(line:202)
{
....
DemuxPacket(std::move(packet), packet_time_us); // webrtc\src\pc\rtp_transport.cc RtpTransport(line:183)
{
if (!parsed_packet.Parse(std::move(packet))){} // 解析rtp 包
parsed_packet.set_arrival_time_ms((packet_time_us + 500) / 1000); //设置包到达时间
if (!rtp_demuxer_.OnRtpPacket(parsed_packet)) {..} // webrtc\src\call\rtp_demuxer.cc RtpDemuxer(line:174)
{
RtpPacketSinkInterface* sink = ResolveSink(packet);
{
}
sink->OnRtpPacket(packet); \\ webrtc\src\video\rtp_video_stream_receiver.cc RtpVideoStreamReceiver(414)
{
last time 记录
ReceivePacket(packet) \\ webrtc\src\video\rtp_video_stream_receiver.cc RtpVideoStreamReceiver(606)
{
if (!depacketizer->Parse(&parsed_payload, packet.payload().data()) //rtp 负载信息解析,比如负载头,负载的分离,nalu的解析,负载长度
RTPHeader rtp_header; //rtp 头信息分析
......
OnReceivedPayloadData(parsed_payload.payload, parsed_payload.payload_length,
rtp_header, video_header, generic_descriptor_wire,
packet.recovered()) \\ webrtc\src\video\rtp_video_stream_receiver.cc RtpVideoStreamReceiver(line:319)
{
VCMPacket packet(payload_data, payload_size, rtp_header, video_header,
ntp_estimator_.Estimate(rtp_header.timestamp),
clock_->TimeInMilliseconds()); // playload_data 字节数据构造成 VCMPacket
if (nack_module_) {
... // nack 模块处理
}
if (packet.codec() == kVideoCodecH264) {
插入sps,pps
}
rtcp_feedback_buffer_.SendBufferedRtcpFeedback(); //按需,发送rtcp 消息
if (!packet_buffer_->InsertPacket(&packet)) {..} \\ webrtc\src\modules\video_coding\packet_buffer.cc PacketBuffer(line:70)
{
-- OnTimestampReceived(packet->timestamp); //时间搓记录
-- 首包记录和延迟包处理
-- if (data_buffer_[index].seqNum == packet->seqNum) {} // 重复包处理
-- while (ExpandBufferSize() && sequence_buffer_[seq_num % size_].used) {} //sequence_buffer_ 容量扩充
-- 如果扩充后,容量仍然不够,就要容器数据清空
-- sequence_buffer_[index] 相关数据赋值
-- UpdateMissingPackets() 根据seqnum,更新记录丢包
-- found_frames = FindFrames(seq_num); //重要,组合帧信息
{
std::vector<std::unique_ptr<RtpFrameObject>> found_frames; // 创建 RtpFrameObject 独占智能指针对象的 数组
// 遍历sequence_buffer_ 里面所有的帧信息
for (size_t i = 0; i < size_ && PotentialNewFrame(seq_num); ++i)
{
PotentialNewFrame(seq_num) // 初步排查,是否潜在可能,这个seq_num 后能找到一帧
sequence_buffer_[index].continuous = true; // 连续参数复制为0
if (sequence_buffer_[index].frame_end)
{
int64_t frame_timestamp = data_buffer_[start_index].timestamp; // 记录下,当前要找的帧的时间搓
// 找到一个帧的最后一包,然后往前遍历,去寻找找到帧的开始,break的时候,就是找到了开始,直接return ,就是异常,找不到可用的帧了
while (true)
{
frame_size += data_buffer_[start_index].sizeBytes; //累加帧字节大小
max_nack_count = std::max(max_nack_count, data_buffer_[start_index].timesNacked); //取最大重传次数
sequence_buffer_[start_index].frame_created = true; // 修改包连续信息状态 为frame_created
...
packet_infos.push_back(data_buffer_[start_index].packet_info); // VCPPacket 的 RtpPacketInfo属性存入队列
if (!is_h264 && sequence_buffer_[start_index].frame_begin){break;} //找到帧开始
start_index = start_index > 0 ? start_index - 1 : size_ - 1; //向前滚动
// In the case of H264 we don't have a frame_begin bit (yes,
// |frame_begin| might be set to true but that is a lie). So instead
// we traverese backwards as long as we have a previous packet and
// the timestamp of that packet is the same as this one. This may cause
// the PacketBuffer to hand out incomplete frames.
// See: https://bugs.chromium.org/p/webrtc/issues/detail?id=7106
if (is_h264 &&(!sequence_buffer_[start_index].used ||data_buffer_[start_index].timestamp != frame_timestamp)) {
// 一直往前找,直到timestamp 与 当前帧时间搓不一样的时候,break掉,
// 代码里面也有注释,说这里有风险,因为帧开始 frame_begin 标识可能没有,也有可能标识设置为true,单其实不是真的,可能导致拼接一个错误的帧
break;
}
}
std::reverse(packet_infos.begin(), packet_infos.end()); // 包信息逆转一下
}
}
}
// 循环处理组好的帧
for (std::unique_ptr<RtpFrameObject>& frame : found_frames)
assembled_frame_callback_->OnAssembledFrame(std::move(frame)); \\ webrtc\src\video\rtp_video_stream_receiver.cc RtpVideoStreamReceiver(line:502)
{
...
RequestKeyFrame()
reference_finder_->ManageFrame(std::move(frame)); webrtc\src\modules\video_coding\rtp_frame_reference_finder.cc RtpFrameReferenceFinder (line:35)
{
-- FrameDecision decision = ManageFrameInternal(frame.get()); // 组帧信息
--
frame_callback_->OnCompleteFrame(std::move(frame)); \\ webrtc\src\video\rtp_video_stream_receive.cc RtpVideoStreamReceiver (line:538)
{
-- video_coding::RtpFrameObject* rtp_frame = static_cast<video_coding::RtpFrameObject*>(frame.get()); //获得RtpFrameObject 对象
--
complete_frame_callback_->OnCompleteFrame(std::move(frame)); \\ webrtc\src\video\video_receive_stream.cc-- VideoReceiveStream(line:531)
{
... //一些时间的处理,比如延迟什么的
int64_t last_continuous_pid = frame_buffer_->InsertFrame(std::move(frame)); \\ webrtc\src\modules\video_coding\frame_buffer2.cc(line:450)
{
}
}
}
}
}
}
}
}
rtp_receive_statistics_->OnRtpPacket(packet);
...
}
}
}
}
}
重要结构体解析
webrtc\include\modules\video_coding\packet.h
class VCMPacket
{
uint8_t payloadType; //playload 类型 比如101
uint32_t timestamp; // 所在帧的rtp 时间
int64_t ntp_time_ms_; // 所在帧的npt 时间 可能是 -1
uint16_t seqNum; // transport seq_num 比如 : 1,2,3
const uint8_t* dataPtr; //字节数据
size_t sizeBytes; // 字节数据长度
bool markerBit; // mark 标识,可以标识是否帧结束
int timesNacked; // 这个rtp包重传次数
/**
* @brief
* nalu 类型
* if (is_first_packet_in_frame() && markerBit) 比如这个条件,如果是一帧的开始,也是结束,那么这个rtp包,包含了一个完整的nalu,比如一个sps包
* else if (is_first_packet_in_frame()) 如果只是第一个包,那么就是一个nalu的开始
* else if (markerBit) 如果是一帧的结束,那么就是一个nalu的结束
* else 那么就是不nalu 里面的一部分
*
* 这一段的解释,在构造函数里面可以分析到
*/
VCMNaluCompleteness completeNALU;
bool insertStartCode; // h264 的帧开始
RTPVideoHeader video_header; //rtp 头
absl::optional<RtpGenericFrameDescriptor> generic_descriptor;
RtpPacketInfo packet_info; // TODO:还不清楚
}
webrtc\src\modules\video_coding\packet_buffer.h
class PacketBuffer {
public:
/**
* 参数:
* clock: 时钟
* start_buffer_size:
* max_buffer_size:
* OnAssembledFrameCallback: 用于去组装帧的回调方法
*/
static rtc::scoped_refptr<PacketBuffer> Create(
Clock* clock,
size_t start_buffer_size,
size_t max_buffer_size,
OnAssembledFrameCallback* frame_callback);
virtual ~PacketBuffer();
// Returns true unless the packet buffer is cleared, which means that a key
// frame request should be sent. The PacketBuffer will always take ownership
// of the |packet.dataPtr| when this function is called. Made virtual for
// testing.
virtual bool InsertPacket(VCMPacket* packet);
void ClearTo(uint16_t seq_num);
void Clear();
void PaddingReceived(uint16_t seq_num);
// Timestamp (not RTP timestamp) of the last received packet/keyframe packet.
absl::optional<int64_t> LastReceivedPacketMs() const;
absl::optional<int64_t> LastReceivedKeyframePacketMs() const;
// Returns number of different frames seen in the packet buffer
int GetUniqueFramesSeen() const;
int AddRef() const;
int Release() const;
protected:
// Both |start_buffer_size| and |max_buffer_size| must be a power of 2.
PacketBuffer(Clock* clock,
size_t start_buffer_size,
size_t max_buffer_size,
OnAssembledFrameCallback* frame_callback);
private:
friend RtpFrameObject;
// 包 连续信息
struct ContinuityInfo {
// The sequence number of the packet.
uint16_t seq_num = 0; // 包seq_num
bool frame_begin = false; // 是否一帧的首包
bool frame_end = false; // 是否一帧的尾包
bool used = false; // 包是否正占用着一个buffer 位置
bool continuous = false; // 是否连续,判断条件是这seq_num 前的所有包都 used 为true,也就是背插入了buffer
bool frame_created = false; // 这个包背拿去组帧用了
};
Clock* const clock_;
// Tries to expand the buffer.
bool ExpandBufferSize() RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
// Test if all previous packets has arrived for the given sequence number.
bool PotentialNewFrame(uint16_t seq_num) const
RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
// Test if all packets of a frame has arrived, and if so, creates a frame.
// Returns a vector of received frames.
std::vector<std::unique_ptr<RtpFrameObject>> FindFrames(uint16_t seq_num)
RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
// Copy the bitstream for |frame| to |destination|.
// Virtual for testing.
virtual bool GetBitstream(const RtpFrameObject& frame, uint8_t* destination);
// Get the packet with sequence number |seq_num|.
// Virtual for testing.
virtual VCMPacket* GetPacket(uint16_t seq_num)
RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
// Mark all slots used by |frame| as not used.
// Virtual for testing.
virtual void ReturnFrame(RtpFrameObject* frame);
void UpdateMissingPackets(uint16_t seq_num)
RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
// Counts unique received timestamps and updates |unique_frames_seen_|.
void OnTimestampReceived(uint32_t rtp_timestamp)
RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
rtc::CriticalSection crit_;
// Buffer size_ and max_size_ must always be a power of two.
size_t size_ RTC_GUARDED_BY(crit_);
const size_t max_size_;
// The fist sequence number currently in the buffer.
uint16_t first_seq_num_ RTC_GUARDED_BY(crit_);
// If the packet buffer has received its first packet.
bool first_packet_received_ RTC_GUARDED_BY(crit_);
// If the buffer is cleared to |first_seq_num_|.
bool is_cleared_to_first_seq_num_ RTC_GUARDED_BY(crit_);
// Buffer that holds the inserted packets.
std::vector<VCMPacket> data_buffer_ RTC_GUARDED_BY(crit_);
// Buffer that holds the information about which slot that is currently in use
// and information needed to determine the continuity between packets.
std::vector<ContinuityInfo> sequence_buffer_ RTC_GUARDED_BY(crit_);
// Called when all packets in a frame are received, allowing the frame
// to be assembled.
OnAssembledFrameCallback* const assembled_frame_callback_;
// Timestamp (not RTP timestamp) of the last received packet/keyframe packet.
absl::optional<int64_t> last_received_packet_ms_ RTC_GUARDED_BY(crit_);
absl::optional<int64_t> last_received_keyframe_packet_ms_
RTC_GUARDED_BY(crit_);
int unique_frames_seen_ RTC_GUARDED_BY(crit_);
absl::optional<uint16_t> newest_inserted_seq_num_ RTC_GUARDED_BY(crit_);
std::set<uint16_t, DescendingSeqNumComp<uint16_t>> missing_packets_
RTC_GUARDED_BY(crit_);
// Indicates if we should require SPS, PPS, and IDR for a particular
// RTP timestamp to treat the corresponding frame as a keyframe.
const bool sps_pps_idr_is_h264_keyframe_;
// Stores several last seen unique timestamps for quick search.
std::set<uint32_t> rtp_timestamps_history_set_ RTC_GUARDED_BY(crit_);
// Stores the same unique timestamps in the order of insertion.
std::queue<uint32_t> rtp_timestamps_history_queue_ RTC_GUARDED_BY(crit_);
mutable volatile int ref_count_ = 0;
};
RtpFrameObject
webrtc\include\modules\video_coding\frame_object.h
class RtpFrameObject : public EncodedFrame
{
private:
rtc::scoped_refptr<PacketBuffer> packet_buffer_; // PacketBuffer 指针的计数引用
VideoFrameType frame_type_; // 帧类型,I,P
VideoCodecType codec_type_; // 编码类型,H264
uint16_t first_seq_num_; // 帧的低一个rtp的seq_num
uint16_t last_seq_num_; // 帧的最后一个rtp的seq_num
int64_t last_packet_received_time_; //帧最后一个包接收到的时间
int times_nacked_; // 帧里面 传次数最高的一个包的重传次数
}
void PacketBuffer::UpdateMissingPackets(uint16_t seq_num) {
if (!newest_inserted_seq_num_)
newest_inserted_seq_num_ = seq_num;
const int kMaxPaddingAge = 1000;
if (AheadOf(seq_num, *newest_inserted_seq_num_)) {
uint16_t old_seq_num = seq_num - kMaxPaddingAge;
auto erase_to = missing_packets_.lower_bound(old_seq_num);
missing_packets_.erase(missing_packets_.begin(), erase_to);
// Guard against inserting a large amount of missing packets if there is a
// jump in the sequence number.
if (AheadOf(old_seq_num, *newest_inserted_seq_num_)) {
*newest_inserted_seq_num_ = old_seq_num;
}
++*newest_inserted_seq_num_;
while (AheadOf(seq_num, *newest_inserted_seq_num_)) {
missing_packets_.insert(*newest_inserted_seq_num_);
++*newest_inserted_seq_num_;
}
} else {
missing_packets_.erase(seq_num);
}
}
1.AheadOf(A, B)可以简单理解为A>B。实际环形数据的比较与环形存储空间size有关,相当与(A > B) xor (abs(A - B) > size / 2)。
2.首先判断当前包序号seq_num是否在之前插入的包序号newest_inserted_seq_num之后,如果不是,证明是之前的丢包,走else逻辑。如果是,则开始判断是否有丢包。
3.删除与当前包序号相差超过1000的丢包序号,因为太老了。如果seq_num - 1000 > newest_inserted_seq_num,证明中间丢包太多,只从seq_num - 1000开始插入丢包序号。
4.newest_inserted_seq_num到seq_num之间的全部包被认定为丢包。
举例:收包1000, 1001, 1002, 1005,1003……收到1005时认定1003/1004丢包,收到1003时删除丢包列表中的1003,只剩1004。