WebRTC源码分析-线程基础之Message && MessageData && MessageHandler

前言

本文将介绍消息循环中的消息(Message),消息中持有的数据(MessageData),处理消息的Handler(MessageHandler)的基本内容。

其中Message与MessageData相关的结构体位于rtc_base/message_queue.h中,MessageHandler相关的类位于rtc_base/message_handler.h中

消息Message

WebRTC中消息相关的类分为两种,一种是Message,表征的是即时消息,投放到消息循环中期待能被立马消费;另外一种是DelayedMessage,表征的是延迟消息,投放到消息循环中不会立马被消费,而是延迟一段时间才会被消费。

Message 源码如下所示

struct Message {
  Message()
      : phandler(nullptr), message_id(0), pdata(nullptr), ts_sensitive(0) {}
  inline bool Match(MessageHandler* handler, uint32_t id) const {
    return (handler == nullptr || handler == phandler) &&
           (id == MQID_ANY || id == message_id);
  }
  Location posted_from;             // 消息自哪儿产生
  MessageHandler* phandler;    // 消息如何处理
  uint32_t message_id;              // 消息id
  MessageData* pdata;              // 消息中携带的数据
  int64_t ts_sensitive;                 // 消息产生的时间
};

Message结构体成员有如下几种:

  • Location posted_from: 该成员描述了此消息结构是在哪个函数,哪个文件的哪一行产生的,即消息产生的源头。通常实际使用中使用宏RTC_FROM_HERE来产生Location对象,Location类的简单分析见WebRTC源码分析-定位之Location
  • MessageHandler* phandler:该成员持有消息如何被消费的方法,当消息从消息循环中被取出后,将使用 MessageHandler的OnMessage(Message* msg)来对消息进行处理。特别的PeerConnection对象也是MessageHandler,实现了OnMessage(Message* msg)方法。
  • uint32_t message_id:32位无符号整型的消息id。通常有两类特别的消息,使用特殊的消息id来识别,分别是MQID_ANY (-1)表示所有的消息;MQID_DISPOSE(-2)表示需要丢弃的消息,该消息已经在MQM的分析文章中出现,详见WebRTC源码分析-线程基础之MessageQueueManager。当然,还有使用其他id值用以做特殊处理的,比如某个实现了MessageHandler.OnMessage方法的类,可能需要处理好几种不同的消息,那么就可以将不同消息id值当作区分消息类别的标志,从而在OnMessage方法中分门别类处理好几种消息了。
  • int64_t ts_sensitive:64位时间戳,单位ms。当不关心消息是否处理过慢时,也即消息时间不敏感时,该值为0;若关心消息是否得到即时处理,一般会设置ts_sensitive为消息创建时的时间戳 + kMaxMsgLatency常量(150ms),当该消息从消息循环中取出被处理时,将会检测当前时间msCurrent与ts_sensitive的大小,若msCurrent>ts_sensitive,则表示该消息并没有得到即时的处理,会打印警告日志。超时时间计算为msCurrent-ts_sensitive+kMaxMsgLatency。
  • MessageData* pdata:消息数据。有多个变种,后面详述。

Message结构体还提供了两个方法。

  • 构造函数:没啥好说,Message没有提供析构函数,那么Message持有的消息处理器phandler以及消息数据pdata何时被销毁就非常值得注意了。
  • 匹配函数:该函数在MQ清理消息时,用于评判哪些消息是满足条件的,即匹配上了。看源码可知,只要handler为空或者相等 并且 消息id为MQID_ANY或者相等,就被认为是找到了满足条件的消息。

DelayedMessage源码如下

// DelayedMessage goes into a priority queue, sorted by trigger time.  Messages
// with the same trigger time are processed in num_ (FIFO) order.
class DelayedMessage {
 public:
  DelayedMessage(int64_t delay,
                 int64_t trigger,
                 uint32_t num,
                 const Message& msg)
      : cmsDelay_(delay), msTrigger_(trigger), num_(num), msg_(msg) {}

