1 RTP/RTCP Channel Creation Flow
RtpTransportInternal is WebRTC's interface for the RTP/RTCP transport layer on the network side. RTP and RTCP data pass through this API layer down to PacketTransportInternal; the relationship between them is shown in the figure below:
- RtpTransportInternal is what the upper layers call once they have RTP data to hand off, while PacketTransportInternal is the interface of the underlying network transport layer.
- RtpTransportInternal is created while the PeerConnection is being created, at the point where the audio and video tracks are bound.
- PacketTransportInternal is created when SetLocalDescription or SetRemoteDescription is called on the PeerConnection, i.e. once the handshake has completed.
- An RtpTransportInternal object holds two PacketTransportInternal objects, one used to send RTP packets and one used to send RTCP packets, as sketched below.
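As a rough mental model of that ownership (a minimal sketch only, with simplified names rather than the actual WebRTC class definitions):

// Minimal sketch (simplified stand-in, not the real WebRTC headers).
// An RtpTransportInternal implementation sits above two PacketTransportInternal
// instances: one carries RTP, the other carries RTCP (when rtcp-mux is not used).
class PacketTransportInternal;  // lower network/packet layer (ICE/DTLS)

class RtpTransportSketch /* stands in for an RtpTransportInternal impl */ {
 public:
  void SetRtpPacketTransport(PacketTransportInternal* t) { rtp_packet_transport_ = t; }
  void SetRtcpPacketTransport(PacketTransportInternal* t) { rtcp_packet_transport_ = t; }

 private:
  PacketTransportInternal* rtp_packet_transport_ = nullptr;   // sends/receives RTP
  PacketTransportInternal* rtcp_packet_transport_ = nullptr;  // sends/receives RTCP
};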
1.1 Creating the Transports
- SetLocalDescription or SetRemoteDescription, through a chain of callbacks, eventually creates both RtpTransportInternal and PacketTransportInternal.
- First, MaybeCreateJsepTransport calls CreateIceTransport() to create a P2PTransportChannel.
- Next, rtp_dtls_transport is created with that P2PTransportChannel as a parameter; likewise, if RTP and RTCP use different ports, an rtcp_dtls_transport is created as well.
- Then CreateDtlsSrtpTransport is called with rtp_dtls_transport and rtcp_dtls_transport to create dtls_srtp_transport. Once created, SetDtlsTransports is called on dtls_srtp_transport with rtp_dtls_transport and rtcp_dtls_transport, which stores them inside the RtpTransport implementation as rtp_packet_transport_ and rtcp_packet_transport_.
- SetDtlsTransports in turn calls RtpTransport::SetRtpPacketTransport, which connects the signals exposed by PacketTransportInternal to the RtpTransport class, so that once PacketTransportInternal receives data from the network connection the corresponding handlers in RtpTransport are triggered through those signals.
- Finally, cricket::JsepTransport is constructed from the transports created above, and SetTransportForMid inserts it into the mid_to_transport_ map with the mid as the key and the JsepTransport object as the value, so it can be looked up by mid during later communication (see the map sketch after this list).
- After this flow, RtpTransportInternal and PacketTransportInternal are wired together.
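To illustrate the last step, the mid → JsepTransport bookkeeping done by SetTransportForMid boils down to a keyed map. A tiny self-contained sketch (JsepTransportStub and TransportMapSketch are hypothetical stand-ins, not the real WebRTC types):

#include <map>
#include <memory>
#include <string>

// Stand-in for cricket::JsepTransport (hypothetical stub).
struct JsepTransportStub {};

class TransportMapSketch {
 public:
  // Mirrors SetTransportForMid: key = mid, value = the JsepTransport bundle.
  void SetTransportForMid(const std::string& mid,
                          std::shared_ptr<JsepTransportStub> transport) {
    mid_to_transport_[mid] = std::move(transport);
  }
  // Later lookups during the session retrieve the transport by mid.
  JsepTransportStub* GetTransportForMid(const std::string& mid) {
    auto it = mid_to_transport_.find(mid);
    return it == mid_to_transport_.end() ? nullptr : it->second.get();
  }

 private:
  std::map<std::string, std::shared_ptr<JsepTransportStub>> mid_to_transport_;
};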
1.2 Initializing RtpTransportInternal
#pc/peer_connection.cc
// TODO(steveanton): Perhaps this should be managed by the RtpTransceiver.
cricket::VideoChannel* PeerConnection::CreateVideoChannel(
    const std::string& mid) {
  RtpTransportInternal* rtp_transport = GetRtpTransport(mid);
  MediaTransportConfig media_transport_config =
      transport_controller_->GetMediaTransportConfig(mid);

  cricket::VideoChannel* video_channel = channel_manager()->CreateVideoChannel(
      call_ptr_, configuration_.media_config, rtp_transport,
      media_transport_config, signaling_thread(), mid, SrtpRequired(),
      GetCryptoOptions(), &ssrc_generator_, video_options_,
      video_bitrate_allocator_factory_.get());
  if (!video_channel) {
    return nullptr;
  }
  video_channel->SignalDtlsSrtpSetupFailure.connect(
      this, &PeerConnection::OnDtlsSrtpSetupFailure);
  video_channel->SignalSentPacket.connect(this,
                                          &PeerConnection::OnSentPacket_w);
  video_channel->SetRtpTransport(rtp_transport);

  return video_channel;
}
- The flow is as follows:
- Get the RtpTransportInternal instance by mid.
- Create the VideoChannel through ChannelManager and call Init_w, passing in the RtpTransportInternal object.
- Connect SignalSentPacket defined in BaseChannel to PeerConnection::OnSentPacket_w; this is invoked after a packet has been sent.
- SetRtpTransport stores the RtpTransportInternal object in the BaseChannel class for later RTP/RTCP sending, receiving and processing.
- SetRtpTransport calls ConnectToRtpTransport, whose core job is to connect, via the signal/slot mechanism, the signals defined on the RtpTransportInternal object to BaseChannel, so that when RtpTransportInternal receives a stream or becomes ready to send, the corresponding callbacks in BaseChannel are triggered through those signals.
- The implementation of ConnectToRtpTransport is as follows:
#pc/channel.cc
bool BaseChannel::ConnectToRtpTransport() {
  RTC_DCHECK(rtp_transport_);
  if (!RegisterRtpDemuxerSink()) {
    return false;
  }
  rtp_transport_->SignalReadyToSend.connect(
      this, &BaseChannel::OnTransportReadyToSend);
  rtp_transport_->SignalRtcpPacketReceived.connect(
      this, &BaseChannel::OnRtcpPacketReceived);
  // TODO(bugs.webrtc.org/9719): Media transport should also be used to provide
  // 'writable' state here.
  rtp_transport_->SignalWritableState.connect(this,
                                              &BaseChannel::OnWritableState);
  rtp_transport_->SignalSentPacket.connect(this,
                                           &BaseChannel::SignalSentPacket_n);
  return true;
}
#pc/rtp_transport_internal.h
class RtpTransportInternal : public sigslot::has_slots<> {
 public:
  virtual ~RtpTransportInternal() = default;

  // Called whenever a transport's ready-to-send state changes. The argument
  // is true if all used transports are ready to send. This is more specific
  // than just "writable"; it means the last send didn't return ENOTCONN.
  sigslot::signal1<bool> SignalReadyToSend;

  // Called whenever an RTCP packet is received. There is no equivalent signal
  // for RTP packets because they would be forwarded to the BaseChannel through
  // the RtpDemuxer callback.
  sigslot::signal2<rtc::CopyOnWriteBuffer*, int64_t> SignalRtcpPacketReceived;

  // Called whenever a transport's writable state might change. The argument is
  // true if the transport is writable, otherwise it is false.
  sigslot::signal1<bool> SignalWritableState;

  sigslot::signal1<const rtc::SentPacket&> SignalSentPacket;

  virtual bool RegisterRtpDemuxerSink(const RtpDemuxerCriteria& criteria,
                                      RtpPacketSinkInterface* sink) = 0;
  virtual bool UnregisterRtpDemuxerSink(RtpPacketSinkInterface* sink) = 0;
};
- The core of ConnectToRtpTransport is simply to connect the signals defined by RtpTransportInternal to the corresponding slots in BaseChannel.
- SignalRtcpPacketReceived fires when an RTCP packet is received, which in turn calls BaseChannel::OnRtcpPacketReceived.
- SignalSentPacket fires after a packet has actually been sent; it originates from the ICE connection layer.
- RegisterRtpDemuxerSink is called to register the RTP demuxer sink.
// This class represents a receiver of already parsed RTP packets.
#call/rtp_packet_sink_interface.h
class RtpPacketSinkInterface {
 public:
  virtual ~RtpPacketSinkInterface() = default;
  virtual void OnRtpPacket(const RtpPacketReceived& packet) = 0;
};
#pc/channel.h
class BaseChannel : public sigslot::has_slots<>,
                    public webrtc::RtpPacketSinkInterface {
  // RtpPacketSinkInterface overrides.
  void OnRtpPacket(const webrtc::RtpPacketReceived& packet) override;
};
#pc/channel.cc
bool BaseChannel::RegisterRtpDemuxerSink() {
  RTC_DCHECK(rtp_transport_);
  return network_thread_->Invoke<bool>(RTC_FROM_HERE, [this] {
    return rtp_transport_->RegisterRtpDemuxerSink(demuxer_criteria_, this);
  });
}
#pc/rtp_transport.cc
bool RtpTransport::RegisterRtpDemuxerSink(const RtpDemuxerCriteria& criteria,
                                          RtpPacketSinkInterface* sink) {
  rtp_demuxer_.RemoveSink(sink);
  if (!rtp_demuxer_.AddSink(criteria, sink)) {
    RTC_LOG(LS_ERROR) << "Failed to register the sink for RTP demuxer.";
    return false;
  }
  return true;
}
- In the RtpTransport class, RegisterRtpDemuxerSink registers BaseChannel as a consumer of RTP data.
- In RtpTransport::OnReadPacket, when an RTP or RTCP packet arrives, the packet type is checked first; if it is RTP, rtp_demuxer_ resolves the registered consumer (RtpPacketSinkInterface), i.e. BaseChannel, and the packet is handed over by calling its OnRtpPacket callback (a simplified sketch of this split follows this list).
- If it is an RTCP packet, the SignalRtcpPacketReceived signal is raised, which calls BaseChannel::OnRtcpPacketReceived to handle it.
- So when a VideoChannel or AudioChannel is created, a chain of calls eventually creates the RtpTransportInternal object and stores it in the BaseChannel object for later use.
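A minimal sketch of that RTP/RTCP split as performed when a packet arrives from the lower layer (the real logic lives in RtpTransport::OnReadPacket; the helper below is a simplified stand-in that uses the common RFC 5761 payload-type heuristic):

#include <cstddef>
#include <cstdint>

// Simplified stand-in for the RTP vs. RTCP decision (illustration only).
// RTCP packet types 200-207 become 72-79 once the marker/PT bit is masked off,
// which is why the check looks at the [64, 96) range of the second byte.
bool LooksLikeRtcp(const uint8_t* data, size_t len) {
  if (len < 2) return false;
  const uint8_t pt = data[1] & 0x7F;
  return pt >= 64 && pt < 96;
}
// In the real code, an RTCP packet fans out through SignalRtcpPacketReceived,
// while an RTP packet goes to rtp_demuxer_, which ends up calling
// BaseChannel::OnRtpPacket.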
2 Sending RTP/RTCP Data
2.1 RTP Packet Send Flow
- The RTP send path goes through the queue managed by PacedSender, which then paces the packets out smoothly; the rough flow before a packet is enqueued is as follows:
- After passing through the queue managed by PacedSender, the RTP packet is eventually sent to the network layer; the flow is as follows:
- PacedSender itself is not analyzed in detail in this article; it is covered in depth in the separate article on how PacedSender works.
- PacketRouter::SendPacket adds a TransportSequenceNumber to the RTP packet. Since m55, WebRTC uses send-side BWE for dynamic bitrate adaptation, so the transport-wide sequence number extension must be added to the RTP header to support the transport-cc (TCC) algorithm (see the sketch after this list).
- The RTP packet travels along the chain shown in the figure above down to BaseChannel, which finally hands the data to RtpTransport to be sent to the network.
- After RTPSender::TrySendPacket has handed the data to the channel layer via RTPSender::SendPacketToNetwork, it also stores the packet just sent in packet_history_; RtpPacketHistory caches sent packets so that retransmissions after packet loss can be served from this cache.
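As a side note on the transport-wide sequence number: conceptually it is just a monotonically increasing 16-bit counter written into the negotiated header extension before the packet reaches the transport. A minimal sketch of the idea (illustrative only, not the actual PacketRouter code):

#include <cstdint>

// Simplified sketch of PacketRouter-style allocation of transport-wide
// sequence numbers (the extension negotiated for transport-cc / send-side BWE).
class TransportSeqAllocator {
 public:
  // Returns the next 16-bit transport-wide sequence number; it wraps naturally.
  uint16_t AllocateSequenceNumber() { return transport_seq_++; }

 private:
  uint16_t transport_seq_ = 0;
};
// In the real pipeline this value is written into the packet's transport-wide-cc
// header extension so the receiver can feed per-packet arrival info back to the
// sender-side bandwidth estimator.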
2.2 RTCP Packet Send Flow
- Looking at ModuleRtpRtcpImpl's inheritance, it derives from Module and overrides the TimeUntilNextProcess and Process functions.
- Process is called back once every TimeUntilNextProcess interval (see the sketch below).
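Conceptually, the worker thread drives the module like this (a minimal sketch of the Module contract as described above, not the actual ProcessThread implementation):

#include <cstdint>

// Minimal sketch of the Module contract (hypothetical driver, illustration only).
class ModuleSketch {
 public:
  virtual ~ModuleSketch() = default;
  virtual int64_t TimeUntilNextProcess() = 0;  // ms until Process() is wanted
  virtual void Process() = 0;
};

// Simplified driving step: call Process() once the module asks for it.
void DriveModuleOnce(ModuleSketch& module) {
  if (module.TimeUntilNextProcess() <= 0) {
    module.Process();
  }
  // A real process thread sleeps for the smallest TimeUntilNextProcess() value
  // across all registered modules, then loops.
}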
#modules/rtp_rtcp/source/rtp_rtcp_impl.cc
// Returns the number of milliseconds until the module want a worker thread
// to call Process.
int64_t ModuleRtpRtcpImpl::TimeUntilNextProcess() {
  return std::max<int64_t>(0,
                           next_process_time_ - clock_->TimeInMilliseconds());
}
next_process_time_ is initialized to clock_->TimeInMilliseconds() + kRtpRtcpMaxIdleTimeProcessMs (5 ms).
So in the initial state, Process is called once every 5 ms by default.
#modules/rtp_rtcp/source/rtp_rtcp_impl.cc
// Process any pending tasks such as timeouts (non time critical events).
void ModuleRtpRtcpImpl::Process() {
  const int64_t now = clock_->TimeInMilliseconds();
  next_process_time_ = now + kRtpRtcpMaxIdleTimeProcessMs;
  // ... (omitted)
  if (rtcp_sender_.TimeToSendRTCPReport())
    rtcp_sender_.SendRTCP(GetFeedbackState(), kRtcpReport);

  if (TMMBR() && rtcp_receiver_.UpdateTmmbrTimers()) {
    rtcp_receiver_.NotifyTmmbrUpdated();
  }
}
- next_process_time_ is updated first, so the next callback happens at the current time plus 5 ms.
- GetFeedbackState is then called to obtain the feedback state.
- Finally, SendRTCP is used to send the RTCP report.
- The rough call flow is as follows:
- The ModuleProcess thread periodically calls ModuleRtpRtcpImpl::Process (with a minimum interval of 5 ms), which uses RTCPSender::TimeToSendRTCPReport to decide whether an RTCP report should be sent right now.
- If an RTCP report must be sent immediately, ModuleRtpRtcpImpl::GetFeedbackState is called first to gather the RTP send statistics, and then RTCPSender::SendRTCP sends the RTCP data down to the network layer.
- RTCPSender::SendRTCP calls RTCPSender::SendCompoundRTCP to assemble the compound RTCP packet.
- RTCPSender::SendCompoundRTCP first calls PrepareReport(feedback_state) to decide, based on feedback_state, which RTCP packet types to send; it then calls the corresponding builder function for each type (BuildSR for an SR report, BuildRR for an RR report, and so on), stores the constructed RTCP packets in a PacketContainer, and finally calls its SendPackets function to send them.
- Next, the SR packet is used as an example to analyze how it is constructed and, along the way, the contents of an RTCP SR report. An SR report is sent by the media-sending side, i.e. it is generated when the endpoint has been sending data.
#modules/rtp_rtcp/source/rtp_rtcp_impl.cc
// TODO(pbos): Handle media and RTX streams separately (separate RTCP
// feedbacks).
RTCPSender::FeedbackState ModuleRtpRtcpImpl::GetFeedbackState() {
  RTCPSender::FeedbackState state;
  // This is called also when receiver_only is true. Hence below
  // checks that rtp_sender_ exists.
  if (rtp_sender_) {
    StreamDataCounters rtp_stats;
    StreamDataCounters rtx_stats;
    rtp_sender_->GetDataCounters(&rtp_stats, &rtx_stats);
    state.packets_sent =
        rtp_stats.transmitted.packets + rtx_stats.transmitted.packets;
    state.media_bytes_sent = rtp_stats.transmitted.payload_bytes +
                             rtx_stats.transmitted.payload_bytes;
    state.send_bitrate = rtp_sender_->BitrateSent();
  }
  state.module = this;

  LastReceivedNTP(&state.last_rr_ntp_secs, &state.last_rr_ntp_frac,
                  &state.remote_sr);

  state.last_xr_rtis = rtcp_receiver_.ConsumeReceivedXrReferenceTimeInfo();

  return state;
}
- ModuleRtpRtcpImpl::GetFeedbackState first gathers the RTP send statistics.
- RTPSender::GetDataCounters is called to obtain rtp_stats and rtx_stats, where rtp_stats covers the normally sent RTP stream and rtx_stats covers the RTP stream sent for retransmission. GetDataCounters reads them from the RTPSender member variables StreamDataCounters rtp_stats_ RTC_GUARDED_BY(statistics_crit_); and StreamDataCounters rtx_rtp_stats_ RTC_GUARDED_BY(statistics_crit_);.
- These two counters are updated after RTPSender::SendPacketToNetwork has sent the RTP packet to the network layer: if the send succeeds, RTPSender::UpdateRtpStats is called to update the statistics.
- StreamDataCounters here mainly tracks the number of RTP packets sent successfully, the number of bytes sent, and similar information.
- Finally, LastReceivedNTP fills in last_rr_ntp_secs, last_rr_ntp_frac and remote_sr. last_rr_ntp_secs is the seconds part of the NTP time at which the sender last received an SR report, and last_rr_ntp_frac is the fractional part of that NTP time; both are measured on the sender, i.e. they are local NTP time.
- remote_sr is derived from the remote NTP time and is used for the delay calculation when this SR report is sent (it fills the LSR field, as described below).
#modules/rtp_rtcp/source/rtp_rtcp_impl.cc
bool ModuleRtpRtcpImpl::LastReceivedNTP(
    uint32_t* rtcp_arrival_time_secs,  // When we got the last report.
    uint32_t* rtcp_arrival_time_frac,
    uint32_t* remote_sr) const {
  // Remote SR: NTP inside the last received (mid 16 bits from sec and frac).
  uint32_t ntp_secs = 0;
  uint32_t ntp_frac = 0;

  if (!rtcp_receiver_.NTP(&ntp_secs, &ntp_frac, rtcp_arrival_time_secs,
                          rtcp_arrival_time_frac, NULL)) {
    return false;
  }

  *remote_sr =
      ((ntp_secs & 0x0000ffff) << 16) + ((ntp_frac & 0xffff0000) >> 16);
  return true;
}
- The core of this function is RTCPReceiver::NTP, whose values come from the report sent by the remote end: rtcp_arrival_time_secs is the seconds part of the local NTP time at which the sender last received an SR report (measured on the sender), and rtcp_arrival_time_frac is the fractional part of that time; ntp_secs and ntp_frac are the seconds and fractional parts parsed from the NTP field of that last received remote SR report, i.e. remote NTP time.
- remote_sr is computed from that remote NTP time and is used to fill the LSR field of the SR report (a worked example follows this list).
- Once RTCPSender::FeedbackState has been obtained, it already contains how much RTP data has been sent at the time this RTCP report goes out, among other information.
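As a quick worked check of the bit arithmetic above (hypothetical values, verified at compile time):

#include <cstdint>

// Worked example: the middle 32 bits of the 64-bit NTP timestamp
// 0x83AA7E80.12345678 become the compact form 0x7E801234.
constexpr uint32_t kNtpSecs = 0x83AA7E80;
constexpr uint32_t kNtpFrac = 0x12345678;
constexpr uint32_t kCompact =
    ((kNtpSecs & 0x0000ffff) << 16) + ((kNtpFrac & 0xffff0000) >> 16);
static_assert(kCompact == 0x7E801234, "middle 32 bits of the NTP timestamp");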
int32_t RTCPSender::SendRTCP(const FeedbackState& feedback_state,
                             RTCPPacketType packetType,
                             int32_t nack_size,
                             const uint16_t* nack_list) {
  return SendCompoundRTCP(
      feedback_state, std::set<RTCPPacketType>(&packetType, &packetType + 1),
      nack_size, nack_list);
}
- SendCompoundRTCP is called with the FeedbackState as a parameter to send the report down to the lower layer.
int32_t RTCPSender::SendCompoundRTCP(
    const FeedbackState& feedback_state,
    const std::set<RTCPPacketType>& packet_types,
    int32_t nack_size,
    const uint16_t* nack_list) {
  PacketContainer container(transport_, event_log_);
  size_t max_packet_size;

  {
    rtc::CritScope lock(&critical_section_rtcp_sender_);

    // Add all flags as volatile. Non volatile entries will not be overwritten.
    // All new volatile flags added will be consumed by the end of this call.
    SetFlags(packet_types, true);
    // ... (omitted)
    // We need to send our NTP even if we haven't received any reports.
    RtcpContext context(feedback_state, nack_size, nack_list,
                        clock_->TimeInMicroseconds());

    PrepareReport(feedback_state);

    std::unique_ptr<rtcp::RtcpPacket> packet_bye;

    auto it = report_flags_.begin();
    while (it != report_flags_.end()) {
      auto builder_it = builders_.find(it->type);
      RTC_DCHECK(builder_it != builders_.end())
          << "Could not find builder for packet type " << it->type;
      if (it->is_volatile) {
        report_flags_.erase(it++);
      } else {
        ++it;
      }

      BuilderFunc func = builder_it->second;
      std::unique_ptr<rtcp::RtcpPacket> packet = (this->*func)(context);
      if (packet == nullptr)
        return -1;
      // If there is a BYE, don't append now - save it and append it
      // at the end later.
      if (builder_it->first == kRtcpBye) {
        packet_bye = std::move(packet);
      } else {
        container.Append(packet.release());
      }
    }

    // Append the BYE now at the end
    if (packet_bye) {
      container.Append(packet_bye.release());
    }

    if (packet_type_counter_observer_ != nullptr) {
      packet_type_counter_observer_->RtcpPacketTypesCounterUpdated(
          remote_ssrc_, packet_type_counter_);
    }

    RTC_DCHECK(AllVolatileFlagsConsumed());
    max_packet_size = max_packet_size_;
  }

  size_t bytes_sent = container.SendPackets(max_packet_size);
  return bytes_sent == 0 ? -1 : 0;
}
- SetFlags first updates the report_flags_ set by inserting the packet_types passed in.
- An RtcpContext is then constructed with the current NTP time as a parameter.
- PrepareReport takes feedback_state to decide which RTCP reports to build; it also uses the current RTP send bitrate from feedback_state together with report_interval_ms_ to refresh next_time_to_send_rtcp_, i.e. the time of the next RTCP transmission.
- Looking at PrepareReport: if report_flags_ already contains kRtcpSr or kRtcpRr with is_volatile set to false, an RTCP report is already scheduled and it returns directly.
- Otherwise, if the RtcpMode is kCompound (WebRTC's default), it uses the sending_ flag to decide whether this endpoint is a sender or a receiver: SetFlag(sending_ ? kRtcpSr : kRtcpRr, true). A sender prepares a Sender Report; a receiver that sends no media prepares a Receiver Report.
- It also schedules extended reports, e.g. kRtcpAnyExtendedReports => SetFlag(kRtcpAnyExtendedReports, true);.
- Once PrepareReport has updated report_flags_ with the flags of the reports to send, the set is iterated and each pending flag is built with the corresponding builder function, e.g. BuildSR for an SR report (a small sketch of this dispatch pattern follows this list).
- Each constructed rtcp::RtcpPacket is appended to the container, and the flag of the report just built is removed from report_flags_.
- Finally, container.SendPackets(max_packet_size); hands the reports to the RTCP channel.
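The builders_ lookup above is a classic pointer-to-member-function dispatch table. Below is a small self-contained sketch of that pattern; the type and function names are illustrative stand-ins, not the real RTCPSender ones:

#include <map>
#include <memory>

// Illustrative packet types and a dummy packet (not the real rtcp:: types).
enum class PacketType { kSr, kRr };
struct PacketStub { const char* name; };

class SenderSketch {
 public:
  SenderSketch() {
    // Populate the dispatch table once; mirrors the builders_ map idea above.
    builders_[PacketType::kSr] = &SenderSketch::BuildSr;
    builders_[PacketType::kRr] = &SenderSketch::BuildRr;
  }
  std::unique_ptr<PacketStub> Build(PacketType type) {
    auto it = builders_.find(type);
    if (it == builders_.end()) return nullptr;
    BuilderFunc func = it->second;
    return (this->*func)();  // same call shape as (this->*func)(context) above
  }

 private:
  using BuilderFunc = std::unique_ptr<PacketStub> (SenderSketch::*)();
  std::unique_ptr<PacketStub> BuildSr() { return std::make_unique<PacketStub>(PacketStub{"SR"}); }
  std::unique_ptr<PacketStub> BuildRr() { return std::make_unique<PacketStub>(PacketStub{"RR"}); }
  std::map<PacketType, BuilderFunc> builders_;
};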
- Taking the SR report as an example, let's look at the SR portion of the RTCP protocol.
Protocol reference:
http://tools.ietf.org/html/rfc3550#section-6.4.1
- An RTCP SR report has two main parts: the sender information and the receiver report blocks.
- The sender information fields are described below:
Fields | Explanation |
---|---|
NTP timestamp | 64 bits. Wall-clock (NTP) timestamp, used to synchronize different sources such as audio and video (see the conversion sketch after this table) |
RTP timestamp | 32 bits. The RTP timestamp corresponding to the moment of sending, on the same timeline as the frames produced by the encoder |
sender's packet count | 32 bits. Total number of packets the sender has sent; reset when the SSRC changes |
sender's octet count | 32 bits. Total number of payload bytes the sender has sent (bytes per packet × number of packets sent) |
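Since the NTP timestamp keeps reappearing below, note that NTP time is ordinary wall-clock time with a 1900-01-01 epoch, so converting from Unix time is just a constant offset plus a 2^32-based fraction. A minimal sketch (illustration only; the helper WebRTC itself uses in BuildSR below is TimeMicrosToNtp):

#include <cstdint>

// Minimal sketch of a Unix-time -> NTP-time conversion (illustration only).
// 2208988800 = seconds between 1900-01-01 (NTP epoch) and 1970-01-01 (Unix epoch).
constexpr uint32_t kNtpUnixEpochOffsetSecs = 2208988800u;

uint32_t UnixSecondsToNtpSeconds(uint32_t unix_secs) {
  return unix_secs + kNtpUnixEpochOffsetSecs;
}
// The 32-bit fractional part counts 1/2^32-second units; e.g. 500 ms is 0x80000000.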
- The receiver report block fields are described below:
Fields | Explanation |
---|---|
SSRC n | 32 bits, source identifier; one block per received media source (e.g. audio and video), where n is the index of the source |
fraction lost | 8 bits. The fraction of packets lost between the previous report and this one |
cumulative number of packets lost | 24 bits. Total number of packets lost since reception began; late packets do not count |
extended highest sequence number received | 32 bits. The low 16 bits hold the highest sequence number received; the high 16 bits hold the number of sequence number cycles |
interarrival jitter | 32 bits. An estimate of the statistical variance of the RTP packet interarrival time, i.e. a measure of delay variation |
last SR timestamp (LSR) | 32 bits. The NTP timestamp (remote NTP time) of the last SR received, taken as the middle 32 bits of its 64-bit NTP timestamp: ((ntp_msw & 0xffff) << 16) + (ntp_lsw >> 16), i.e. the low 16 bits of ntp_msw and the high 16 bits of ntp_lsw |
delay since last SR (DLSR) | 32 bits. The delay between receiving the last SR and sending this report block, expressed in units of 1/65536 seconds (an RTT example follows this table) |
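LSR and DLSR exist so that the original sender can estimate round-trip time when this report block comes back: per RFC 3550, RTT = A - LSR - DLSR, where A is the arrival time of the report in the same compact NTP format. A self-contained illustration with hypothetical values:

#include <cstdint>

// RFC 3550 round-trip estimation from a received report block (illustration).
// All values are in "compact NTP": 16.16 fixed point, i.e. 1/65536-second units.
uint32_t EstimateRttCompactNtp(uint32_t arrival_compact_ntp,  // A
                               uint32_t last_sr,              // LSR from the block
                               uint32_t delay_since_last_sr)  // DLSR from the block
{
  // Unsigned wrap-around is intentional and handles the NTP middle-bit wrap.
  return arrival_compact_ntp - last_sr - delay_since_last_sr;
}
// Example: A = 0x00050000, LSR = 0x00030000, DLSR = 0x00010000 (1 s)
// => RTT = 0x00010000 compact NTP units = 65536/65536 s = 1 s.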
- The implementation of BuildSR is as follows:
std::unique_ptr<rtcp::RtcpPacket> RTCPSender::BuildSR(const RtcpContext& ctx) {
  // Timestamp shouldn't be estimated before first media frame.
  RTC_DCHECK_GE(last_frame_capture_time_ms_, 0);
  // The timestamp of this RTCP packet should be estimated as the timestamp of
  // the frame being captured at this moment. We are calculating that
  // timestamp as the last frame's timestamp + the time since the last frame
  // was captured.
  int rtp_rate = rtp_clock_rates_khz_[last_payload_type_];
  if (rtp_rate <= 0) {
    rtp_rate =
        (audio_ ? kBogusRtpRateForAudioRtcp : kVideoPayloadTypeFrequency) /
        1000;
  }
  // Round now_us_ to the closest millisecond, because Ntp time is rounded
  // when converted to milliseconds,
  // RTP packets that belong to the same frame carry the same rtp_timestamp.
  uint32_t rtp_timestamp =
      timestamp_offset_ + last_rtp_timestamp_ +
      ((ctx.now_us_ + 500) / 1000 - last_frame_capture_time_ms_) * rtp_rate;

  rtcp::SenderReport* report = new rtcp::SenderReport();
  report->SetSenderSsrc(ssrc_);
  report->SetNtp(TimeMicrosToNtp(ctx.now_us_));
  report->SetRtpTimestamp(rtp_timestamp);
  report->SetPacketCount(ctx.feedback_state_.packets_sent);
  report->SetOctetCount(ctx.feedback_state_.media_bytes_sent);
  report->SetReportBlocks(CreateReportBlocks(ctx.feedback_state_));

  return std::unique_ptr<rtcp::RtcpPacket>(report);
}
- The fields are filled in one by one according to the RTCP SR layout described above.
- The filling of the receiver report block deserves a closer look:
std::vector<rtcp::ReportBlock> RTCPSender::CreateReportBlocks(
    const FeedbackState& feedback_state) {
  std::vector<rtcp::ReportBlock> result;
  if (!receive_statistics_)
    return result;

  // TODO(danilchap): Support sending more than |RTCP_MAX_REPORT_BLOCKS| per
  // compound rtcp packet when single rtcp module is used for multiple media
  // streams.
  result = receive_statistics_->RtcpReportBlocks(RTCP_MAX_REPORT_BLOCKS);

  if (!result.empty() && ((feedback_state.last_rr_ntp_secs != 0) ||
                          (feedback_state.last_rr_ntp_frac != 0))) {
    // Get our NTP as late as possible to avoid a race.
    uint32_t now = CompactNtp(TimeMicrosToNtp(clock_->TimeInMicroseconds()));

    uint32_t receive_time = feedback_state.last_rr_ntp_secs & 0x0000FFFF;
    receive_time <<= 16;
    receive_time += (feedback_state.last_rr_ntp_frac & 0xffff0000) >> 16;

    uint32_t delay_since_last_sr = now - receive_time;
    // TODO(danilchap): Instead of setting same value on all report blocks,
    // set only when media_ssrc match sender ssrc of the sender report
    // remote times were taken from.
    for (auto& report_block : result) {
      report_block.SetLastSr(feedback_state.remote_sr);
      report_block.SetDelayLastSr(delay_since_last_sr);
    }
  }
  return result;
}
- First, receive_statistics_->RtcpReportBlocks retrieves the receive report blocks (at most RTCP_MAX_REPORT_BLOCKS of them).
- delay_since_last_sr is then computed, i.e. the delay between receiving the previous SR report and sending this SR report.
- SetLastSr(feedback_state.remote_sr) fills the LSR field with the NTP time taken from the last received SR report.
- SetDelayLastSr fills in the delay between receiving the previous SR and sending this SR, in compact NTP units, as illustrated below.
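Since delay_since_last_sr is in compact NTP (1/65536 s) units, converting it to milliseconds is a single fixed-point scaling step. A tiny illustration (WebRTC keeps such conversions in its own time utilities; this only shows the arithmetic):

#include <cstdint>

// Illustration only: convert a compact-NTP (1/65536 s) delay to milliseconds.
int64_t CompactNtpToMsSketch(uint32_t compact_ntp) {
  return (static_cast<int64_t>(compact_ntp) * 1000 + 32768) / 65536;  // rounded
}
// Example: 0x00008000 (0.5 s) -> 500 ms; 0x00010000 (1 s) -> 1000 ms.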