MessageQueue 深入理解Android卷2 学习笔记

MessageQueue类封装了与消息队列有关的操作,一个以消息驱动的系统中,最重要部分就是消息队列和消息处理循环。
MessageQueue的核心代码在native层,可以处理java和native的事件

1.1MessageQueue创建

构造方法,调用nativeInit
frameworks/base/core/java/android/os/MessageQueue.java

MessageQueue(boolean quitAllowed) {mQuitAllowed = quitAllowed;mPtr = nativeInit();
}

nativeInit是一个native方法,方法中调用构造初始化了native层的MessageQueue
frameworks/base/core/jni/android_os_MessageQueue.cpp

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();if (!nativeMessageQueue) {jniThrowRuntimeException(env, "Unable to allocate native queue");return 0;}

frameworks/base/core/jni/android_os_MessageQueue.cpp

NativeMessageQueue::NativeMessageQueue() :mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {mLooper = Looper::getForThread();if (mLooper == NULL) {mLooper = new Looper(false);Looper::setForThread(mLooper);}
}

如果mLooper为空,创建mLooper(是一种以线程为单位的单例模式)。通过setForThread与线程关联
一个线程会有一个Looper来循环处理消息队列的消息,下列一行代码是获取线程的本地存储空间中的
mLooper = Looper::getForThread();

1.2 消息提取