  bool operator<(const DelayedMessage& dmsg) const {
    return (dmsg.msTrigger_ < msTrigger_) ||
           ((dmsg.msTrigger_ == msTrigger_) && (dmsg.num_ < num_));
  }

  int64_t cmsDelay_;  // for debugging
  int64_t msTrigger_;
  uint32_t num_;
  Message msg_;
};

DelayedMessage与Message并非是继承is-a关系,而是has-a的关系。提供了除Message msg_成员之外的另几个成员:

  • int64_t cmsDelay_:延迟时间,消息延迟多长时间后需要被处理,单位ms
  • int64_t msTrigger_:触发时间,消息需要被处理的时间戳,为消息创建时的时间戳+cmsDelay_,单位ms
  • uint32_t num_:消息的序号,在某个MQ中延迟消息的num_单调递增。

由于DelayedMessage在MQ中会放至于专门的优先级队列中,优先级如何确定?DelayedMessage重载了小于"<"运算符以确定排序,先按消息需要被处理的触发时间戳msTrigger_排序;若触发时间相同,则按消息序号排序。当某个延迟消息要投放到延迟队列中时,该优先级队列会根据“<”运算符去比较这个延迟消息与队列中的消息以确定应在队列的哪个位置插入该消息。

MessageList被定义为std::list<Message>, MessageQueue中的即时消息就存储于MessageList类别的列表中,按照先入先出的方式挨个进行处理。

typedef std::list<Message> MessageList;

消息MessgeData

MessgeData 消息数据基类,其析构函数被定义为virtual类型,以防内存泄漏

class MessageData {
 public:
  MessageData() {}
  virtual ~MessageData() {}
};

TypedMessageData 使用模板定义的MessageData 的一个子类,便于扩展。

template <class T>
class TypedMessageData : public MessageData {
 public:
  explicit TypedMessageData(const T& data) : data_(data) {}
  const T& data() const { return data_; }
  T& data() { return data_; }
 private:
  T data_;
};

ScopedMessageData 类似于 TypedMessageData,用于指针类型。在析构函数中,自动对该指针调用 delete。

// Like TypedMessageData, but for pointers that require a delete.
template <class T>
class ScopedMessageData : public MessageData {
 public:
  explicit ScopedMessageData(std::unique_ptr<T> data)
      : data_(std::move(data)) {}
  // Deprecated.
  // TODO(deadbeef): Remove this once downstream applications stop using it.
  explicit ScopedMessageData(T* data) : data_(data) {}
  // Deprecated.
  // TODO(deadbeef): Returning a reference to a unique ptr? Why. Get rid of
  // this once downstream applications stop using it, then rename inner_data to
  // just data.
  const std::unique_ptr<T>& data() const { return data_; }
  std::unique_ptr<T>& data() { return data_; }

  const T& inner_data() const { return *data_; }
  T& inner_data() { return *data_; }

 private:
  std::unique_ptr<T> data_;
};

ScopedRefMessageData 类似于ScopedMessageData,用于引用计数的指针类型。

// Like ScopedMessageData, but for reference counted pointers.
template <class T>
class ScopedRefMessageData : public MessageData {
 public:
  explicit ScopedRefMessageData(T* data) : data_(data) {}
  const scoped_refptr<T>& data() const { return data_; }
  scoped_refptr<T>& data() { return data_; }

 private:
  scoped_refptr<T> data_;
};

DisposeData 这个值得注意下,和前面几个MessageData子类不一样,该对象并没有提供其内部数据data_的方法,那么该对象正如其名,持有需要进行销毁的数据。在哪些场景下使用?

  • 有些函数不便在当前函数范围内销毁对象,那么就可以将需要销毁的对象封装到DisposeData中,并进一步封装成消息并投递到消息队列中,等待线程的消息循环取出消息并进行销毁时将该对象销毁。见范例 HttpServer::Connection::~Connection;这里类似于QT中QObject的deletelater()方法的作用,起到延迟销毁的作用。
  • 某对象属于某一线程,因此销毁操作应该交给所有者线程。这个可以通过调用对应线程对象Thread的MessageQueue的Dispose(T* doomed)来实现。在介绍MessageQueue的文章中会详述。
