Live555源码解析(1) - Main 寻根问祖,留其筋骨

开始分析MediaServer源码前,结合官方文档说明,对各文件夹源码总结如下:

  • groupsock
    静态库,封装了network interfaces和sockets。举例而言,Groupsock类中封装了一个收发组播数据包的socket。

  • liveMedia
    静态库,定义了一系列类簇,根类为Medium,这些类支持了多种媒体类型和编解码器。

  • WindowsAudioInputDevice
    静态库,实现了liveMedia中的AudioInputDevice虚基类,可为Windows程序提供从输入设备中读取PCM audio samples的功能。

  • UsageEnvironment
    静态库,内部主要有如下几个虚基类

    • TaskScheduler

      为DelayedTask、socket后台操作处理、事件event提供调度支持

    • HashTable

      定义通用hash表的接口,供其他代码使用

    • UsageEnvironment

      集成TaskScheduler、groupsock、liveMedia等模块功能,添加Result、Error消息机制,从而提供使用环境。

  • BasicUsageEnvironment
    静态库,由于UsageEnvironment中仅给出虚基类定义,为了能够真正地运行,所以从UsageEnvironment继承并实现了BasicUsageEnvironment类。它用于实现易用的命令行程序,其中,Read events和delayed operations通过select()循环进行支持。

  • mediaServer
    可执行文件,通过逻辑集成上述库所提供的的功能,提供串流服务。

MediaServer

MeidaServer运行效果如图所示:

面对一个庞大的项目工程,想要分析源码,必须先找出有效突破口,本文中将从live555MediaServer.cpp 中main()函数开始切割工程。

1. main()

main()函数并不复杂,下面列出整理后的源码:

int main(int argc, char **argv)
{
    // 准备使用环境
    TaskScheduler* scheduler = BasicTaskScheduler::createNew();
    UsageEnvironment* env = BasicUsageEnvironment::createNew(*scheduler);
    
    // RTSP Server授权控制,默认关闭. 如需开启,按如下步骤:
    // 1. define ACCESS_CONTROL
    // 2. authDB->addUserRecord("xxxx", "xxxx");
    UserAuthenticationDatabase* authDB = NULL;
#ifdef ACCESS_CONTROL
    authDB = new UserAuthenticationDatabase;
    authDB->addUserRecord("username1", "password1");
#endif  
    
    // 创建RTSPServer,默认使用554端口,如已被占用或其他错误,则尝试使用8554端口
    RTSPServer* rtspServer;
    portNumBits rtspServerPortNum = 554;
    rtspServer = DynamicRTSPServer::createNew(*env, rtspServerPortNum, authDB);
    if(rtspServer == NULL)
    {
        rtspServerPortNum = 8554;
        rtspServer = DynamicRTSPServer::createNew(*env, rtspServerPortNum, authDB);
    }
    if(rtspServer == NULL)
    {
        *env << "Failed to create RTSP server: " << env->getResultMsg() <<"\n";
        exit(1);
    }
    
    // 打印版本信息、rtspURL前缀、及其他说明信息
    *env << ...
    
    //@1.1 尝试建立一个HTTP server,用于提供 RTSP-over-HTTP通道,尝试端口为80, 8000, 8080
    if(rtspServer->setUpTunnelingOverHTTP(80) || rtspServer->setUpTunnelingOverHTTP(8000)
        || rtspServer->setUpTunnelingOverHTTP(8080))
    {
        *env << "(We use port " << rtspServer->httpServerPortNum() << " for optional RTSP-over-HTTP tunneling, or for HTTP live streaming (for indexed Transport Stream files only).)\n";
    } 
    else
    {
        *env << "(RTSP-over-HTTP tunneling is not available.)\n";
    }
    
    //@2 开启事件循环
    env->taskScheduler().doEventLoop();
    return 0;
}

@1.1 建立HTTP server

所谓Tunneling,实质上分为三个步骤:

  • 将源数据(无论什么格式)抽象为简单数据
  • 发送端使用目标协议对数据进行包裹,此处为HTTP
  • 接收端参照协议对数据解包,重新获得源数据

为什么要进行HTTP Tunneling?

因为部分客户端可能为iPhones或iPads,而这些设备仅支持Apple家的HLS(HTTP Live Streaming)机制的串流。具体信息可参阅官方文档中Streaming to iPhones and iPads一节。

@1.2 开启事件循环