next()方法用于从消息队列中获取下一条消息,并返回该消息对象
frameworks/base/core/java/android/os/MessageQueue.java

    @UnsupportedAppUsageMessage next() {// Return here if the message loop has already quit and been disposed.// This can happen if the application tries to restart a looper after quit// which is not supported.final long ptr = mPtr;if (ptr == 0) {return null;}int pendingIdleHandlerCount = -1; // -1 only during first iterationint nextPollTimeoutMillis = 0;for (;;) {if (nextPollTimeoutMillis != 0) {Binder.flushPendingCommands();}nativePollOnce(ptr, nextPollTimeoutMillis);synchronized (this) {// Try to retrieve the next message.  Return if found.final long now = SystemClock.uptimeMillis();Message prevMsg = null;Message msg = mMessages;if (msg != null && msg.target == null) {// Stalled by a barrier.  Find the next asynchronous message in the queue.do {prevMsg = msg;msg = msg.next;} while (msg != null && !msg.isAsynchronous());}if (msg != null) {if (now < msg.when) {// Next message is not ready.  Set a timeout to wake up when it is ready.nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);} else {// Got a message.mBlocked = false;if (prevMsg != null) {prevMsg.next = msg.next;} else {mMessages = msg.next;}msg.next = null;if (DEBUG) Log.v(TAG, "Returning message: " + msg);msg.markInUse();return msg;}} else {// No more messages.nextPollTimeoutMillis = -1;}// Process the quit message now that all pending messages have been handled.if (mQuitting) {dispose();return null;}// If first time idle, then get the number of idlers to run.// Idle handles only run if the queue is empty or if the first message// in the queue (possibly a barrier) is due to be handled in the future.if (pendingIdleHandlerCount < 0&& (mMessages == null || now < mMessages.when)) {pendingIdleHandlerCount = mIdleHandlers.size();}if (pendingIdleHandlerCount <= 0) {// No idle handlers to run.  Loop and wait some more.mBlocked = true;continue;}if (mPendingIdleHandlers == null) {mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];}mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);}// Run the idle handlers.// We only ever reach this code block during the first iteration.for (int i = 0; i < pendingIdleHandlerCount; i++) {final IdleHandler idler = mPendingIdleHandlers[i];mPendingIdleHandlers[i] = null; // release the reference to the handlerboolean keep = false;try {keep = idler.queueIdle();} catch (Throwable t) {Log.wtf(TAG, "IdleHandler threw exception", t);}if (!keep) {synchronized (this) {mIdleHandlers.remove(idler);}}}// Reset the idle handler count to 0 so we do not run them again.pendingIdleHandlerCount = 0;// While calling an idle handler, a new message could have been delivered// so go back and look again for a pending message without waiting.nextPollTimeoutMillis = 0;}}

首先,方法会检查消息循环是否已经退出并被销毁,如果是,则直接返回null。这种情况可能发生在应用程序在退出后尝试重新启动消息循环,但这是不被支持的。

接下来,方法会初始化两个变量pendingIdleHandlerCount和nextPollTimeoutMillis。pendingIdleHandlerCount用于记录待运行的空闲处理器(IdleHandler)的数量,初始值为-1,表示在第一次迭代时。nextPollTimeoutMillis用于设置下一次轮询的超时时间,初始值为0。

然后,方法进入一个无限循环,不断尝试从消息队列中获取下一条消息。

在每次循环中,首先检查nextPollTimeoutMillis的值,如果不为0,则调用Binder.flushPendingCommands()方法来刷新待处理的Binder命令。

接下来,调用nativePollOnce()方法来从底层获取下一条消息。该方法会阻塞当前线程,直到有消息到达或超时。

然后,使用synchronized关键字对this进行同步,以确保多线程环境下的安全访问。

在同步块中,首先尝试从消息队列中获取下一条消息。如果找到了消息,则判断该消息是否已经准备好处理。如果还未准备好,则计算出下一次轮询的超时时间,并更新nextPollTimeoutMillis的值。如果消息已经准备好处理,则将mBlocked标志设置为false,将该消息从消息队列中移除,并返回该消息。

如果没有找到消息,则将nextPollTimeoutMillis的值设置为-1,表示没有更多的消息。

接下来,检查mQuitting标志,如果为true,表示消息循环已经退出,需要进行清理操作,并返回null。

然后,检查是否是第一次空闲处理。如果是第一次空闲处理,并且消息队列为空或者下一条消息的时间还未到达,则获取待运行的空闲处理器的数量。

如果待运行的空闲处理器数量小于等于0,则将mBlocked标志设置为true,继续下一次循环。

如果待运行的空闲处理器数量大于0,则将空闲处理器列表转换为数组,并存储在mPendingIdleHandlers中。

接下来,运行空闲处理器。在第一次迭代中,会遍历所有待运行的空闲处理器,并调用它们的queueIdle()方法。如果queueIdle()方法返回false,则表示该空闲处理器不再需要运行,需要将其从空闲处理器列表中移除。

最后,将pendingIdleHandlerCount重置为0,以确保下一次循环不再运行空闲处理器。然后,将nextPollTimeoutMillis的值设置为0,以便立即进行下一次轮询。

总结起来,这段代码的作用是从消息队列中获取下一条消息,并返回该消息对象。它会根据消息的准备状态和时间戳来确定是否需要等待,同时还会处理空闲处理器的运行。这样,消息循环可以不断地处理消息,并在空闲时运行空闲处理器。

Java层投递Message
enqueueMessage()方法用于将消息添加到消息队列中。
frameworks/base/core/java/android/os/MessageQueue.java

    boolean enqueueMessage(Message msg, long when) {if (msg.target == null) {throw new IllegalArgumentException("Message must have a target.");}synchronized (this) {if (msg.isInUse()) {throw new IllegalStateException(msg + " This message is already in use.");}if (mQuitting) {IllegalStateException e = new IllegalStateException(msg.target + " sending message to a Handler on a dead thread");Log.w(TAG, e.getMessage(), e);msg.recycle();return false;}msg.markInUse();msg.when = when;Message p = mMessages;boolean needWake;if (p == null || when == 0 || when < p.when) {// New head, wake up the event queue if blocked.msg.next = p;mMessages = msg;needWake = mBlocked;} else {// Inserted within the middle of the queue.  Usually we don't have to wake// up the event queue unless there is a barrier at the head of the queue// and the message is the earliest asynchronous message in the queue.needWake = mBlocked && p.target == null && msg.isAsynchronous();Message prev;for (;;) {prev = p;p = p.next;if (p == null || when < p.when) {break;}if (needWake && p.isAsynchronous()) {needWake = false;}}msg.next = p; // invariant: p == prev.nextprev.next = msg;}// We can assume mPtr != 0 because mQuitting is false.if (needWake) {nativeWake(mPtr);}}return true;}

首先,方法会检查消息的target是否为null,如果是,则抛出IllegalArgumentException异常,表示消息必须具有目标。

接下来,使用synchronized关键字对this进行同步,以确保多线程环境下的安全访问。

在同步块中,首先检查消息是否已经在使用中,如果是,则抛出IllegalStateException异常,表示该消息已经在使用中。

然后,检查mQuitting标志,如果为true,表示消息循环已经退出,需要进行清理操作。在这种情况下,会抛出IllegalStateException异常,并将消息回收后返回false。

接下来,将消息标记为正在使用,并设置消息的时间戳为when。

然后,获取消息队列中的第一条消息p。

接下来,根据消息的时间戳when和当前消息队列中的消息的时间戳进行比较,确定消息的插入位置。

如果消息队列为空,或者when为0,或者when小于第一条消息的时间戳,则将消息作为新的头部插入到消息队列中,并将needWake标志设置为mBlocked的值。

如果消息需要插入到队列的中间位置,通常情况下不需要唤醒事件队列,除非队列头部有一个障碍(barrier)并且消息是队列中最早的异步消息。在这种情况下,将needWake标志设置为mBlocked的值,并遍历消息队列,找到合适的插入位置。

最后,将消息插入到队列中,并更新前后消息的关联关系。

最后,如果需要唤醒事件队列,则调用nativeWake()方法唤醒事件队列。

总结起来,这段代码的作用是将消息添加到消息队列中。它会根据消息的时间戳和队列中已有消息的时间戳来确定插入位置,并根据需要唤醒事件队列。这样,消息循环可以按照一定的顺序处理消息,并在需要时唤醒事件队列。

nativeWake
nativeWake是一个native函数,用于唤醒队列
frameworks/base/core/jni/android_os_MessageQueue.cpp

static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);nativeMessageQueue->wake();
}

通过reinterpret_cast获取NativeMessageQueue对象,然后调用wake函数,wake函数调用Looper的wake

void NativeMessageQueue::wake() {mLooper->wake();
}

wake
system/core/libutils/Looper.cpp

void Looper::wake() {
#if DEBUG_POLL_AND_WAKEALOGD("%p ~ wake", this);
#endifuint64_t inc = 1;ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));if (nWrite != sizeof(uint64_t)) {if (errno != EAGAIN) {LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",mWakeEventFd.get(), nWrite, strerror(errno));}}
}

向文件描述符写入一个inc的值来实现唤醒操作

1.3 nativePollOnce分析

nativePollOnce()方法用来从底层获取下一条消息,该方法会阻塞当前线程,直到有消息到达或超时
frameworks/base/core/jni/android_os_MessageQueue.cpp

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,jlong ptr, jint timeoutMillis) {NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

调用了pollOnce方法
frameworks/base/core/jni/android_os_MessageQueue.cpp

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {mPollEnv = env;mPollObj = pollObj;mLooper->pollOnce(timeoutMillis);mPollObj = NULL;mPollEnv = NULL;if (mExceptionObj) {env->Throw(mExceptionObj);env->DeleteLocalRef(mExceptionObj);mExceptionObj = NULL;}
}

调用了Looper中的pollOnce方法
system/core/libutils/Looper.cpp

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {int result = 0;for (;;) {while (mResponseIndex < mResponses.size()) {const Response& response = mResponses.itemAt(mResponseIndex++);int ident = response.request.ident;if (ident >= 0) {int fd = response.request.fd;int events = response.events;void* data = response.request.data;
#if DEBUG_POLL_AND_WAKEALOGD("%p ~ pollOnce - returning signalled identifier %d: ""fd=%d, events=0x%x, data=%p",this, ident, fd, events, data);
#endifif (outFd != nullptr) *outFd = fd;if (outEvents != nullptr) *outEvents = events;if (outData != nullptr) *outData = data;return ident;}}if (result != 0) {
#if DEBUG_POLL_AND_WAKEALOGD("%p ~ pollOnce - returning result %d", this, result);
#endifif (outFd != nullptr) *outFd = 0;if (outEvents != nullptr) *outEvents = 0;if (outData != nullptr) *outData = nullptr;return result;}result = pollInner(timeoutMillis);}
}

首先,定义了一个int类型的变量result,并将其初始化为0。

然后,使用一个无限循环来进行轮询操作。在每次循环中,首先通过while循环遍历mResponses中的响应对象。

在遍历过程中,获取当前响应对象response,并从中提取出标识符ident、文件描述符fd、事件events和数据data。

如果标识符ident大于等于0,则表示该响应对象是一个有效的事件。在这种情况下,将文件描述符、事件和数据分别赋值给outFd、outEvents和outData指向的变量,并返回标识符ident。

如果遍历完所有响应对象后仍未找到有效的事件,则检查result的值。如果result不等于0,则表示在之前的轮询操作中发生了错误,直接返回result。

如果以上两种情况都不满足,则调用pollInner()方法进行实际的轮询操作,并将返回的结果赋值给result。

总结起来,这段代码的作用是在事件循环中进行一次轮询操作,等待并处理事件。它通过遍历响应对象来查找有效的事件,并将相关信息返回。如果没有找到有效的事件,则返回之前的轮询结果。

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData)

该方法接受四个参数:
timeoutMillis:超时等待事件,如果为-1,表示无线等到,直到有事件发生。为0表示无需等待立即返回
outFd:发生事件的那个文件描述符
outEvents:在该文件描述符上发生了哪些事件,包括可读、可写、错误和终端四个事件。四个事件都是从epoll转化而来
outData:存储上下文数据,由用户在添加监听句柄时传递,用于传递用户自定义的数据

该方法的返回值也有特殊意义:
返回值为ALOOPER_POLL_WAKE:这次返回由wake函数触发,也就是管道写端的那次写事件触发
ALOOPER_POLL_TIMEOUT:等待超时
ALOOPER_POLL_ERROR:等待过程中发生错误
ALOOPER_POLL_CALLBACK:表示某个被监听的句柄因某种原因被触发
pollInner
pollinner方法很长,截取关键部分

int Looper::pollInner(int timeoutMillis) {
...// Poll.int result = POLL_WAKE;mResponses.clear();mResponseIndex = 0;// We are about to idle.mPolling = true;struct epoll_event eventItems[EPOLL_MAX_EVENTS];int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);// No longer idling.mPolling = false;// Acquire lock.mLock.lock();// Rebuild epoll set if needed.if (mEpollRebuildRequired) {mEpollRebuildRequired = false;rebuildEpollLocked();goto Done;}// Check for poll error.if (eventCount < 0) {if (errno == EINTR) {goto Done;}ALOGW("Poll failed with an unexpected error: %s", strerror(errno));result = POLL_ERROR;goto Done;}// Check for poll timeout.if (eventCount == 0) {
...result = POLL_TIMEOUT;goto Done;}// Handle all events.
#if DEBUG_POLL_AND_WAKEALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endiffor (int i = 0; i < eventCount; i++) {int fd = eventItems[i].data.fd;uint32_t epollEvents = eventItems[i].events;if (fd == mWakeEventFd.get()) {if (epollEvents & EPOLLIN) {awoken();} else {ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);}} else {ssize_t requestIndex = mRequests.indexOfKey(fd);if (requestIndex >= 0) {int events = 0;if (epollEvents & EPOLLIN) events |= EVENT_INPUT;if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;if (epollEvents & EPOLLERR) events |= EVENT_ERROR;if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;pushResponse(events, mRequests.valueAt(requestIndex));} else {ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is ""no longer registered.", epollEvents, fd);}}}
Done: ;
...
}

在这段代码中,首先调用epoll_wait函数等待事件的发生,然后根据事件的类型进行相应的处理。

在事件处理过程中,代码会检查是否需要重新构建epoll集合。如果需要重新构建,会调用rebuildEpollLocked方法,并跳转到Done标签所在的位置。

接下来,代码会检查事件的数量。如果事件数量小于0,表示发生了错误,会输出相应的错误日志,并将结果设置为POLL_ERROR,然后跳转到Done标签。

如果事件数量为0,表示发生了超时,会将结果设置为POLL_TIMEOUT,然后跳转到Done标签。

如果事件数量大于0,表示有事件发生,代码会遍历所有的事件,并根据事件的类型进行相应的处理。如果事件是唤醒事件,会调用awoken方法进行处理。如果事件是注册的文件描述符上的事件,会将事件类型和相关的请求信息添加到响应队列中。

最后,代码会跳转到Done标签所在的位置,继续执行标签后面的代码。
根据事件类型进行相应的处理后,就该处理事件了,下面是Done之后的代码

int Looper::pollInner(int timeoutMillis) {
...// Invoke pending message callbacks.mNextMessageUptime = LLONG_MAX;while (mMessageEnvelopes.size() != 0) {nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);if (messageEnvelope.uptime <= now) {// Remove the envelope from the list.// We keep a strong reference to the handler until the call to handleMessage// finishes.  Then we drop it so that the handler can be deleted *before*// we reacquire our lock.{ // obtain handlersp<MessageHandler> handler = messageEnvelope.handler;Message message = messageEnvelope.message;mMessageEnvelopes.removeAt(0);mSendingMessage = true;mLock.unlock();#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKSALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",this, handler.get(), message.what);
#endifhandler->handleMessage(message);} // release handlermLock.lock();mSendingMessage = false;result = POLL_CALLBACK;} else {// The last message left at the head of the queue determines the next wakeup time.mNextMessageUptime = messageEnvelope.uptime;break;}}// Release lock.mLock.unlock();// Invoke all response callbacks.for (size_t i = 0; i < mResponses.size(); i++) {Response& response = mResponses.editItemAt(i);if (response.request.ident == POLL_CALLBACK) {int fd = response.request.fd;int events = response.events;void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKSALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",this, response.request.callback.get(), fd, events, data);
#endif// Invoke the callback.  Note that the file descriptor may be closed by// the callback (and potentially even reused) before the function returns so// we need to be a little careful when removing the file descriptor afterwards.int callbackResult = response.request.callback->handleEvent(fd, events, data);if (callbackResult == 0) {removeFd(fd, response.request.seq);}// Clear the callback reference in the response structure promptly because we// will not clear the response vector itself until the next poll.response.request.callback.clear();result = POLL_CALLBACK;}}return result;
}

在这段代码中,首先会设置mNextMessageUptime为一个很大的值,以确保在没有新消息到达时不会触发下一次唤醒。
然后,代码进入一个循环,遍历消息队列中的消息。对于每个消息,代码会检查消息的到达时间是否小于等于当前时间。如果是,表示该消息已经到达,需要处理该消息。

在处理消息之前,代码会获取消息的处理器(handler)和消息内容,并从消息队列中移除该消息。然后,代码会释放锁,并调用处理器的handleMessage方法来处理该消息。(处理Native的Message,调用Native Handler的handleMessage处理该Message)

处理完消息后,代码会重新获取锁,并将mSendingMessage设置为false,表示消息处理完成。然后,将结果设置为POLL_CALLBACK,表示有回调发生。

如果消息的到达时间大于当前时间,表示还有未到达的消息,代码会将下一次唤醒时间设置为该消息的到达时间,并跳出循环。

接下来,代码会释放锁,并遍历所有的响应(response)。对于每个响应,代码会检查响应的标识是否为POLL_CALLBACK,如果是,表示需要调用回调函数。

代码会获取响应中的文件描述符(fd)、事件(events)和数据(data),然后调用回调函数的handleEvent方法进行处理。如果回调函数返回值为0,表示需要移除该文件描述符。

最后,代码会清除响应结构中的回调引用,并将结果设置为POLL_CALLBACK。然后,返回结果。

总的来说,这段代码实现了处理消息和响应的逻辑。它会遍历消息队列中的消息,处理已到达的消息,并根据响应的标识调用相应的回调函数。处理完消息和响应后,返回相应的结果。
添加监控请求
添加监控请求就是调用epoll_ctl添加文件句柄,以Nativce的Activity的代码来分析mRequests
frameworks/base/core/jni/android_app_NativeActivity.cpp

loadNativeCode_native(JNIEnv* env, jobject clazz, jstring path, jstring funcName,jobject messageQueue, jstring internalDataDir, jstring obbDir,jstring externalDataDir, jint sdkVersion, jobject jAssetMgr,jbyteArray savedState, jobject classLoader, jstring libraryPath){
...        code->messageQueue->getLooper()->addFd(code->mainWorkRead, 0, ALOOPER_EVENT_INPUT, mainWorkCallback, code.get());     
...     }

调用Looper的addFd函数,第一个参数表示监听的fd,第二个参数0表示ident,第三个参数表示需要监听的事件,这里只监听可读事件,第四个参数为回调函数,当该fd发生指定事件,looper会回调该函数,第五个参数为回调函数的参数

system/core/libutils/Looper.cpp

int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
#if DEBUG_CALLBACKSALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,events, callback.get(), data);
#endifif (!callback.get()) {if (! mAllowNonCallbacks) {ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");return -1;}if (ident < 0) {ALOGE("Invalid attempt to set NULL callback with ident < 0.");return -1;}} else {ident = POLL_CALLBACK;}{ // acquire lockAutoMutex _l(mLock);Request request;request.fd = fd;request.ident = ident;request.events = events;request.seq = mNextRequestSeq++;request.callback = callback;request.data = data;if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1struct epoll_event eventItem;request.initEventItem(&eventItem);ssize_t requestIndex = mRequests.indexOfKey(fd);if (requestIndex < 0) {int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);if (epollResult < 0) {ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));return -1;}mRequests.add(fd, request);} else {int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_MOD, fd, &eventItem);if (epollResult < 0) {if (errno == ENOENT) {// Tolerate ENOENT because it means that an older file descriptor was// closed before its callback was unregistered and meanwhile a new// file descriptor with the same number has been created and is now// being registered for the first time.  This error may occur naturally// when a callback has the side-effect of closing the file descriptor// before returning and unregistering itself.  Callback sequence number// checks further ensure that the race is benign.//// Unfortunately due to kernel limitations we need to rebuild the epoll// set from scratch because it may contain an old file handle that we are// now unable to remove since its file descriptor is no longer valid.// No such problem would have occurred if we were using the poll system// call instead, but that approach carries others disadvantages.
#if DEBUG_CALLBACKSALOGD("%p ~ addFd - EPOLL_CTL_MOD failed due to file descriptor ""being recycled, falling back on EPOLL_CTL_ADD: %s",this, strerror(errno));
#endifepollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);if (epollResult < 0) {ALOGE("Error modifying or adding epoll events for fd %d: %s",fd, strerror(errno));return -1;}scheduleEpollRebuildLocked();} else {ALOGE("Error modifying epoll events for fd %d: %s", fd, strerror(errno));return -1;}}mRequests.replaceValueAt(requestIndex, request);}} // release lockreturn 1;
}

addFd方法接受一个sp对象作为回调函数。在这段代码中,首先会检查回调函数是否为空。如果为空,会根据mAllowNonCallbacks的值来判断是否允许设置空回调函数。如果不允许,则会返回错误。如果允许,并且ident小于0,则会返回错误。

如果回调函数不为空,则会将ident设置为POLL_CALLBACK,表示该文件描述符是一个回调文件描述符。

然后,代码会获取锁,并创建一个Request对象,设置相应的属性。接着,代码会初始化一个epoll_event结构体,并调用epoll_ctl函数将文件描述符添加到epoll事件集合中。

如果添加成功,则会将Request对象添加到mRequests集合中。如果添加失败,则会根据错误类型进行处理。如果错误类型是ENOENT,表示文件描述符在移除之前已经关闭,并且一个新的文件描述符与相同的编号已经创建并且正在首次注册。在这种情况下,代码会重新将文件描述符添加到epoll事件集合中,并调用scheduleEpollRebuildLocked方法重新构建epoll事件集合。如果是其他错误类型,则表示添加失败,会返回错误。

最后,代码会释放锁,并返回成功。

总的来说,这段代码实现了向Looper对象添加文件描述符和回调函数的逻辑。它会检查回调函数是否为空,并根据情况进行处理。在添加文件描述符时,会将文件描述符添加到epoll事件集合中,并将相应的信息保存到mRequests集合中。如果添加失败,则会根据错误类型进行处理。
处理监控请求
在pollInner中,当某个监控fd发生事件后,会调用对应的Request,也就是pollInner调用的pushResponse方法
system/core/libutils/Looper.cpp

void Looper::pushResponse(int events, const Request& request) {Response response;response.events = events;response.request = request;mResponses.push(response);
}

该方法将一个Response添加到响应队列中,用于后续处理
poolInner不是单独处理request,而是先收集request,等到NativeMessage消息处理完后再处理。这表明在处理逻辑上,NativeMessage的优先级高于监控fd的优先级

添加Native的Message
system/core/libutils/Looper.cpp

void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) {nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);sendMessageAtTime(now, handler, message);
}
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,const Message& message) {
#if DEBUG_CALLBACKSALOGD("%p ~ sendMessageAtTime - uptime=%" PRId64 ", handler=%p, what=%d",this, uptime, handler.get(), message.what);
#endifsize_t i = 0;{ // acquire lockAutoMutex _l(mLock);size_t messageCount = mMessageEnvelopes.size();while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {i += 1;}MessageEnvelope messageEnvelope(uptime, handler, message);mMessageEnvelopes.insertAt(messageEnvelope, i, 1);// Optimization: If the Looper is currently sending a message, then we can skip// the call to wake() because the next thing the Looper will do after processing// messages is to decide when the next wakeup time should be.  In fact, it does// not even matter whether this code is running on the Looper thread.if (mSendingMessage) {return;}} // release lock// Wake the poll loop only when we enqueue a new message at the head.if (i == 0) {wake();}
}

