1)前言
- 在WebRtc Video Receiver 创建分析(一)一文中分析了Video Stream Receiver流的创建以及各模块之间的关系
- 在WebRtc Video Receiver RTP包接收分析(二)一文中分析了Video Stream Receiver流回调机制的注册机制,以及对接收到的RTP流进行解码分析。
- 同时在上文中也分析了在解包RTP封装VCMPacket包的时候对RTP包的容错性检测,以及关键帧请求的场景。
- 本文着重分析在对VCMPacket进行组包前,NACK Module模块的运行原理以及在整个过程中对NACK Module丢包判断机制。
- 在webrtc视频接收流框架中每一路流都由独立的NACK 处理模块。
- 首先回顾RTP包接收流程图如下:
- 从上图中可以清晰的看出网络框架收到RTP包后,经过
Call
模块将RTP包分发到RtpVideoStreamReceiver
模块。
2)NackModule的工作原理以及和RtpVideoStreamReceiver之间的关系
2.1)M79版本
在模块
RtpVideoStreamReceiver
中定义了模块NackModule
成员变量,并在其构造函数中对成员变量nack_module进行实例化。再由上面的RTP包处理业务流程图,当模块
RtpVideoStreamReceiver
每次对rtp包进行处理的时候都会调用NackModule::OnReceivedPacket()
主动驱动NackModule
模块,在该函数中会根据传入的seq number 来判断包的连续性,如果包不连续会生成相应丢包信息,并将丢包信息插入到丢包列表当中,同时发送丢包请求。-
发送丢包请求分成两个分之,一个分之是在
NackModule::OnReceivedPacket()
函数中直接发送,另一个分支是由于NackModule
由Module
派生而来,实现其Process()方法。通过定时执行Process()方法遍历其内部数据结构,判断是否要发送响应的丢包请求,逻辑如下图:
NackModule
模块同时依赖RtpVideoStreamReceiver::RtcpFeedbackBuffer
模块,在其模块中有nack_sender_
成员变量和keyframe_request_sender_
成员变量,在构造NackModule
模块的时候会通过参数的形式传入并对nack_sender_
和keyframe_request_sender_
成员赋值。同时由RtpVideoStreamReceiver::RtcpFeedbackBuffer
模块的派生关系可知,最终传入的是RtpVideoStreamReceiver::RtcpFeedbackBuffer
模块指针。
- 由上图可知最终经过NACK模块的统计和处理,发送丢包请求和关键帧请求都是通过
RtpVideoStreamReceiver
的成员变量rtcp_feedback_buffer_来构建请求包最后发出。
2.2)M85版本的变化
主线处理逻辑上没有太大的变化,只是类名发生了变化,由原来的
NackModule
变成了NackModule2
-
NackModule2
不再由Module派生而是改用RepeatingTaskHandle
来定时重复发送丢包请求。
-
下面简要介绍
NackModule
所管理的数据结构。
nack_list_
集合主要用于记录已丢包的信息,以seq 为key,以NackInfo
为value进行管理。keyframe_list_
用于记录每次回调OnReceiverPacket
过来的如果是关键帧,则将其插入到该集合。recovered_list_
用于记录RTX或FEC恢复过来的包。M85版本和M79版本对上述数据结构的管理保持一致。
3)NackModule OnReceivedPacket函数工作流程
int NackModule2::OnReceivedPacket(uint16_t seq_num,
bool is_keyframe,/*是否为关键帧*/
bool is_recovered/*是否为恢复的包RTX or FEC*/) {
rtc::CritScope lock(&crit_);
// TODO(philipel): When the packet includes information whether it is
// retransmitted or not, use that value instead. For
// now set it to true, which will cause the reordering
// statistics to never be updated.
bool is_retransmitted = true;
//newest_seq_num_可以理解成截止当前收到的最新的一个seq number
if (!initialized_) {
newest_seq_num_ = seq_num;
if (is_keyframe)
keyframe_list_.insert(seq_num);
initialized_ = true;
return 0;
}
// Since the |newest_seq_num_| is a packet we have actually received we know
// that packet has never been Nacked.
//seq_num 表示当前刚收到包的序列号,newest_seq_num_表示截止当前收到的最新的一个seq number,怎么理解呢,在seq未环绕的情况下可以理解成最大的一个
if (seq_num == newest_seq_num_)
return 0;
//如果发生了丢包,这里收到重传包则会条件成立seq_num表示当前收到的重传包的序列号
if (AheadOf(newest_seq_num_, seq_num)) {
// An out of order packet has been received.
auto nack_list_it = nack_list_.find(seq_num);
int nacks_sent_for_packet = 0;
//如果nack_list_集合中有seq_num则进行清除,同时记录当前包历经了多少次重传再收到
if (nack_list_it != nack_list_.end()) {
nacks_sent_for_packet = nack_list_it->second.retries;
nack_list_.erase(nack_list_it);
}
if (!is_retransmitted)
UpdateReorderingStatistics(seq_num);
//返回当前包经历了多少次数,在组包模块中会使用到。
return nacks_sent_for_packet;
}
// Keep track of new keyframes.
// 如果当前包为关键帧则插入到keyframe_list_
if (is_keyframe)
keyframe_list_.insert(seq_num);
// lower_bound(val):返回容器中第一个【大于或等于】val值的元素的iterator位置。
// And remove old ones so we don't accumulate keyframes.
auto it = keyframe_list_.lower_bound(seq_num - kMaxPacketAge);
if (it != keyframe_list_.begin())
keyframe_list_.erase(keyframe_list_.begin(), it);
if (is_recovered) {
recovered_list_.insert(seq_num);
// Remove old ones so we don't accumulate recovered packets.
auto it = recovered_list_.lower_bound(seq_num - kMaxPacketAge);
if (it != recovered_list_.begin())
recovered_list_.erase(recovered_list_.begin(), it);
// Do not send nack for packets recovered by FEC or RTX.
return 0;
}
AddPacketsToNack(newest_seq_num_ + 1, seq_num);
newest_seq_num_ = seq_num;
// Are there any nacks that are waiting for this seq_num.
std::vector<uint16_t> nack_batch = GetNackBatch(kSeqNumOnly);
if (!nack_batch.empty()) {
// This batch of NACKs is triggered externally; the initiator can
// batch them with other feedback messages.
nack_sender_->SendNack(nack_batch, /*buffering_allowed=*/true);
}
return 0;
}
- 本文采用最新m85版本对该函数的工作流程进行分析。
- 如果首次接收包,判断是否为关键帧,如果是将其插入到
keyframe_list_
,然后直接返回。 - 如果上次和本次的包seq 一样,直接返回,对已经收到的包不做丢包处理。
- 使用
AheadOf(newest_seq_num_, seq_num)
函数,判断newest_seq_num_
是否在seq_num
之前。AheadOf
函数的核心原理是检测两个包之间的距离,该函数帮助我们做了seq 环绕问题的处理。在没有环绕问题的情况下,假设seq 从0~2^16-1,在这个范围内传输,若在传输过程中出现了丢包,看如下log
newest_seq_num_:36 seq_num:37 is_keyframe:0 is_recovered: 0 AheadOf(newest_seq_num_, seq_num) : 0
newest_seq_num_:37 seq_num:38 is_keyframe:0 is_recovered: 0 AheadOf(newest_seq_num_, seq_num) : 0
newest_seq_num_:38 seq_num:41 is_keyframe:0 is_recovered: 0 AheadOf(newest_seq_num_, seq_num) : 0
newest_seq_num_:41 seq_num:42 is_keyframe:0 is_recovered: 0 AheadOf(newest_seq_num_, seq_num) : 0
newest_seq_num_:42 seq_num:43 is_keyframe:0 is_recovered: 0 AheadOf(newest_seq_num_, seq_num) : 0
newest_seq_num_:43 seq_num:40 is_keyframe:0 is_recovered: 1 AheadOf(newest_seq_num_, seq_num) : 1
newest_seq_num_:43 seq_num:39 is_keyframe:0 is_recovered: 1 AheadOf(newest_seq_num_, seq_num) : 1
newest_seq_num_:43 seq_num:44 is_keyframe:0 is_recovered: 0 AheadOf(newest_seq_num_, seq_num) : 0
- 根据上述的调试信息不难看出,假设上一次已经收到了43号包,32号包到44号包之间,丢了39号和40号包,丢了后会发送nack重传或者依据fec进行恢复,如上述log信息,当上一次收到的包为43号包的时候,然后本次收到了40号(前面丢了的)包,此时
AheadOf(43, 40)
将返回true,事实上43号包也是在40号包之前接收到的,可以看出在未有环绕的情况下如果AheadOf(a, b)
函数当a > b的时候返回true。 - 当
AheadOf(newest_seq_num_, seq_num)
成立的条件下会根据当前的seq_num从nack_list_
寻找对应的seq,此处表示已经收到了重传包,所以要将其从nack_list_
容器中进行清除,最后返回该恢复包请求重传或恢复的次数。 - 如果是正常的包,假设当前传入的是key_frame包,会将新关键帧包插入到
keyframe_list_
,同时会删除keyframe_list_
中旧的包,判断旧包的原则如下:
newest_seq_num_:5 seq_num:6 is_keyframe:1 keyframe_list_.size():0 recovered_list_.size():0 nack_list_.size():0
newest_seq_num_:6 seq_num:7 is_keyframe:1 keyframe_list_.size():1 recovered_list_.size():0 nack_list_.size():0
newest_seq_num_:7 seq_num:8 is_keyframe:0 keyframe_list_.size():2 recovered_list_.size():0 nack_list_.size():0
newest_seq_num_:8 seq_num:9 is_keyframe:0 keyframe_list_.size():2 recovered_list_.size():0 nack_list_.size():0
const int kMaxPacketAge = 10000;
/*这里会返回第一个大于0的位置,也就是6号包的位置*/
auto it = keyframe_list_.lower_bound(seq_num - kMaxPacketAge);/*6-10000*/
if (it != keyframe_list_.begin())
keyframe_list_.erase(keyframe_list_.begin(), it);
-
keyframe_list_.lower_bound(seq_num - kMaxPacketAge)
这段代码表示返回第一个大于seq_num - kMaxPacketAge的位置,根据上述调试信息可知,当插入7号包的时候返回的是keyframe_list_.begin()
的位置,所以不会删除6号包。 - 假设当前
keyframe_list_
已经记录的6号和7号包,然后当前来了10007号包,同样10007号包为key_frame,此时上述函数返回的位置为7号包的位置,此时6号包会被删除。 - 对于H264数据而言,通俗的理解就是
keyframe_list_
容器记录了当前传入过来的P帧所对应的gop。 - 当前传入的包, is_recovered为true时,也就是该包时由RTX或FEC恢复过来的,此时会将该seq插入到
recovered_list_
,同时会删除过期的记录,删除原理和keyframe_list_
的删除一致。如上述调试,39号和40号包会被记录到recovered_list_
。 - 调用AddPacketsToNack函数对seq的连续性进行判断,判断是否丢包,然后记录丢包的序号,将其插入到
nack_list_
,该函数为判断丢包的核心。 - 更新
newest_seq_num_
为当前包序号(未丢包,也不是恢复的情况下) - 若
nack_batch
不为空则表示有丢包,则会直接发起丢包重传请求,由于重传请求也可能会发生丢包的情况,所以需要有定时重复任务的配合。
4)NackModule AddPacketsToNack函数丢包判断工作原理
void NackModule::AddPacketsToNack(uint16_t seq_num_start,//newest_seq_num_ + 1
uint16_t seq_num_end//seq_num) {
// Remove old packets.
auto it = nack_list_.lower_bound(seq_num_end - kMaxPacketAge);
nack_list_.erase(nack_list_.begin(), it);
// If the nack list is too large, remove packets from the nack list until
// the latest first packet of a keyframe. If the list is still too large,
// clear it and request a keyframe.
// 缓存太多丢失的包,进行清除处理
uint16_t num_new_nacks = ForwardDiff(seq_num_start, seq_num_end);
if (nack_list_.size() + num_new_nacks > kMaxNackPackets) {
while (RemovePacketsUntilKeyFrame() &&
nack_list_.size() + num_new_nacks > kMaxNackPackets) {
}
if (nack_list_.size() + num_new_nacks > kMaxNackPackets) {
nack_list_.clear();
RTC_LOG(LS_WARNING) << "NACK list full, clearing NACK"
" list and requesting keyframe.";
keyframe_request_sender_->RequestKeyFrame();
return;
}
}
/*丢包判断逻辑,如果包连续的话应该是不进for循环的*/
for (uint16_t seq_num = seq_num_start; seq_num != seq_num_end; ++seq_num) {
// Do not send nack for packets that are already recovered by FEC or RTX
if (recovered_list_.find(seq_num) != recovered_list_.end())
continue;
/*默认WaitNumberOfPackets(0.5)返回0*/
NackInfo nack_info(seq_num, seq_num + WaitNumberOfPackets(0.5),
clock_->TimeInMilliseconds());
RTC_DCHECK(nack_list_.find(seq_num) == nack_list_.end());
nack_list_[seq_num] = nack_info;
}
}
- 根据在OnReceivedPacket函数中的调用seq_num_start=newest_seq_num_ + 1,而seq_num_end=seq(当前传入的seq),以如下丢包的情况序列进行分析。
newest_seq_num_:38 seq_num:41 is_keyframe:0 is_recovered: 0
newest_seq_num_:41 seq_num:42 is_keyframe:0 is_recovered: 0
newest_seq_num_:42 seq_num:43 is_keyframe:0 is_recovered: 0
- 截止上一次收到的最新包的序列号为38,而当前收到的seq为41很明显,丢了39和40号包,当收到41号包的时候,此时seq_num_start=38+1=39,也就是期望值当前传入的应该是39号包,但是实际上当前收到的是41号包
- 调用ForwardDiff函数判断39号包和41号包之间的距离,这个函数会解决环绕的问题,如果在未环绕的情况下,num_new_nacks=41-39=2,也就是算出丢了多少个包。
-
nack_list_
最多可容纳1000个包,如果nack_list_
当前大小加上num_new_nacks
的大小大于或者等于1000个了,那么会调用RemovePacketsUntilKeyFrame()
函数来移除nack_list_
中的元素。 - 要重传的包数量
nack_list_.size()
在进行RemovePacketsUntilKeyFrame()
操作后若还超过规定大小,就开始清空要重传的数据包列表nack_list_.clear()
,然后请求关键帧。 - 使用for 循环进行丢包判断,若包连续for循环的逻辑是不成立的,通过判断
seq_num !=seq_num_end
来进行判断,如果seq_num !=seq_num_end
,表示seq_num是已经丢失的包,同时通过seq_num查找recovered_list_
,看recovered_list_
容器中是否已经收到了该丢失的包, - 最终如果容器中未找到seq_num包,则以seq_num、当前时间创建
NackInfo
,并将其记录到nack_list_容器当中。 - 在实际的传输过程中如果网络不好,丢包严重就会导致延迟和马赛克的现象,而合理请求I帧恰好能缓解该问题,
RemovePacketsUntilKeyFrame
函数的原理如下:
bool NackModule::RemovePacketsUntilKeyFrame() {
while (!keyframe_list_.empty()) {
/* 从keyframe_list_中得到第一个值(假设为a),然后以此值为value,找出nack_list_容器中第一个大于等于a的迭代器的位置
* 将nack_list_的启始位置到对应a值这个seq之间的全部删除,也就是a以前的seq全部移除。
*/
auto it = nack_list_.lower_bound(*keyframe_list_.begin());
if (it != nack_list_.begin()) {
// We have found a keyframe that actually is newer than at least one
// packet in the nack list.
nack_list_.erase(nack_list_.begin(), it);
return true;
}
//如果it == nack_list_.begin() 说明这个关键帧也很老了,将其移除掉。
// If this keyframe is so old it does not remove any packets from the list,
// remove it from the list of keyframes and try the next keyframe.
keyframe_list_.erase(keyframe_list_.begin());
}
return false;
}
- 这里还是引用上述假设,假设当前缓存的包好为37号包,而此时39和40号包丢了,同时由于丢包严重
nack_list_
集合中前面还缓存了诸多的丢包没有恢复,大小超标了。 - 以此为例分析
nack_list_.lower_bound(37)
会返回nack_list_
容器39号包的位置,并不会等于nack_list_.begin()
,因为该容器中39号包之前可能还有很多没有恢复过来的包,这中情况由于keyframe_list_
记录的gop和当前已丢失的包的seq比较临近,所以会删除39号以前丢失的包,然后正常发送丢包请求。 - 另外一种情况是假设发送端i帧间隔比较大的话,那么此时
keyframe_list_
当前记录的gop可能为5号包,那么假设nack_list_.begin()
刚好缓存了6号丢失的包的话,这个时候就会将keyframe_list_
中的对应交老的gop删除,此种情况可能会导致keyframe_list_
为空而nack_list_
依旧过大,从而会引发AddPacketsToNack
函数中直接清除所有的丢包列表发送关键帧请求的。 - 通过查找临近的关键帧的seq(大于等于丢包集合中的首个值),然后将该seq 之前的丢包seq 从
nack_list_
中移除。 - 在发送端你可以合理设置I帧的的发送间隔,而适当将kMaxNackPackets的大小进行缩小,比如从默认的1000个改成30个,这样的话一旦出现网络抖动的情况,如果丢包超过30个,就会进行I帧请求,来降低延迟。但是这种做法浏览器是无法实现的,因为默认可以缓存1000个丢失的包在该模块的处理中进行I帧请求的概率较低。因为默认假设3秒一个I帧,按照每秒60帧,假设一帧就一个包,那么丢了一个gop区间也就是180个包,它还是会走丢包重传的策略,这样就会导致延迟。
5)NackModule NACK发送流程
5.1)M79版本
NackModule NACK发送流程分两种情况,情况一在每次处理接收到的seq后如果判断有丢包,则会立马发送,(基于kSeqNumOnly)。
另外一种情况是基于NackModule的模块线程驱动,基于kTimeOnly。
这两种情况的驱动都复用一个函数
GetNackBatch()
将要发送的seq 封装成std::vector<uint16_t>容器。GetNackBatch()
函数的实现如下:
std::vector<uint16_t> NackModule::GetNackBatch(NackFilterOptions options) {
bool consider_seq_num = options != kTimeOnly;
bool consider_timestamp = options != kSeqNumOnly;
int64_t now_ms = clock_->TimeInMilliseconds();
std::vector<uint16_t> nack_batch;
auto it = nack_list_.begin();
while (it != nack_list_.end()) {
bool delay_timed_out =
now_ms - it->second.created_at_time >= send_nack_delay_ms_;
bool nack_on_rtt_passed = now_ms - it->second.sent_at_time >= rtt_ms_;
/*在创建NackInfo的时候send_at_seq_num和其对应丢包的seq值是相等的,在默认情况下
*/
bool nack_on_seq_num_passed =
it->second.sent_at_time == -1 &&
AheadOrAt(newest_seq_num_, it->second.send_at_seq_num);
if (delay_timed_out && ((consider_seq_num && nack_on_seq_num_passed) ||
(consider_timestamp && nack_on_rtt_passed))) {
nack_batch.emplace_back(it->second.seq_num);
++it->second.retries;
it->second.sent_at_time = now_ms;
if (it->second.retries >= kMaxNackRetries) {
RTC_LOG(LS_WARNING) << "Sequence number " << it->second.seq_num
<< " removed from NACK list due to max retries.";
it = nack_list_.erase(it);
} else {
++it;
}
continue;
}
++it;
}
return nack_batch;
}
send_nack_delay_ms_
的默认值为0,可以通过配置“WebRTC-SendNackDelayMs/10”作用到FiledTrial属性进行配置。rtt_ms_
默认值为kDefaultRttMs=100ms
,该值为动态值通过调用UpdateRtt
函数进行更新。条件1:delay_timed_out默认情况下都是成立的,因为
it->second.created_at_time
在创建NackInfo
的时候赋值。以上分两种情况,情况1是根据
kSeqNumOnly
这种情况下需要判断nack_on_seq_num_passed
是否成立,由于每次创建NackInfo
后并将其添加进nack_list_
的时候,都对newest_seq_num_ = seq_num
进行赋值操作,所以理论上,每次在判断nack_on_seq_num_passed
条件的时候newest_seq_num_
总是先于send_at_seq_num
的,比如说丢了39号包,但是此时收到的是41号包。将符合条件的
NackInfo
取出其seq加入到nack_batch容器进行返回。对于每一个丢失的seq,最多的请求次数为
kMaxNackRetries=10
次。-
情况2根据rtt来发送,rtt_ms_为动态更新,其更新逻辑如下:
VideoReceiveStream
模块在构造过程中向CallStats
注册了监听器,只想this指针。而
VideoReceiveStream
模块为CallStatsObserver
的派生类,所以重写了OnRttUpdate()方法。由于
CallStats
由Module派生而来所以它的Process()会定时执行,执行过程中遍历监听者列表,最终如上函数回调流程向NackModule
模块更新rtt。
5.2)M85版本
NackModule2::NackModule2(TaskQueueBase* current_queue,
Clock* clock,
NackSender* nack_sender,
KeyFrameRequestSender* keyframe_request_sender,
TimeDelta update_interval /*= kUpdateInterval*/)
: worker_thread_(current_queue),
update_interval_(update_interval),
clock_(clock),
nack_sender_(nack_sender),
keyframe_request_sender_(keyframe_request_sender),
reordering_histogram_(kNumReorderingBuckets, kMaxReorderedPackets),
initialized_(false),
rtt_ms_(kDefaultRttMs),
newest_seq_num_(0),
send_nack_delay_ms_(GetSendNackDelay()),
backoff_settings_(BackoffSettings::ParseFromFieldTrials()) {
repeating_task_ = RepeatingTaskHandle::DelayedStart(
TaskQueueBase::Current(), update_interval_,
[this]() {
RTC_DCHECK_RUN_ON(worker_thread_);
std::vector<uint16_t> nack_batch = GetNackBatch(kTimeOnly);
if (!nack_batch.empty()) {
// This batch of NACKs is triggered externally; there is no external
// initiator who can batch them with other feedback messages.
nack_sender_->SendNack(nack_batch, /*buffering_allowed=*/false);
}
return update_interval_;
},
clock_);
}
-
repeating_task_
重复任务队列以20ms为周期进行重复调度。 - 其原理和m79版本一致,都是用过
GetNackBatch
函数对nack_list_
容器进行遍历,找出需要重传包的seq,然后封装成集合,最后调用SendNack
进行重传请求。
6) 总结
- 本文主要阐述了
NackModule
模块的工作原理,主要描述其判断丢包的核心逻辑,以及发送nack的处理逻辑。 -
NackModule
模块的核心就是通过维护三个容器来实现对丢包信息的管理。 - 最终在
NackModule::OnReceivedPacket
函数中如果收到的恢复包它的返回值为非0值,返回的是丢包请求尝试的次数,该值对于RtpVideoStreamReceiver
模块后续的处理起到了什么作用?在后续进行分析。 -
Process
以最大10次*每次的时间间隔,假设超过10次放弃该包。 - 对于m85版本不再使用Module模块来驱动,而是使用重复任务队列以20ms的周期进行调度。
- 在重传请求过程中如果超过10次请求还没有请求到重传包,则会放弃重传。
- 在实际的应用的过程中,需要合理的请求I帧,但
NackModule
模块的工作主要是负责丢包的重传,对于I帧的请求只是在丢包及其严重的时候才会发起。