MessageQueue类封装了与消息队列有关的操作,一个以消息驱动的系统中,最重要部分就是消息队列和消息处理循环。
MessageQueue的核心代码在native层,可以处理java和native的事件
1.1MessageQueue创建
构造方法,调用nativeInit
frameworks/base/core/java/android/os/MessageQueue.java
MessageQueue(boolean quitAllowed) {mQuitAllowed = quitAllowed;mPtr = nativeInit();
}
nativeInit是一个native方法,方法中调用构造初始化了native层的MessageQueue
frameworks/base/core/jni/android_os_MessageQueue.cpp
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();if (!nativeMessageQueue) {jniThrowRuntimeException(env, "Unable to allocate native queue");return 0;}
frameworks/base/core/jni/android_os_MessageQueue.cpp
NativeMessageQueue::NativeMessageQueue() :mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {mLooper = Looper::getForThread();if (mLooper == NULL) {mLooper = new Looper(false);Looper::setForThread(mLooper);}
}
如果mLooper为空,创建mLooper(是一种以线程为单位的单例模式)。通过setForThread与线程关联
一个线程会有一个Looper来循环处理消息队列的消息,下列一行代码是获取线程的本地存储空间中的
mLooper = Looper::getForThread();
1.2 消息提取
next()方法用于从消息队列中获取下一条消息,并返回该消息对象
frameworks/base/core/java/android/os/MessageQueue.java
@UnsupportedAppUsageMessage next() {// Return here if the message loop has already quit and been disposed.// This can happen if the application tries to restart a looper after quit// which is not supported.final long ptr = mPtr;if (ptr == 0) {return null;}int pendingIdleHandlerCount = -1; // -1 only during first iterationint nextPollTimeoutMillis = 0;for (;;) {if (nextPollTimeoutMillis != 0) {Binder.flushPendingCommands();}nativePollOnce(ptr, nextPollTimeoutMillis);synchronized (this) {// Try to retrieve the next message. Return if found.final long now = SystemClock.uptimeMillis();Message prevMsg = null;Message msg = mMessages;if (msg != null && msg.target == null) {// Stalled by a barrier. Find the next asynchronous message in the queue.do {prevMsg = msg;msg = msg.next;} while (msg != null && !msg.isAsynchronous());}if (msg != null) {if (now < msg.when) {// Next message is not ready. Set a timeout to wake up when it is ready.nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);} else {// Got a message.mBlocked = false;if (prevMsg != null) {prevMsg.next = msg.next;} else {mMessages = msg.next;}msg.next = null;if (DEBUG) Log.v(TAG, "Returning message: " + msg);msg.markInUse();return msg;}} else {// No more messages.nextPollTimeoutMillis = -1;}// Process the quit message now that all pending messages have been handled.if (mQuitting) {dispose();return null;}// If first time idle, then get the number of idlers to run.// Idle handles only run if the queue is empty or if the first message// in the queue (possibly a barrier) is due to be handled in the future.if (pendingIdleHandlerCount < 0&& (mMessages == null || now < mMessages.when)) {pendingIdleHandlerCount = mIdleHandlers.size();}if (pendingIdleHandlerCount <= 0) {// No idle handlers to run. Loop and wait some more.mBlocked = true;continue;}if (mPendingIdleHandlers == null) {mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];}mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);}// Run the idle handlers.// We only ever reach this code block during the first iteration.for (int i = 0; i < pendingIdleHandlerCount; i++) {final IdleHandler idler = mPendingIdleHandlers[i];mPendingIdleHandlers[i] = null; // release the reference to the handlerboolean keep = false;try {keep = idler.queueIdle();} catch (Throwable t) {Log.wtf(TAG, "IdleHandler threw exception", t);}if (!keep) {synchronized (this) {mIdleHandlers.remove(idler);}}}// Reset the idle handler count to 0 so we do not run them again.pendingIdleHandlerCount = 0;// While calling an idle handler, a new message could have been delivered// so go back and look again for a pending message without waiting.nextPollTimeoutMillis = 0;}}
首先,方法会检查消息循环是否已经退出并被销毁,如果是,则直接返回null。这种情况可能发生在应用程序在退出后尝试重新启动消息循环,但这是不被支持的。
接下来,方法会初始化两个变量pendingIdleHandlerCount和nextPollTimeoutMillis。pendingIdleHandlerCount用于记录待运行的空闲处理器(IdleHandler)的数量,初始值为-1,表示在第一次迭代时。nextPollTimeoutMillis用于设置下一次轮询的超时时间,初始值为0。
然后,方法进入一个无限循环,不断尝试从消息队列中获取下一条消息。
在每次循环中,首先检查nextPollTimeoutMillis的值,如果不为0,则调用Binder.flushPendingCommands()方法来刷新待处理的Binder命令。
接下来,调用nativePollOnce()方法来从底层获取下一条消息。该方法会阻塞当前线程,直到有消息到达或超时。
然后,使用synchronized关键字对this进行同步,以确保多线程环境下的安全访问。
在同步块中,首先尝试从消息队列中获取下一条消息。如果找到了消息,则判断该消息是否已经准备好处理。如果还未准备好,则计算出下一次轮询的超时时间,并更新nextPollTimeoutMillis的值。如果消息已经准备好处理,则将mBlocked标志设置为false,将该消息从消息队列中移除,并返回该消息。
如果没有找到消息,则将nextPollTimeoutMillis的值设置为-1,表示没有更多的消息。
接下来,检查mQuitting标志,如果为true,表示消息循环已经退出,需要进行清理操作,并返回null。
然后,检查是否是第一次空闲处理。如果是第一次空闲处理,并且消息队列为空或者下一条消息的时间还未到达,则获取待运行的空闲处理器的数量。
如果待运行的空闲处理器数量小于等于0,则将mBlocked标志设置为true,继续下一次循环。
如果待运行的空闲处理器数量大于0,则将空闲处理器列表转换为数组,并存储在mPendingIdleHandlers中。
接下来,运行空闲处理器。在第一次迭代中,会遍历所有待运行的空闲处理器,并调用它们的queueIdle()方法。如果queueIdle()方法返回false,则表示该空闲处理器不再需要运行,需要将其从空闲处理器列表中移除。
最后,将pendingIdleHandlerCount重置为0,以确保下一次循环不再运行空闲处理器。然后,将nextPollTimeoutMillis的值设置为0,以便立即进行下一次轮询。
总结起来,这段代码的作用是从消息队列中获取下一条消息,并返回该消息对象。它会根据消息的准备状态和时间戳来确定是否需要等待,同时还会处理空闲处理器的运行。这样,消息循环可以不断地处理消息,并在空闲时运行空闲处理器。
Java层投递Message
enqueueMessage()方法用于将消息添加到消息队列中。
frameworks/base/core/java/android/os/MessageQueue.java
boolean enqueueMessage(Message msg, long when) {if (msg.target == null) {throw new IllegalArgumentException("Message must have a target.");}synchronized (this) {if (msg.isInUse()) {throw new IllegalStateException(msg + " This message is already in use.");}if (mQuitting) {IllegalStateException e = new IllegalStateException(msg.target + " sending message to a Handler on a dead thread");Log.w(TAG, e.getMessage(), e);msg.recycle();return false;}msg.markInUse();msg.when = when;Message p = mMessages;boolean needWake;if (p == null || when == 0 || when < p.when) {// New head, wake up the event queue if blocked.msg.next = p;mMessages = msg;needWake = mBlocked;} else {// Inserted within the middle of the queue. Usually we don't have to wake// up the event queue unless there is a barrier at the head of the queue// and the message is the earliest asynchronous message in the queue.needWake = mBlocked && p.target == null && msg.isAsynchronous();Message prev;for (;;) {prev = p;p = p.next;if (p == null || when < p.when) {break;}if (needWake && p.isAsynchronous()) {needWake = false;}}msg.next = p; // invariant: p == prev.nextprev.next = msg;}// We can assume mPtr != 0 because mQuitting is false.if (needWake) {nativeWake(mPtr);}}return true;}
首先,方法会检查消息的target是否为null,如果是,则抛出IllegalArgumentException异常,表示消息必须具有目标。
接下来,使用synchronized关键字对this进行同步,以确保多线程环境下的安全访问。
在同步块中,首先检查消息是否已经在使用中,如果是,则抛出IllegalStateException异常,表示该消息已经在使用中。
然后,检查mQuitting标志,如果为true,表示消息循环已经退出,需要进行清理操作。在这种情况下,会抛出IllegalStateException异常,并将消息回收后返回false。
接下来,将消息标记为正在使用,并设置消息的时间戳为when。
然后,获取消息队列中的第一条消息p。
接下来,根据消息的时间戳when和当前消息队列中的消息的时间戳进行比较,确定消息的插入位置。
如果消息队列为空,或者when为0,或者when小于第一条消息的时间戳,则将消息作为新的头部插入到消息队列中,并将needWake标志设置为mBlocked的值。
如果消息需要插入到队列的中间位置,通常情况下不需要唤醒事件队列,除非队列头部有一个障碍(barrier)并且消息是队列中最早的异步消息。在这种情况下,将needWake标志设置为mBlocked的值,并遍历消息队列,找到合适的插入位置。
最后,将消息插入到队列中,并更新前后消息的关联关系。
最后,如果需要唤醒事件队列,则调用nativeWake()方法唤醒事件队列。
总结起来,这段代码的作用是将消息添加到消息队列中。它会根据消息的时间戳和队列中已有消息的时间戳来确定插入位置,并根据需要唤醒事件队列。这样,消息循环可以按照一定的顺序处理消息,并在需要时唤醒事件队列。
nativeWake
nativeWake是一个native函数,用于唤醒队列
frameworks/base/core/jni/android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);nativeMessageQueue->wake();
}
通过reinterpret_cast获取NativeMessageQueue对象,然后调用wake函数,wake函数调用Looper的wake
void NativeMessageQueue::wake() {mLooper->wake();
}
wake
system/core/libutils/Looper.cpp
void Looper::wake() {
#if DEBUG_POLL_AND_WAKEALOGD("%p ~ wake", this);
#endifuint64_t inc = 1;ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));if (nWrite != sizeof(uint64_t)) {if (errno != EAGAIN) {LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",mWakeEventFd.get(), nWrite, strerror(errno));}}
}
向文件描述符写入一个inc的值来实现唤醒操作
1.3 nativePollOnce分析
nativePollOnce()方法用来从底层获取下一条消息,该方法会阻塞当前线程,直到有消息到达或超时
frameworks/base/core/jni/android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,jlong ptr, jint timeoutMillis) {NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
调用了pollOnce方法
frameworks/base/core/jni/android_os_MessageQueue.cpp
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {mPollEnv = env;mPollObj = pollObj;mLooper->pollOnce(timeoutMillis);mPollObj = NULL;mPollEnv = NULL;if (mExceptionObj) {env->Throw(mExceptionObj);env->DeleteLocalRef(mExceptionObj);mExceptionObj = NULL;}
}
调用了Looper中的pollOnce方法
system/core/libutils/Looper.cpp
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {int result = 0;for (;;) {while (mResponseIndex < mResponses.size()) {const Response& response = mResponses.itemAt(mResponseIndex++);int ident = response.request.ident;if (ident >= 0) {int fd = response.request.fd;int events = response.events;void* data = response.request.data;
#if DEBUG_POLL_AND_WAKEALOGD("%p ~ pollOnce - returning signalled identifier %d: ""fd=%d, events=0x%x, data=%p",this, ident, fd, events, data);
#endifif (outFd != nullptr) *outFd = fd;if (outEvents != nullptr) *outEvents = events;if (outData != nullptr) *outData = data;return ident;}}if (result != 0) {
#if DEBUG_POLL_AND_WAKEALOGD("%p ~ pollOnce - returning result %d", this, result);
#endifif (outFd != nullptr) *outFd = 0;if (outEvents != nullptr) *outEvents = 0;if (outData != nullptr) *outData = nullptr;return result;}result = pollInner(timeoutMillis);}
}
首先,定义了一个int类型的变量result,并将其初始化为0。
然后,使用一个无限循环来进行轮询操作。在每次循环中,首先通过while循环遍历mResponses中的响应对象。
在遍历过程中,获取当前响应对象response,并从中提取出标识符ident、文件描述符fd、事件events和数据data。
如果标识符ident大于等于0,则表示该响应对象是一个有效的事件。在这种情况下,将文件描述符、事件和数据分别赋值给outFd、outEvents和outData指向的变量,并返回标识符ident。
如果遍历完所有响应对象后仍未找到有效的事件,则检查result的值。如果result不等于0,则表示在之前的轮询操作中发生了错误,直接返回result。
如果以上两种情况都不满足,则调用pollInner()方法进行实际的轮询操作,并将返回的结果赋值给result。
总结起来,这段代码的作用是在事件循环中进行一次轮询操作,等待并处理事件。它通过遍历响应对象来查找有效的事件,并将相关信息返回。如果没有找到有效的事件,则返回之前的轮询结果。
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData)
该方法接受四个参数:
timeoutMillis:超时等待事件,如果为-1,表示无线等到,直到有事件发生。为0表示无需等待立即返回
outFd:发生事件的那个文件描述符
outEvents:在该文件描述符上发生了哪些事件,包括可读、可写、错误和终端四个事件。四个事件都是从epoll转化而来
outData:存储上下文数据,由用户在添加监听句柄时传递,用于传递用户自定义的数据
该方法的返回值也有特殊意义:
返回值为ALOOPER_POLL_WAKE:这次返回由wake函数触发,也就是管道写端的那次写事件触发
ALOOPER_POLL_TIMEOUT:等待超时
ALOOPER_POLL_ERROR:等待过程中发生错误
ALOOPER_POLL_CALLBACK:表示某个被监听的句柄因某种原因被触发
pollInner
pollinner方法很长,截取关键部分
int Looper::pollInner(int timeoutMillis) {
...// Poll.int result = POLL_WAKE;mResponses.clear();mResponseIndex = 0;// We are about to idle.mPolling = true;struct epoll_event eventItems[EPOLL_MAX_EVENTS];int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);// No longer idling.mPolling = false;// Acquire lock.mLock.lock();// Rebuild epoll set if needed.if (mEpollRebuildRequired) {mEpollRebuildRequired = false;rebuildEpollLocked();goto Done;}// Check for poll error.if (eventCount < 0) {if (errno == EINTR) {goto Done;}ALOGW("Poll failed with an unexpected error: %s", strerror(errno));result = POLL_ERROR;goto Done;}// Check for poll timeout.if (eventCount == 0) {
...result = POLL_TIMEOUT;goto Done;}// Handle all events.
#if DEBUG_POLL_AND_WAKEALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endiffor (int i = 0; i < eventCount; i++) {int fd = eventItems[i].data.fd;uint32_t epollEvents = eventItems[i].events;if (fd == mWakeEventFd.get()) {if (epollEvents & EPOLLIN) {awoken();} else {ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);}} else {ssize_t requestIndex = mRequests.indexOfKey(fd);if (requestIndex >= 0) {int events = 0;if (epollEvents & EPOLLIN) events |= EVENT_INPUT;if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;if (epollEvents & EPOLLERR) events |= EVENT_ERROR;if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;pushResponse(events, mRequests.valueAt(requestIndex));} else {ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is ""no longer registered.", epollEvents, fd);}}}
Done: ;
...
}
在这段代码中,首先调用epoll_wait函数等待事件的发生,然后根据事件的类型进行相应的处理。
在事件处理过程中,代码会检查是否需要重新构建epoll集合。如果需要重新构建,会调用rebuildEpollLocked方法,并跳转到Done标签所在的位置。
接下来,代码会检查事件的数量。如果事件数量小于0,表示发生了错误,会输出相应的错误日志,并将结果设置为POLL_ERROR,然后跳转到Done标签。
如果事件数量为0,表示发生了超时,会将结果设置为POLL_TIMEOUT,然后跳转到Done标签。
如果事件数量大于0,表示有事件发生,代码会遍历所有的事件,并根据事件的类型进行相应的处理。如果事件是唤醒事件,会调用awoken方法进行处理。如果事件是注册的文件描述符上的事件,会将事件类型和相关的请求信息添加到响应队列中。
最后,代码会跳转到Done标签所在的位置,继续执行标签后面的代码。
根据事件类型进行相应的处理后,就该处理事件了,下面是Done之后的代码
int Looper::pollInner(int timeoutMillis) {
...// Invoke pending message callbacks.mNextMessageUptime = LLONG_MAX;while (mMessageEnvelopes.size() != 0) {nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);if (messageEnvelope.uptime <= now) {// Remove the envelope from the list.// We keep a strong reference to the handler until the call to handleMessage// finishes. Then we drop it so that the handler can be deleted *before*// we reacquire our lock.{ // obtain handlersp<MessageHandler> handler = messageEnvelope.handler;Message message = messageEnvelope.message;mMessageEnvelopes.removeAt(0);mSendingMessage = true;mLock.unlock();#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKSALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",this, handler.get(), message.what);
#endifhandler->handleMessage(message);} // release handlermLock.lock();mSendingMessage = false;result = POLL_CALLBACK;} else {// The last message left at the head of the queue determines the next wakeup time.mNextMessageUptime = messageEnvelope.uptime;break;}}// Release lock.mLock.unlock();// Invoke all response callbacks.for (size_t i = 0; i < mResponses.size(); i++) {Response& response = mResponses.editItemAt(i);if (response.request.ident == POLL_CALLBACK) {int fd = response.request.fd;int events = response.events;void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKSALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",this, response.request.callback.get(), fd, events, data);
#endif// Invoke the callback. Note that the file descriptor may be closed by// the callback (and potentially even reused) before the function returns so// we need to be a little careful when removing the file descriptor afterwards.int callbackResult = response.request.callback->handleEvent(fd, events, data);if (callbackResult == 0) {removeFd(fd, response.request.seq);}// Clear the callback reference in the response structure promptly because we// will not clear the response vector itself until the next poll.response.request.callback.clear();result = POLL_CALLBACK;}}return result;
}
在这段代码中,首先会设置mNextMessageUptime为一个很大的值,以确保在没有新消息到达时不会触发下一次唤醒。
然后,代码进入一个循环,遍历消息队列中的消息。对于每个消息,代码会检查消息的到达时间是否小于等于当前时间。如果是,表示该消息已经到达,需要处理该消息。
在处理消息之前,代码会获取消息的处理器(handler)和消息内容,并从消息队列中移除该消息。然后,代码会释放锁,并调用处理器的handleMessage方法来处理该消息。(处理Native的Message,调用Native Handler的handleMessage处理该Message)
处理完消息后,代码会重新获取锁,并将mSendingMessage设置为false,表示消息处理完成。然后,将结果设置为POLL_CALLBACK,表示有回调发生。
如果消息的到达时间大于当前时间,表示还有未到达的消息,代码会将下一次唤醒时间设置为该消息的到达时间,并跳出循环。
接下来,代码会释放锁,并遍历所有的响应(response)。对于每个响应,代码会检查响应的标识是否为POLL_CALLBACK,如果是,表示需要调用回调函数。
代码会获取响应中的文件描述符(fd)、事件(events)和数据(data),然后调用回调函数的handleEvent方法进行处理。如果回调函数返回值为0,表示需要移除该文件描述符。
最后,代码会清除响应结构中的回调引用,并将结果设置为POLL_CALLBACK。然后,返回结果。
总的来说,这段代码实现了处理消息和响应的逻辑。它会遍历消息队列中的消息,处理已到达的消息,并根据响应的标识调用相应的回调函数。处理完消息和响应后,返回相应的结果。
添加监控请求
添加监控请求就是调用epoll_ctl添加文件句柄,以Nativce的Activity的代码来分析mRequests
frameworks/base/core/jni/android_app_NativeActivity.cpp
loadNativeCode_native(JNIEnv* env, jobject clazz, jstring path, jstring funcName,jobject messageQueue, jstring internalDataDir, jstring obbDir,jstring externalDataDir, jint sdkVersion, jobject jAssetMgr,jbyteArray savedState, jobject classLoader, jstring libraryPath){
... code->messageQueue->getLooper()->addFd(code->mainWorkRead, 0, ALOOPER_EVENT_INPUT, mainWorkCallback, code.get());
... }
调用Looper的addFd函数,第一个参数表示监听的fd,第二个参数0表示ident,第三个参数表示需要监听的事件,这里只监听可读事件,第四个参数为回调函数,当该fd发生指定事件,looper会回调该函数,第五个参数为回调函数的参数
system/core/libutils/Looper.cpp
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
#if DEBUG_CALLBACKSALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,events, callback.get(), data);
#endifif (!callback.get()) {if (! mAllowNonCallbacks) {ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");return -1;}if (ident < 0) {ALOGE("Invalid attempt to set NULL callback with ident < 0.");return -1;}} else {ident = POLL_CALLBACK;}{ // acquire lockAutoMutex _l(mLock);Request request;request.fd = fd;request.ident = ident;request.events = events;request.seq = mNextRequestSeq++;request.callback = callback;request.data = data;if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1struct epoll_event eventItem;request.initEventItem(&eventItem);ssize_t requestIndex = mRequests.indexOfKey(fd);if (requestIndex < 0) {int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);if (epollResult < 0) {ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));return -1;}mRequests.add(fd, request);} else {int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_MOD, fd, &eventItem);if (epollResult < 0) {if (errno == ENOENT) {// Tolerate ENOENT because it means that an older file descriptor was// closed before its callback was unregistered and meanwhile a new// file descriptor with the same number has been created and is now// being registered for the first time. This error may occur naturally// when a callback has the side-effect of closing the file descriptor// before returning and unregistering itself. Callback sequence number// checks further ensure that the race is benign.//// Unfortunately due to kernel limitations we need to rebuild the epoll// set from scratch because it may contain an old file handle that we are// now unable to remove since its file descriptor is no longer valid.// No such problem would have occurred if we were using the poll system// call instead, but that approach carries others disadvantages.
#if DEBUG_CALLBACKSALOGD("%p ~ addFd - EPOLL_CTL_MOD failed due to file descriptor ""being recycled, falling back on EPOLL_CTL_ADD: %s",this, strerror(errno));
#endifepollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);if (epollResult < 0) {ALOGE("Error modifying or adding epoll events for fd %d: %s",fd, strerror(errno));return -1;}scheduleEpollRebuildLocked();} else {ALOGE("Error modifying epoll events for fd %d: %s", fd, strerror(errno));return -1;}}mRequests.replaceValueAt(requestIndex, request);}} // release lockreturn 1;
}
addFd方法接受一个sp对象作为回调函数。在这段代码中,首先会检查回调函数是否为空。如果为空,会根据mAllowNonCallbacks的值来判断是否允许设置空回调函数。如果不允许,则会返回错误。如果允许,并且ident小于0,则会返回错误。
如果回调函数不为空,则会将ident设置为POLL_CALLBACK,表示该文件描述符是一个回调文件描述符。
然后,代码会获取锁,并创建一个Request对象,设置相应的属性。接着,代码会初始化一个epoll_event结构体,并调用epoll_ctl函数将文件描述符添加到epoll事件集合中。
如果添加成功,则会将Request对象添加到mRequests集合中。如果添加失败,则会根据错误类型进行处理。如果错误类型是ENOENT,表示文件描述符在移除之前已经关闭,并且一个新的文件描述符与相同的编号已经创建并且正在首次注册。在这种情况下,代码会重新将文件描述符添加到epoll事件集合中,并调用scheduleEpollRebuildLocked方法重新构建epoll事件集合。如果是其他错误类型,则表示添加失败,会返回错误。
最后,代码会释放锁,并返回成功。
总的来说,这段代码实现了向Looper对象添加文件描述符和回调函数的逻辑。它会检查回调函数是否为空,并根据情况进行处理。在添加文件描述符时,会将文件描述符添加到epoll事件集合中,并将相应的信息保存到mRequests集合中。如果添加失败,则会根据错误类型进行处理。
处理监控请求
在pollInner中,当某个监控fd发生事件后,会调用对应的Request,也就是pollInner调用的pushResponse方法
system/core/libutils/Looper.cpp
void Looper::pushResponse(int events, const Request& request) {Response response;response.events = events;response.request = request;mResponses.push(response);
}
该方法将一个Response添加到响应队列中,用于后续处理
poolInner不是单独处理request,而是先收集request,等到NativeMessage消息处理完后再处理。这表明在处理逻辑上,NativeMessage的优先级高于监控fd的优先级
添加Native的Message
system/core/libutils/Looper.cpp
void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) {nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);sendMessageAtTime(now, handler, message);
}
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,const Message& message) {
#if DEBUG_CALLBACKSALOGD("%p ~ sendMessageAtTime - uptime=%" PRId64 ", handler=%p, what=%d",this, uptime, handler.get(), message.what);
#endifsize_t i = 0;{ // acquire lockAutoMutex _l(mLock);size_t messageCount = mMessageEnvelopes.size();while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {i += 1;}MessageEnvelope messageEnvelope(uptime, handler, message);mMessageEnvelopes.insertAt(messageEnvelope, i, 1);// Optimization: If the Looper is currently sending a message, then we can skip// the call to wake() because the next thing the Looper will do after processing// messages is to decide when the next wakeup time should be. In fact, it does// not even matter whether this code is running on the Looper thread.if (mSendingMessage) {return;}} // release lock// Wake the poll loop only when we enqueue a new message at the head.if (i == 0) {wake();}
}
sendMessageAtTime方法用于向消息队列中添加一个延迟发送的消息。在这段代码中,首先会打印调试信息(如果定义了DEBUG_CALLBACKS宏),包括uptime、handler和message.what的值。
然后,代码会获取锁,并根据uptime的值找到消息队列中合适的位置插入新的消息。具体地,代码会遍历消息队列,找到第一个uptime大于等于给定uptime的位置,并将新的消息插入到该位置。
在插入消息后,代码会进行一个优化判断。如果当前Looper正在发送消息(mSendingMessage为true),则可以跳过调用wake()方法唤醒轮询循环,因为在处理完消息后,Looper会决定下一次唤醒的时间。实际上,这段代码是否在Looper线程上运行并不重要。
最后,如果新插入的消息是队列中的第一个消息(即i == 0),则调用wake()方法唤醒轮询循环。
总的来说,这段代码实现了向消息队列中添加延迟发送的消息的逻辑。它会根据给定的uptime值找到合适的位置插入消息,并在必要时唤醒轮询循环。
1.4 MessageQueue总结
· Java层提供了Looper类和MessageQueue类,其中Looper类提供循环处理消息的机制,MessageQueue类提供一个消息队列,以及插入、删除和提取消息的函数接口。另外,Handler也是在Java层常用的与消息处理相关的类。
· MessageQueue内部通过mPtr变量保存一个Native层的NativeMessageQueue对象,mMessages保存来自Java层的Message消息。
· NativeMessageQueue保存一个native的Looper对象,该Looper从ALooper派生,提供pollOnce和addFd等函数。
· Java层有Message类和Handler类,而Native层对应也有Message类和MessageHandler抽象类。在编码时,一般使用的是MessageHandler的派生类WeakMessageHandler类。
MessageQueue处理流程总结
MessageQueue核心逻辑下移到Native层后,极大地拓展了消息处理的范围,总结一下有以下几点:
1.MessageQueue继续支持来自Java层的Message消息,也就是早期的Message加Handler的处理方式。
2.MessageQueue在Native层的代表NativeMessageQueue支持来自Native层的Message,是通过Native的Message和MessageHandler来处理的。
3.NativeMessageQueue还处理通过addFd添加的Request。在后面分析输入系统时,还会大量碰到这种方式。
4.从处理逻辑上看,先是Native的Message,然后是Native的Request,最后才是Java的Message。