sendMessageAtTime方法用于向消息队列中添加一个延迟发送的消息。在这段代码中,首先会打印调试信息(如果定义了DEBUG_CALLBACKS宏),包括uptime、handler和message.what的值。

然后,代码会获取锁,并根据uptime的值找到消息队列中合适的位置插入新的消息。具体地,代码会遍历消息队列,找到第一个uptime大于等于给定uptime的位置,并将新的消息插入到该位置。

在插入消息后,代码会进行一个优化判断。如果当前Looper正在发送消息(mSendingMessage为true),则可以跳过调用wake()方法唤醒轮询循环,因为在处理完消息后,Looper会决定下一次唤醒的时间。实际上,这段代码是否在Looper线程上运行并不重要。

最后,如果新插入的消息是队列中的第一个消息(即i == 0),则调用wake()方法唤醒轮询循环。

总的来说,这段代码实现了向消息队列中添加延迟发送的消息的逻辑。它会根据给定的uptime值找到合适的位置插入消息,并在必要时唤醒轮询循环。

1.4 MessageQueue总结

在这里插入图片描述

· Java层提供了Looper类和MessageQueue类,其中Looper类提供循环处理消息的机制,MessageQueue类提供一个消息队列,以及插入、删除和提取消息的函数接口。另外,Handler也是在Java层常用的与消息处理相关的类。