时间循环的开启实质上是通过UsageEnvironment的TaskScheduler中doEventLoop()函数完成的,这是整个Server活过来的key,因此必然需要进一步跟踪进去。

2. env->taskScheduler().doEventLoop()

void BasicTaskScheduler0::doEventLoop(char volatile* watchVariable)
{
    while(1){
        //@2.1 wathcVariable
        if(watchVariable != NULL && *watchVariable != 0) break;
        //@2.2 SingleStep()
        SingleStep();
    }
}

@2.1 watchVariable

当且仅当watchVariable有值时提前break。查找doEventLoop()调用处,发现除main()中该值为NULL外,其他地方都给出了变量用于控制执行。

调用者分别有:

  • live555MediaServer -> main()
  • DynamicRTSPServer
  • AC3AudioStreamFramer
  • DVVideoStreamFramer
  • H264VideoFileServerMediaSubsession
  • H265VideoFileServerMediaSubsession
  • MP3StreamState
  • MPEG1or2FileServerDemux
  • MPEG4VideoFileServerMediaSubsession
  • SIPClient

@2.2 SingleStep

查找得到该函数实现部分位于BasicTaskScheduler中。此处不免有些困惑,为何不直接放在类BasicTaskScheduler0中?

为了解开这个困惑,此处先粗略整理如下继承、功能关系(后续会具体分析):

  • 虚基类TaskScheduler

    含DelayedTask、socket operations background handling以及直接的EventTrigger三类事件处理接口。

  • BasicTaskScheduler0 : public TaskScheduler

    主要实现doEventLoop()函数及DelayedTask、EventTrigger部分

  • BasicTaskScheduler : public BasicTaskScheduler0

    主要实现SingleStep()函数及background handling部分

3. SingleStep()

