Android的Java Handler机制想必大家都清楚怎么用,那Native层的Handler机制怎么使用的呢?
Epoll机制介绍
epoll在Linux2.6内核正式提出,是基于事件驱动的I/O方式,相对于select来说,epoll没有描述符个数限制,使用一个文件描述符管理多个描述符,将用户关心的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。
Linux中提供的epoll相关函数如下:
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
epoll_create 函数创建一个epoll句柄,参数size表明内核要监听的描述符数量。调用成功时返回一个epoll句柄描述符,失败时返回-1。
-
epoll_ctl 函数注册要监听的事件类型。四个参数解释如下:
epfd 表示epoll句柄
op 表示fd操作类型,有如下3种
EPOLL_CTL_ADD 注册新的fd到epfd中
EPOLL_CTL_MOD 修改已注册的fd的监听事件
EPOLL_CTL_DEL 从epfd中删除一个fd
fd 是要监听的描述符
event 表示要监听的事件
epoll_event 结构体定义如下:
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
-
epoll_wait 函数等待事件的就绪,成功时返回就绪的事件数目,调用失败时返回 -1,等待超时返回 0。
epfd 是epoll句柄
events 表示从内核得到的就绪事件集合
maxevents 告诉内核events的大小
timeout 表示等待的超时事件
epoll是Linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。
epoll除了提供select/poll那种IO事件的水平触发(Level Triggered)外,还提供了边缘触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。
Native Looper的用法
- 创建Looper对象
sp<Looper> mLooper = ReverseLooper::prepare(0);
- 在工作线程中启用Looper循环
void threadloop(){
int32_t ret = mLooper->pollOnce(-1);
switch (ret){
case ReverseLooper::POLL_WAKE:
case ReverseLooper::POLL_CALLBACK:
ALOGD("Looper::POLL_WAKE OR POLL_CALLBACK");
break;
case ReverseLooper::POLL_ERROR:
ALOGE("Looper::POLL_ERROR");
case ReverseLooper::POLL_TIMEOUT:
// timeout (should not happen)
ALOGD("Looper::POLL_TIMEOUT");
break;
default:
// should not happen
ALOGE("Looper::pollOnce() returned unknown status %d", ret);
break;
}
}
- 创建自己的处理消息的MessageHandler
class MyHandler : public MessageHandler{
public:
virtual void handleMessage(const Message& message);
};
sp<MyHandler> mHandler = new MyHandler;
- 发送消息
mLooper->sendMessage(mHandler, Message(msgID));
mLooper->sendMessageDelayed(ms2ns(timeout), mHandler, Message(msgID));
Looper 不仅仅可以做消息队列, 还可以监听其他的文件描述符的事件,如Socket fd等,暂不分析这一块的内容
Looper机制分析
Looper初始化
Looper初始化首先调用了Looper.prepare方法
sp<Looper> Looper::prepare(int opts) {
//从当前线程的ThreadLocal中获取Looper对象
sp<ReverseLooper> looper = ReverseLooper::getForThread();
if (looper == NULL) {
//如果没有则创建一个Looper对象,放入当前线程的ThreadLocal中
looper = new ReverseLooper(allowNonCallbacks);
ReverseLooper::setForThread(looper);
}
return looper;
}
prepare方法比较简单,就是从当前的线程的ThreadLocal内存中获取Looper对象,如果未获取Looper对象,则先创建一个,放入到线程的ThreadLocal中。
Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
//创建了一个event的文件描述符,该描述符用于监听是否有Message消息
mWakeEventFd = eventfd(0, EFD_NONBLOCK);
AutoMutex _l(mLock);
//初始化Epoll
rebuildEpollLocked();
}
Looper的构造方法用eventfd创建了一个事件的文件描述符mWakeEventFd,后续Looper 发送的消息是否收到都是监听这个文件描述符的。
然后调用rebuildEpollLocked初始化Epoll机制.
void ReverseLooper::rebuildEpollLocked() {
// 如果mEpollFd > 0, 则先close掉就的Epoll
if (mEpollFd >= 0) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endif
close(mEpollFd);
}
// 创建一个新的Epoll
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);
//将上一步创建的mWakeEventFd添加到Epoll的事件监听中,当有可读事件之后会唤醒Epoll并回调。
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeEventFd;
int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance. errno=%d",
errno);
}
rebuildEpollLocked用来初始化Epoll机制,首先调用epoll_create创建了一个epoll, 然后将mWakeEventFd添加到Epoll的事件监听中,等有新的可读消息到来的时候,Epoll会被唤醒并回调通知.
到此为止,Epoll的初始化完成了,接着看Looper.pollonce()方法.
Looper初始化完成之后,就要开始进入循环等待并处理消息队列中的消息了
int ReverseLooper::pollInner(int timeoutMillis) {
//参数timeout是等待超时时间,此处要进行调整
//参数 -1表示一直等待,没有超时时间,直到有可读消息唤醒再返回
//但是消息队列中的下一条要处理的消息超时时间是小于我们设置的超时时间的话,就要调整,以最近要唤醒的时间为准
// Adjust the timeout based on when the next message is due.
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
}
// Poll.
int result = POLL_WAKE;
// We are about to idle.
mPolling = true;
//开始等待消息
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
......
我们知道Looper是有一个消息队列的,队列中有两类消息,一类是立刻需要处理的,另一类是设定延迟,到指定时间在处理。每当有消息发送,会存档到消息队列,存放的时候会按时间排序,最先要处理的放在队列的头部,并把改消息要处理的时间设置到mNextMessageUptime中。
Timeout时间参数的意义和Epoll的超时时间是一致的
timeout > 0: 表示最多等待多长时间就返回,即使没有任何可读消息
timeout = 0: 表示调用后立刻返回,不阻塞
timeout = -1: 表示没有超时时间,一直等到有可读消息为止.
示例代码中,我们设置的参数是-1,如果消息队列中没有消息,此处参数仍然是-1,如果消息队列中有等待处理的消息,我们就计算出到下一条消息的时间,作为超时时间,等到时间后,唤醒Epoll开始处理.
此处是我们还没有发送消息,所以消息队列是空,参数仍然是-1。
接着调用epoll_wait开始等待,超时或者消息唤醒.
消息发送
消息发送,我们就以延迟消息为例进行分析
void ReverseLooper::sendMessageDelayed(nsecs_t uptimeDelay, const sp<MessageHandler>& handler,
const Message& message) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
sendMessageAtTime(now + uptimeDelay, handler, message);
}
参数中Handler指针就是我们自己实现的MessageHandler的指针,计算出该消息触发的时间,当前时间+延迟时间。
然后调用sendMessageAtTime进行处理
void ReverseLooper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
const Message& message) {
size_t i = 0;
{ // acquire lock
AutoMutex _l(mLock);
//从mMessageEnvelopes消息队列中逐个比较触发时间,按时间排序。将消息插入到对应的位置
size_t messageCount = mMessageEnvelopes.size();
while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
i += 1;
}
MessageEnvelope messageEnvelope(uptime, handler, message);
mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
//如果当前正在发送消息,到此直接返回即可
if (mSendingMessage) {
return;
}
} // release lock
// 如果当前消息放在了队列第一个,也就是说当前消息比所有的消息都要更早触发,或者是队列中还没有其他消息,只有一个,此时就需要唤醒Epoll进行处理
if (i == 0) {
wake();
}
}
sendMessageAtTime主要任务就是根据发送消息的时间,将消息插入到消息队列中,消息队列是按照时间排序,越早触发的消息在队列中的位置越靠前。
如果排在了队列的第一个位置,有两种可能
- 当前消息需要最早触发
- 队列中仅此一个消息
这两种个情况下,都需要唤醒Epoll对消息进行处理。
第一种情况,如果队列中已经有其他消息在等待处理,此时Epoll设置的超时时间是,第二个消息的,触发时间比第一个刚插入的消息要晚,所以唤醒Epoll,让Epoll重新调试唤醒时间,如果不是延迟消息就需要立刻处理.
第二种情况,队列中仅此一个,此时Epoll设置的超时时间还是-1,会一直等待,也需要唤醒Epoll调整超时时间,非超时消息需要立刻处理.
接着看wake方法
void ReverseLooper::wake() {
uint64_t inc = 1;
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
ALOGW("Could not write wake signal, errno=%d", errno);
}
}
}
wake方法向我们监听的mWakeEventFd中写入数据,这样Epoll监听到有可读消息就会被唤醒.
消息处理
在消息初始化的时候,队列中没有消息,Epoll_wait会阻塞,现在我们接着阻塞点继续往下分析
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// No longer idling.
mPolling = false;
// Acquire lock.
mLock.lock();
//eventCount返回值表示唤醒epoll_wait消息的个数
// >0, 表示有n个消息需要处理,epoll唤醒是因为监听到消息
// =0, 表示没有唤醒消息,epoll唤醒是因为设置的超时时间到了
// <0, 表示epoll_wait出错了
// eventcount小于0,说明是wait出错了
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error, errno=%d", errno);
result = POLL_ERROR;
goto Done;
}
// 超时返回
if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - timeout", this);
#endif
result = POLL_TIMEOUT;
goto Done;
}
// 有消息唤醒,需要处理
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd) {
if (epollEvents & EPOLLIN) {
//从mWakeEventFd中将消息清除掉.
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
....
//文件描述符监听功能暂时不分析
}
}
Done: ;
// 处理消息回调
// 首先把下一个消息处理时间设置为LLONG_MAX
mNextMessageUptime = LLONG_MAX;
//遍历消息队列
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
//如果当前消息的时间 < 当前时间,说明该消息处理时间已经过了,需要立刻处理,此时将消息从消息
//队列移除,然后调用对应的MessageHandler的handleMessage方法
if (messageEnvelope.uptime <= now) {
// Remove the envelope from the list.
// We keep a strong reference to the handler until the call to handleMessage
// finishes. Then we drop it so that the handler can be deleted *before*
// we reacquire our lock.
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
this, handler.get(), message.what);
#endif
handler->handleMessage(message);
} // release handler
mLock.lock();
mSendingMessage = false;
result = POLL_CALLBACK;
} else {
// 如果该消息还不到处理时间,则设置mNextMessageUptime为下一个消息处理时间
//退出遍历
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
// Release lock.
mLock.unlock();
return result;
消息处理的逻辑:
epoll_wait会被唤醒有两种情况
1:新添加的消息需要重置Epoll的wait超时时间,此时没有消息需要立刻处理
2:有消息需要处理
这两种情况都会遍历消息队列,从消息队列头开始遍历,如果消息的处理时间已经到了,那么立刻回调消息MessageHandler的handleMessage方法
如果消息还没有到处理时间,设置mNextMessageUptime为下一个待处理消息的时间,然后退出遍历, 示例代码中threadloop会重新执行pollonce方法,然后重新进入epoll_wait等待消息超时.
总结
- Looper初始化创建一个mWakeEventFd,然后初始化了Epoll, 将mWakeEventFd添加到了Epoll进行监听,epoll进入wait方法,当有可读消息会进行唤醒.
- 发送消息会将消息按照时间排序插入到Looper的消息队列中,越早需要处理的在队列中越靠前,如果插入队列中的位置是第一个,此时需要唤醒epoll
- 可能是需要立刻执行的消息
- 可能需要让Epoll重新调整等待超时时间
向mWakeEventFd中写入内容,唤醒epoll
- Epoll唤醒后,遍历消息队列
比较消息的时间,将消息队列中到处理时间的消息移除掉,并回调消息的handleMessage方法
未到处理时间的消息,将下一个消息处理时间记录下来,然后重新进入wait等待,直到下一条消息需要处理.