· MessageQueue内部通过mPtr变量保存一个Native层的NativeMessageQueue对象,mMessages保存来自Java层的Message消息。

· NativeMessageQueue保存一个native的Looper对象,该Looper从ALooper派生,提供pollOnce和addFd等函数。

· Java层有Message类和Handler类,而Native层对应也有Message类和MessageHandler抽象类。在编码时,一般使用的是MessageHandler的派生类WeakMessageHandler类。

MessageQueue处理流程总结
MessageQueue核心逻辑下移到Native层后,极大地拓展了消息处理的范围,总结一下有以下几点:
1.MessageQueue继续支持来自Java层的Message消息,也就是早期的Message加Handler的处理方式。

2.MessageQueue在Native层的代表NativeMessageQueue支持来自Native层的Message,是通过Native的Message和MessageHandler来处理的。

3.NativeMessageQueue还处理通过addFd添加的Request。在后面分析输入系统时,还会大量碰到这种方式。

4.从处理逻辑上看,先是Native的Message,然后是Native的Request,最后才是Java的Message。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/119843.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CVE-2021-44228 Apache log4j 远程命令执行漏洞

一、漏洞原理 log4j(log for java)是由Java编写的可靠、灵活的日志框架&#xff0c;是Apache旗下的一个开源项目&#xff0c;使用Log4j&#xff0c;我们更加方便的记录了日志信息&#xff0c;它不但能控制日志输出的目的地&#xff0c;也能控制日志输出的内容格式&#xff1b;…