void BasicTaskScheduler::SingleStep(unsigned maxDelayTime)
{
    fd_set readSet = fReadSet;
    fd_set writeSet = fWriteSet;
    fd_set exceptionSet = fExceptionSet
    
    DelayInterval const& timeToDelay = fDelayQueue.timeToNextAlarm();
    struct timeval tv_timeToDelay;
    tv_timeToDelay.tv_sec = timeToDelay.seconds();
    tv_timeToDelay.tv_usec = timeToDelay.useconds();
    // .tv_sec过大(<= 11.5days)会引发select()失败,所以先规范tv_timeToDelay值域
    ...
    
    //@3.1 select
    int selectResult = select(fMaxNumSockets, &readSet, &writeSet, &exceptionSet, &tv_timeToDelay);
    if(selectResult <0)
    {
        if( GetLastError() != EINTR ) // 异常错误,视为严重故障;打印错误信息后退出
        {           
            print_Set_info();
            abort();                
        }
    }
    else //if(selectResult <0)
    {
        //@3.2 HandlerIterator, HandlerDescriptor, HandlerSet
        HandlerIterator iter(*fHandlers);
        HandlerDescriptor* handler;
        
        //@3.3 fLastHandledSocketNum
        if(fLastHandledSocketNum >= 0)
        {   
            // 如已处理过socket读写,则找到前次socket读写的下一个链表节点
            while((handler = iter.next()) != NULL)
                if(handler->socketNum == fLastHandledSocketNum) break;
            
            if(handler == NULL)  // 未找到,重置相关值
            {               
                fLastHandlerSocketNum = -1;
                iter.reset();
            }
        }
        while((handler = iter.next()) != NULL)
        {
            // 找到链表中合法节点,开始处理
            int sock = handler->socketNum;
            int resultConditionSet = 0;
            if(FD_ISSET(sock, &readSet) && FD_ISSET(sock, &fReadSet))
                resultConditionSet |= SOCKET_READABLE;
            if(FD_ISSET(sock, &writeSet) && FD_ISSET(sock, &fWriteSet) 
                resultConditionSet |= SOCKET_WRITEABLE;
            if(FD_ISSET(sock, &exceptionSet) && FD_ISSET(sock, &fExceptionSet)
                resultConditionSet |= SOCKET_EXCEPTION;
            if((resultConditionSet&handler->conditionSet) != 0 && handler->handlerProc != NULL)
            {
                // 保存当前处理节点socketNum
                fLastHandledSocketNum = sock;
                (*handler->handlerProc)(handler->clientData, resultConditionSet);
                break;
            }
        } // while((handler = iter.next()) != NULL)
        
        if(handler == NULL && fLastHandledSocketNum >= 0)
        {
            // 上一次处理的socketNum不为0,且未在当前链表中找到,但只要链表本身不为空,就必须处理socket读写
            int sock = handler->socketNum;
            int resultConditionSet = 0;
            if(FD_ISSET(sock, &readSet) && FD_ISSET(sock, &fReadSet)) resultConditionSet |= SOCKET_READABLE;
            if(FD_ISSET(sock, &writeSet) && FD_ISSET(sock, &fWriteSet) resultConditionSet |= SOCKET_WRITEABLE;
            if(FD_ISSET(sock, &exceptionSet) && FD_ISSET(sock, &fExceptionSet) resultConditionSet |= SOCKET_EXCEPTION;
            if((resultConditionSet&handler->conditionSet) != 0 && handler->handlerProc != NULL)
            {
                fLastHandledSocketNum = sock;
                (*handler->handlerProc)(handler->clientData, resultConditionSet);
                break;
            }
            if(handler == NULL) fLastHandledSocketNum = -1;
        }
        
        //@3.4 fTriggersAwaitingHandling && fLastUsedTriggerMask
        if(fTriggersAwaitingHandling != 0)
        {
            if(fTriggersAwaitingHandling == fLastUsedTriggerMask)
            {
                fTriggersAwaitingHandling &= ~fLastUsedTriggerMask;
                if(fTriggeredEventHandlers[fLastUsedTriggerNum] != NULL)
                {
                    (*fTriggeredEventHandlers[fLastUsedTriggerNum])(fTriggeredEventClientDtas[fLastUsedTriggerNum]);
                }
            }
            else
            {
                unsigned i = fLastUsedTriggerNum;
                EventTriggerId mask = fLastUsedTriggerMask;
                
                do{
                    i = (i+1)%MAX_NUM_EVENT_TRIGGERS;
                    mask >>= 1;
                    if(mask == 0) mask = 0x80000000;
                    
                    if((fTriggersAwaitingHandling&mask) != 0)
                    {
                        fTriggersAwaitingHandling &= ~mask;
                        if(fTriggeredEventHandlers[i] != NULL)
                        {
                            (*fTriggeredEventHandlers[i])(fTriggeredEventClientDatas[i]);
                        }
                        
                        fLastUsedTriggerMask = mask;
                        fLastUsedTriggerNum = i;
                        break;
                    }
                } while(i != fLastUsedTriggerNum);
            }
        }
    }   
    //@3.5 hadnleAlarm
    fDelayQueue.hadnleAlarm();
}   

@3.1 select

select(或pselect)允许进程监控多个文件描述符(fd),阻塞直到至少一个fd处于I/O操作ready状态为止。通常fd会在可进行进行块读取或高效写操作时,切换为ready状态。

当select成功时,会返回参数中三个set(read/write/exception)之一的fd,如返回值为0,表示超时前并未出发任何fd。
如果返回值为-1,表示函数出错,可进一步获取文件错误代码(Window使用WSAGetLastError, Linux中直接使用errno值)并进行判断分析:

  • EBADF set参数中存在无效fd
  • EINTR 中断信号
  • EINVALnfds参数为负数,或timeout时长无效
  • ENOMEM内部表格分配内存失败

更进一步的select说明,将在后续单独说明后给出链接。

@3.2 HandlerIterator, HandlerDescriptor, HandlerSet

千言万语不如一张图。下图中列出三者关系,颜色与说明相对应。


那么问题来了,该双向链表是怎么产生的?也就是说谁,何时,在哪里调用了HandlerSet::assignHandler()函数?
通过查找assignHandler,发现唯一调用入口为BasicTaskScheduler::setBackgroundHandling。进一步的,调用setBackgroundHandling()函数的有如下位置:

GenericMediaServer::ClientConnection::ClientConnection() READABLE|EXCEPTION incomingRequstHandler
SocketDescriptor::registerRTPInterface() READABLE|EXCEPTION tcpReadHandler
RTSP::RTSPClient() READABLE|EXCEPTION incomingDataHandler
RTSPClient::openConnection() READABLE|EXCEPTION incomingDataHandler
RTSPClient::openConnection() READABLE|EXCEPTION incomingDataHandler
RTSPClient::connectToServer() WRITABLE|EXCEPTION connectionHandler
RTSPClient::handleAlternativeRequestByte1() READABLE|EXCEPTION incomingDataHandler
RTSPClient::connectionHandler1() READABLE|EXCEPTION incomingDataHandler
RTSPServer::RTSPClientConnection::handleAlternativeRequestByte1() READABLE|EXCEPTION incomingRequestHandler
RTSPServer::RTSPClientConnection::changeClientInputSocket() READABLE|EXCEPTION incomingRequstHandler
TCPStreamSink::processBuffer() WRITABLE socketWritableHandler

大致分析可得出如下结论:

  • 事件来源主要为创建RTSP实例,RTSPClient、RTSPServer间交互,以及TCPSink进行Buffer处理
  • 事件Handler对应为Request处理,连接建立、接收数据处理,以及TCP读写

事件部分本篇就到此为止,后续将更进一步展开分析。

@3.3 fLastHandledSocketNum

fLastHandledSocketNum只有两种取值可能:

  • -1 :HandlerSet为空时,因此默认值也为-1
  • 已成功处理过得Handler socketNum值
    具体执行过程,已在上述源码中添加注释。

@3.4 fTriggersAwaitingHandling && fLastUsedTriggerMask

由于event handler会修改可读socket set,因此我们在处理完socket后才开始检查Event情况。
fTriggersAwaitingHandlingfLastUsedTriggerMask都是以32bit值进行实现的,也就是说,使用时,他们的每一个bit都具有意义。

默认情况下,fTriggersAwaitingHandling值为0,fLastUsedTriggerMask为1。而源码中可以看到,如fTriggersAwaitingHandling为0值时,是不会启动event trigger机制的。因此首先要找到,fTriggersAwaitingHandling值会首先在哪里被修改。结果如下:

  • BasicTaskScheduler0::deleteEventTrigger() fTriggersAwaitingHandling &= ~eventTriggerId
  • BasicTaskScheduler0::triggerEvent() fTriggersAwaitingHandling |= eventTriggerId;

即Event处理机制启动前,必须先triggerEvent进行添加,且在处理前未deleteEventTrigger才可以生效。
fLastUsedTriggerMask的功效和fLastHandledSocketNum类似,都只是为了更快的定位到开始点,并进行循环查找。

@3.5 hadnleAlarm

void DelayQueue::handleAlarm()
{
    // @3.5.1 synchronize
    if(head()->fDeltaTimeRemaining != DELAY_ZERO) synchronize();    
    if(head()->fDelatTimeRemaining == DELAY_ZERO) 
    {
        DelayQueueEntry *toRemove = head();
        removeEntry(toRemove);
        toRemove->handleTimeout();
    }
}
@3.5.1 synchronize

检查到DeltaTimeRemaining不为0时,需要进行同步。这里需要补充下定时器的概念。

定时器实现
计算机中存在两种时钟,分别为:

  • 硬件时钟,电池供电,也称为RTC,在系统关机时起作用
  • 系统时钟,基于RTC根据CPU中断计时,电脑运行时有效

两者频率并不严格相同,细节部分这里不具体讨论,这里只要知道系统时钟的前提是CPU中断即可。

假设开启了一个计时器,比如Sleep(5000),然而CPU每个时钟周期并非以ms为单位,如2.4GHz的CPU,时钟周期为(1/2.4G)s,因此每个时钟周期或中断产生(具体机制视CPU而定),会检查剩余时间是否为0甚至小于0。时钟周期的差异会引发计时器的误差,也因为如此,计时器都是有精度的,并非都能实现所需精度。

回到源码,这里的DelayedTask也是使用了类似的定时器机制,每次同步仅仅是跟当前时间作比较,并更新timeSinceLastSync。当timeSinceLastSync >= fDeltaTimeRemaining时,判定为定时器timeout,并执行对应TaskFunc()

总结

至此,MediaServer大的轮廓已经分析完毕,主要相关的动线有三条:

  • Socket I/O
  • Event
  • DelayedTask

只要牢牢抓住这三条动线,结合相关入口、出口,就可以将MediaServer源码的血肉充实起来。本篇中如有疑虑或错误指出,还请大家指出。同样可以提出想要着重了解的内容,后续更新会选择性加入其中。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,194评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,058评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,780评论 0 346
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,388评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,430评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,764评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,907评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,679评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,122评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,459评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,605评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,270评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,867评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,734评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,961评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,297评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,472评论 2 348

推荐阅读更多精彩内容