template <class T>
class DisposeData : public MessageData {
 public:
  explicit DisposeData(T* data) : data_(data) {}
  virtual ~DisposeData() { delete data_; }

 private:
  T* data_;
};

WrapMessageData && UseMessageData 两个模板方法,分别用来将数据封装成MessageData以及取出对应的数据,封装拆箱操作。

template <class T>
inline MessageData* WrapMessageData(const T& data) {
  return new TypedMessageData<T>(data);
}

template <class T>
inline const T& UseMessageData(MessageData* data) {
  return static_cast<TypedMessageData<T>*>(data)->data();
}

消息处理器MessageHandler

MessageHandler 消息处理器的基类,子类在继承了该类之后要重载 OnMessage 函数,在其中实现消息响应的逻辑。

class MessageHandler {
 public:
  virtual ~MessageHandler();
  virtual void OnMessage(Message* msg) = 0;
 protected:
  MessageHandler() {}
 private:
  RTC_DISALLOW_COPY_AND_ASSIGN(MessageHandler);
};

FunctorMessageHandler MessageHandler的模板子类,用于帮助实现阻塞地跨线程在指定线程上执行某个方法,并可获取执行结果。Thread的Invoke()方法使用该类。该类的构造函数中使用了C++11的右值引用,转发语义std::forward,以及转移语义std::move。详见博客C++11 std::move和std::forward

// Helper class to facilitate executing a functor on a thread.
template <class ReturnT, class FunctorT>
class FunctorMessageHandler : public MessageHandler {
 public:
  explicit FunctorMessageHandler(FunctorT&& functor)
      : functor_(std::forward<FunctorT>(functor)) {}
  virtual void OnMessage(Message* msg) { result_ = functor_(); }
  const ReturnT& result() const { return result_; }
  // Returns moved result. Should not call result() or MoveResult() again
  // after this.
  ReturnT MoveResult() { return std::move(result_); }
 private:
  FunctorT functor_;
  ReturnT result_;
};

无返回值特例FunctorMessageHandler 返回值类型为 void 的函数的FunctorMessageHandler特化版本

// Specialization for ReturnT of void.
template <class FunctorT>
class FunctorMessageHandler<void, FunctorT> : public MessageHandler {
 public:
  explicit FunctorMessageHandler(FunctorT&& functor)
      : functor_(std::forward<FunctorT>(functor)) {}
  virtual void OnMessage(Message* msg) { functor_(); }
  void result() const {}
  void MoveResult() {}
 private:
  FunctorT functor_;
};

总结

至此,基本上将MQ相关的“边角料”介绍完毕了,重点的知识再回顾下:

  • WebRTC中有两类消息需要在消息循环中得以处理,即时消息Message以及延迟消息DelayedMessage。他们被投递进入消息循环时,分别进入不同的队列,即时消息Message进入MessageLits类别的即时消息队列,该队列是先入先出的对列,这类消息期待得到立即的处理;延迟消息DelayedMessage进入PriorityQueue类别的延迟消息队列,该队列是优先级队列,根据延迟消息本身的触发时间以及消息序号进行排序,越早触发的消息将越早得以处理。如果再算上线程上同步发送消息,同步阻塞执行方法的话还有另外一个SendList,当然,这不是本文需要说明的内容了。
  • 消息数据的类别有好多种,各自起到不同的作用,尤其要注意DisposeData用来利用消息循环处理消息的功能来自然而然地销毁某个类别的数据。
  • 消息处理器最重要的就是其OnMessage方法,该方法是消息最终得以处理的地方。WebRTC中的很多重要的类就是MessageHandler的子类,比如PeerConnection;
  • 消息处理器的子类FunctorMessageHandler为跨线程执行方法提供了便利,后续会在线程相关的文章中重点阐述。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,949评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,772评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,419评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,812评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,927评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,102评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,171评论 3 411
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,921评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,366评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,675评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,820评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,523评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,162评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,126评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,647评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,732评论 2 351

推荐阅读更多精彩内容