CSDN学院 < 华为战略方法论进阶课 > 正式上线!

目录 你将收获 适用人群 课程内容 内容目录 CSDN学院 作者简介 你将收获 提升职场技能提升战略规划的能力实现多元化发展综合能力进阶 适用人群 主要适合公司中高层、创业者、产品经理、咨询顾问&#xff0c;以及致力于改变现状的学员。 课程内容 本期课程主要介绍华为…

不同网段的IP怎么互通

最近在整理工作的时候发现一个不同网段无法互通的问题&#xff0c;就是我们大家熟知的一级路由和二级路由无法互通的问题。由于需要记录整个过程的完整性&#xff0c;这里也需要详细记录下整个过程&#xff0c;明白的人不用看&#xff0c;可以直接跳过&#xff0c;到解决方法去…

Windows 和 Linux 这2个系统在进行编程实现的时候的一些区别:

很惭愧&#xff0c;学了很多年才意识到&#xff0c;噢&#xff0c;原来这两个系统实现一些功能的时候会使用到不同的库&#xff0c;使用不同的函数。 那么&#xff0c;也会延伸出一些问题&#xff1a; 比如&#xff0c;如何实现版本的迁移。一个在Linux上运行的代码如何可以比…

C#WPF嵌入字体实例

本文介绍C#WPF嵌入字体实例。 首先创建项目 添加Resources文件夹,添加字体文件,字体文件属性:生成操作为Resources,复制到输出目录:不复制 字体的使用可以采用以下两种方法: 方式一 直接引用 FontFamily="./Resources/#幼圆" 方式二 定义资源 <Applica…

【面试经典150 | 栈】简化路径

文章目录 Tag题目来源题目解读解题思路方法一&#xff1a;字符串数组模拟栈 其他语言python3 写在最后 Tag 【栈】【字符串】 题目来源 71. 简化路径 题目解读 将 Unix 风格的绝对路径转化成更加简洁的规范路径。字符串中会出现 字母、数字、/、_、. 和 .. 这几种字符&#…

阿里云企业邮箱基于Spring Boot快速实现发送邮件功能

邮件在项目中经常会被用到&#xff0c;比如用邮件发送通知。比如&#xff0c;通过邮件注册、认证、找回密码、系统报警通知、报表信息等。本篇文章带大家通过SpringBoot快速实现一个发送邮件的功能。 邮件协议 下面先简单了解一下常见的邮件协议。常用的电子邮件协议有SMTP、…

基于springboot,vue学生宿舍管理系统

开发工具&#xff1a;IDEA 服务器&#xff1a;Tomcat9.0&#xff0c; jdk1.8 项目构建&#xff1a;maven 数据库&#xff1a;mysql5.7 系统分前后台&#xff0c;项目采用前后端分离 前端技术&#xff1a;vuevue-element-admin 服务端技术&#xff1a;springboot,mybatis…

express session JWT JSON Web Token

了解 Session 认证的局限性 Session 认证机制需要配合 cookie 才能实现。由于 Cookie 默认不支持跨域访问&#xff0c;所以&#xff0c;当涉及到前端跨域请求后端接口的时候&#xff0c;需要做很多额外的配置&#xff0c;才能实现跨域 Session 认证。 注意&#xff1a; 当前端…

层次式架构的设计理论与实践

层次式架构的设计理论与实践 层次式架构概述 层次式架构的定义和特性 定义 特性 层次式架构的一般组成(表现层、中间层、数据访问层和数据层) 表现层框架设计 设计模式 MVC MVP MVVM XML技术 UIP设计思想 表现层动态生成设计思想(基于XML界面管理技术) 中间层架构设计 业务…

macOS 12 Monterey v12.7.1正式版:开启全新的操作系统体验

macOS 12 Monterey已经向所有兼容的Mac设备推出&#xff0c;为您带来了一系列强大的新功能和改进。这个全新的操作系统版本&#xff0c;不仅带来了更流畅的用户体验&#xff0c;还增强了与iOS设备的无缝集成&#xff0c;让您的设备使用更加高效&#xff0c;更加便捷。 macOS 1…

NVM 安装及使用

1.安装 我使用的是解压版&#xff0c;免安装 github下载压缩包 下载后放在一个【没有中文】的文件夹下&#xff0c;解压 然后需要配环境变量&#xff0c; 首先添加两个变量&#xff0c;分别是刚刚nvm解压的路径&#xff0c;和当前node安装的路径。 然后编辑path变量&#x…

牛客网刷题-(5)

&#x1f308;write in front&#x1f308; &#x1f9f8;大家好&#xff0c;我是Aileen&#x1f9f8;.希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流. &#x1f194;本文由Aileen_0v0&#x1f9f8; 原创 CSDN首发&#x1f412; 如…

贝锐花生壳内网穿透推出全新功能,远程业务连接更安全

贝锐旗下内网穿透兼动态域名解析品牌花生壳目前推出了全新的“访问控制”功能&#xff0c;可精确设置访问权限&#xff0c;充分保障信息安全&#xff0c;满足更多用户安全远程访问内网服务的需求。 通过这一功能&#xff0c;可实现指定时间、IP、地区等条件下才能远程访问映射的…

算法通过村第十六关-滑动窗口|黄金笔记|结合堆的应用

文章目录 前言堆与滑动窗口结合的问题总结 前言 提示&#xff1a;不论记忆多么痛苦&#xff0c;它属于过去&#xff0c;已经逝去了&#xff0c;我们为什么还执着于它并让它代表我们&#xff1f;我们就这样&#xff0c;所以&#xff0c;我们受苦。 --丹津葩默 这个还是一个比较重…

进程/线程/PCB

进程&#xff1a;正在运行中的程序&#xff08;进程是驻留在内存中的&#xff09; 是系统执行资源分配和调度的独立单位每一个进程都有属于自己的存储空间和系统资源注意&#xff1a;进程A 和 进程B 的内存独立不共享 使用jdk自带的工具&#xff0c;jconsole查看当前Java进程中…

CentOS 编译安装 nginx

CentOS 编译安装 nginx 修改 yum 源地址为 阿里云 curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repoyum makecache升级内核和软件 yum -y update安装常用软件和依赖 yum -y install gcc gcc-c make cmake zlib zlib-devel openss…

electron27+react18集成搭建跨平台应用|electron窗口多开

基于Electron27集成React18创建一个桌面端exe程序。 electron27-vite4-react18基于electron27结合vite4构建工具快速创建react18跨端应用实践。 版本列表 "vite": "^4.4.5" "react": "^18.2.0" "electron": "^27.0.1&…

CloudQuery + StarRocks:打造高效、安全的数据库管控新模式

随着技术的迅速发展&#xff0c;各种多元化的数据库产品应运而生&#xff0c;它们不仅类型众多&#xff0c;而且形式各异&#xff0c;国产化数据库千余套&#xff0c;开源数据库百余套 OceanBase 、PolarDB 、StarRocks…还有一些像 Oracle、MySQL 这些传统数据库。这些数据库产…

太极v14.0.4 免ROOT用Xposed

一个帮助你免 Root、免解锁免刷机使用 Xposed 模块的 APP 框架。 模块通过它改变系统和应用的行为&#xff0c;既能以传统的 Root/ 刷机方式运作&#xff0c; 也能免 Root/ 免刷机运行&#xff1b;并且它支持 Android 5.0 ~ 11。 简单来说&#xff0c;太极就是个 Xposed 框架…