参考 Android消息机制1-Handler(Java层)
Android消息机制2-Handler(Native层)
Android应用程序消息处理机制(Looper、Handler)分析
管道(Unix)
Epoll
文件描述符
Handler 机制主要由四个部分组成:
Looper
MessageQueue
Message
Handler
典型用法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 class LooperThread extends Thread { public Handler mHandler; public void run () { Looper.prepare(); mHandler = new Handler () { public void handleMessage (Message msg) { } }; Looper.loop(); } }
Looper 不断循环执行 Looper.loop
,按分发机制将消息分发给目标处理者
1 2 3 4 5 6 7 private static void prepare (boolean quitAllowed) { if (sThreadLocal.get() != null ) { throw new RuntimeException ("Only one Looper may be created per thread" ); } sThreadLocal.set(new Looper (quitAllowed)); }
sThreadLocal
是一个 ThreadLocal 类型的静态变量
ThreadLocal:线程本地存储区(Thread Local Storage),每个线程都有自己的私有的本地存储区域,不同线程之间彼此不能访问对方的 TLS 区域
1 2 3 4 5 private Looper (boolean quitAllowed) { mQueue = new MessageQueue (quitAllowed); mThread = Thread.currentThread(); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 public static void loop () { final Looper me = myLooper(); if (me == null ) { throw new RuntimeException ("No Looper; Looper.prepare() wasn't called on this thread." ); } final MessageQueue queue = me.mQueue; Binder.clearCallingIdentity(); final long ident = Binder.clearCallingIdentity(); for (;;) { Message msg = queue.next(); if (msg == null ) { return ; } final Printer logging = me.mLogging; if (logging != null ) { logging.println(">>>>> Dispatching to " + msg.target + " " + msg.callback + ": " + msg.what); } final long slowDispatchThresholdMs = me.mSlowDispatchThresholdMs; final long start = (slowDispatchThresholdMs == 0 ) ? 0 : SystemClock.uptimeMillis(); final long end; try { msg.target.dispatchMessage(msg); end = (slowDispatchThresholdMs == 0 ) ? 0 : SystemClock.uptimeMillis(); } finally { if (traceTag != 0 ) { Trace.traceEnd(traceTag); } } if (slowDispatchThresholdMs > 0 ) { final long time = end - start; if (time > slowDispatchThresholdMs) { Slog.w(TAG, "Dispatch took " + time + "ms on " + Thread.currentThread().getName() + ", h=" + msg.target + " cb=" + msg.callback + " msg=" + msg.what); } } if (logging != null ) { logging.println("<<<<< Finished to " + msg.target + " " + msg.callback); } final long newIdent = Binder.clearCallingIdentity(); if (ident != newIdent) { Log.wtf(TAG, "Thread identity changed from 0x" + Long.toHexString(ident) + " to 0x" + Long.toHexString(newIdent) + " while dispatching to " + msg.target.getClass().getName() + " " + msg.callback + " what=" + msg.what); } msg.recycleUnchecked(); } }
1 2 3 4 5 6 7 8 public void quit () { mQueue.quit(false ); } public void quitSafely () { mQueue.quit(true ); }
MessageQueue 在构造方法中,会调用 native 方法 nativeInit
方法,在 NativeMessageQueue 的构造方法中,会构造一个 JNI 层的 Looper
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 Looper::Looper (bool allowNonCallbacks) : mAllowNonCallbacks (allowNonCallbacks), mResponseIndex (0 ) { int wakeFds[2 ]; int result = pipe (wakeFds); ...... mWakeReadPipeFd = wakeFds[0 ]; mWakeWritePipeFd = wakeFds[1 ]; ...... #ifdef LOOPER_USES_EPOLL mEpollFd = epoll_create (EPOLL_SIZE_HINT); ...... struct epoll_event eventItem; memset (& eventItem, 0 , sizeof (epoll_event)); eventItem.events = EPOLLIN; eventItem.data.fd = mWakeReadPipeFd; result = epoll_ctl (mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem); ...... #else ...... #endif ...... }
管道:Linux 系统中的一种进程间通信机制。简单来说,管道就是一个文件,在管道的两端,分别是两个打开文件的文件描述符,这两个打开文件描述符都是对应同一个文件,其中一个是用来读的,别一个是用来写的,一般的使用方式就是,一个线程通过读文件描述符中来读管道的内容,当管道没有内容时,这个线程就会进入等待状态,而另外一个线程通过写文件描述符来向管道中写入内容,写入内容的时候,如果另一端正有线程正在等待管道中的内容,那么这个线程就会被唤醒。
epoll:Linux 系统中的 epoll 机制为处理大批量句柄而作了改进的 poll,是 Linux 下多路复用 IO 接口select/poll 的增强版本,它能显著减少程序在大量并发连接中只有少量活跃的情况下的系统 CPU 利用率。
pipe 是 Linux 系统中的管道机制,用于 IPC,在管道机制的实现中,又使用 epoll 机制来监听读写事件。
以上在 Android 上的应用为,当 Java 层的消息队列中没有消息时,就使 Android 应用程序主线程进入等待状态,而当 Java 层的消息队列中来了新的消息后,就唤醒 Android 应用程序的主线程来处理这个消息。
Handler 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public Handler (Callback callback, boolean async) { if (FIND_POTENTIAL_LEAKS) { final Class<? extends Handler > klass = getClass(); if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) && (klass.getModifiers() & Modifier.STATIC) == 0 ) { Log.w(TAG, "The following Handler class should be static or leaks might occur: " + klass.getCanonicalName()); } } mLooper = Looper.myLooper(); if (mLooper == null ) { throw new RuntimeException ( "Can't create handler inside thread that has not called Looper.prepare()" ); } mQueue = mLooper.mQueue; mCallback = callback; mAsynchronous = async; }
在 Looper.loop
中,当存在 Message 需要处理时,会调用 dispatchMessage
来进行分发:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public void dispatchMessage (Message msg) { if (msg.callback != null ) { handleCallback(msg); } else { if (mCallback != null ) { if (mCallback.handleMessage(msg)) { return ; } } handleMessage(msg); } }
通过 Handler 发送消息:
最终所有的方法都会调用到 MessageQueue.enqueueMessage
MessageQueue
消息机制中 Java 层和 C++ 层的连接纽带,大部分核心方法都交给 native 层来处理
1 2 3 4 5 MessageQueue(boolean quitAllowed) { mQuitAllowed = quitAllowed; mPtr = nativeInit(); }
MessageQueue 的初始化工作主要由 native 方法来执行
1 2 3 4 5 6 7 8 9 10 11 12 static void android_os_MessageQueue_nativeInit (JNIEnv* env, jobject obj) { NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue (); if (! nativeMessageQueue) { jniThrowRuntimeException (env, "Unable to allocate native queue" ); return ; } android_os_MessageQueue_setNativeMessageQueue (env, obj, nativeMessageQueue); }
nativeInit
中主要是在 JNI 层创建一个 NativeMessageQueue 并将偏移量保存在 MessageQueue 中的 mPtr
,关联了 NativeMessageQueue 和 MessageQueue
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 Message next () { final long ptr = mPtr; if (ptr == 0 ) { return null ; } int pendingIdleHandlerCount = -1 ; int nextPollTimeoutMillis = 0 ; for (;;) { if (nextPollTimeoutMillis != 0 ) { Binder.flushPendingCommands(); } nativePollOnce(ptr, nextPollTimeoutMillis); synchronized (this ) { final long now = SystemClock.uptimeMillis(); Message prevMsg = null ; Message msg = mMessages; if (msg != null && msg.target == null ) { do { prevMsg = msg; msg = msg.next; } while (msg != null && !msg.isAsynchronous()); } if (msg != null ) { if (now < msg.when) { nextPollTimeoutMillis = (int ) Math.min(msg.when - now, Integer.MAX_VALUE); } else { mBlocked = false ; if (prevMsg != null ) { prevMsg.next = msg.next; } else { mMessages = msg.next; } msg.next = null ; if (DEBUG) Log.v(TAG, "Returning message: " + msg); msg.markInUse(); return msg; } } else { nextPollTimeoutMillis = -1 ; } if (mQuitting) { dispose(); return null ; } if (pendingIdleHandlerCount < 0 && (mMessages == null || now < mMessages.when)) { pendingIdleHandlerCount = mIdleHandlers.size(); } if (pendingIdleHandlerCount <= 0 ) { mBlocked = true ; continue ; } if (mPendingIdleHandlers == null ) { mPendingIdleHandlers = new IdleHandler [Math.max(pendingIdleHandlerCount, 4 )]; } mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers); } for (int i = 0 ; i < pendingIdleHandlerCount; i++) { final IdleHandler idler = mPendingIdleHandlers[i]; mPendingIdleHandlers[i] = null ; boolean keep = false ; try { keep = idler.queueIdle(); } catch (Throwable t) { Log.wtf(TAG, "IdleHandler threw exception" , t); } if (!keep) { synchronized (this ) { mIdleHandlers.remove(idler); } } } pendingIdleHandlerCount = 0 ; nextPollTimeoutMillis = 0 ; } }
在 next
方法中,nativePollOnce
是阻塞操作,其中 nextPollTimeoutMillis
代表下一个消息到来之前,还需要等待的时长;nextPollTimeoutMillis == -1
表示当前没有更多消息。nativePollOnce
调用结束后,从 mMessages
中提取一个消息
当处于空闲时,执行 IdleHandler
中的回调方法。
1 2 3 4 5 6 7 8 static void android_os_MessageQueue_nativePollOnce (JNIEnv* env, jobject obj, jint ptr, jint timeoutMillis) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast <NativeMessageQueue*>(ptr); nativeMessageQueue->pollOnce (timeoutMillis); }
1 2 3 4 void NativeMessageQueue::pollOnce (int timeoutMillis) { mLooper->pollOnce (timeoutMillis); }
pollOnce
会调用 pollnner
来进一步操作,如果 pollnner
返回值不等于 0,则返回
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 int Looper::pollInner (int timeoutMillis) { ...... int result = ALOOPER_POLL_WAKE; ...... #ifdef LOOPER_USES_EPOLL struct epoll_event eventItems[EPOLL_MAX_EVENTS]; int eventCount = epoll_wait (mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); bool acquiredLock = false ; #else ...... #endif if (eventCount < 0 ) { if (errno == EINTR) { goto Done; } LOGW ("Poll failed with an unexpected error, errno=%d" , errno); result = ALOOPER_POLL_ERROR; goto Done; } if (eventCount == 0 ) { ...... result = ALOOPER_POLL_TIMEOUT; goto Done; } ...... #ifdef LOOPER_USES_EPOLL for (int i = 0 ; i < eventCount; i++) { int fd = eventItems[i].data.fd; uint32_t epollEvents = eventItems[i].events; if (fd == mWakeReadPipeFd) { if (epollEvents & EPOLLIN) { awoken (); } else { LOGW ("Ignoring unexpected epoll events 0x%x on wake read pipe." , epollEvents); } } else { ...... } } if (acquiredLock) { mLock.unlock (); } Done: ; #else ...... #endif ...... return result; }
1 2 3 4 5 6 7 8 9 void Looper::awoken () { ...... char buffer[16 ]; ssize_t nRead; do { nRead = read (mWakeReadPipeFd, buffer, sizeof (buffer)); } while ((nRead == -1 && errno == EINTR) || nRead == sizeof (buffer)); }
总结上面的代码,Looper 通过 loop
调用 MessageQueue 的 next
,next
中又会调用到 native 方法 nativePollOnce
,在这个方法中,会调用到 NativeMessageQueue 的 pollInner
,这里会通过在 JNI 层 Looper 的构造方法中,使用 epoll 监听管道 EPOLLIN 事件,如果存在调用 awoken
,清空管道中的内容,以便下次再调用pollInner函数时,知道自从上次处理完消息队列中的消息后,有没有新的消息加进来。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 boolean enqueueMessage (Message msg, long when) { if (msg.target == null ) { throw new IllegalArgumentException ("Message must have a target." ); } if (msg.isInUse()) { throw new IllegalStateException (msg + " This message is already in use." ); } synchronized (this ) { if (mQuitting) { IllegalStateException e = new IllegalStateException ( msg.target + " sending message to a Handler on a dead thread" ); Log.w(TAG, e.getMessage(), e); msg.recycle(); return false ; } msg.markInUse(); msg.when = when; Message p = mMessages; boolean needWake; if (p == null || when == 0 || when < p.when) { msg.next = p; mMessages = msg; needWake = mBlocked; } else { needWake = mBlocked && p.target == null && msg.isAsynchronous(); Message prev; for (;;) { prev = p; p = p.next; if (p == null || when < p.when) { break ; } if (needWake && p.isAsynchronous()) { needWake = false ; } } msg.next = p; prev.next = msg; } if (needWake) { nativeWake(mPtr); } } return true ; }
MessageQueue
是按照消息触发时间的先后顺序排列的,队列头部的消息是最早触发的。当有消息加入,会从队列头部开始遍历,插入到合适的位置,以保证所有消息的时间顺序。
如果当前线程处于空闲等待状态,那么还需要调用 nativeWake
来唤醒:
1 2 3 4 5 6 static void android_os_MessageQueue_nativeWake (JNIEnv* env, jobject obj, jint ptr) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast <NativeMessageQueue*>(ptr); return nativeMessageQueue->wake (); }
这里将唤醒请求转发到 Looper wake
:
1 2 3 4 5 6 7 8 9 10 11 12 void Looper::wake () { ...... ssize_t nWrite; do { nWrite = write (mWakeWritePipeFd, "W" , 1 ); } while (nWrite == -1 && errno == EINTR); ....... }
往管道写入内容,从而唤醒线程,因为当消息队列中没有消息处理时,线程会进入空闲等待状态,具体是通过 Looper 的 polllnner
中调用 epoll_wait
进入
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 void removeMessages (Handler h, int what, Object object) { if (h == null ) { return ; } synchronized (this ) { Message p = mMessages; while (p != null && p.target == h && p.what == what && (object == null || p.obj == object)) { Message n = p.next; mMessages = n; p.recycleUnchecked(); p = n; } while (p != null ) { Message n = p.next; if (n != null ) { if (n.target == h && n.what == what && (object == null || n.obj == object)) { Message nn = n.next; n.recycleUnchecked(); p.next = nn; continue ; } } p = n; } } }
postSyncBarrier
提交一个同步屏障,这将会阻止队列中消息的执行,直到手动调用 removeSyncBarrier
当 MessageQueue 退出时,需要 dispose
:
1 2 3 4 5 6 7 8 9 10 private void dispose () { if (mPtr != 0 ) { nativeDestroy(mPtr); mPtr = 0 ; } }
nativeDestroy
最终会调用 RefBase 的 decStrong
:
1 2 3 4 5 6 7 8 9 10 11 12 13 void RefBase::decStrong (const void * id) const { weakref_impl* const refs = mRefs; refs->removeStrongRef (id); const int32_t c = android_atomic_dec (&refs->mStrong); if (c == 1 ) { refs->mBase->onLastStrongRef (id); if ((refs->mFlags&OBJECT_LIFETIME_MASK) == OBJECT_LIFETIME_STRONG) { delete this ; } } refs->decWeak (id); }
Message 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 void recycleUnchecked () { flags = FLAG_IN_USE; what = 0 ; arg1 = 0 ; arg2 = 0 ; obj = null ; replyTo = null ; sendingUid = -1 ; when = 0 ; target = null ; callback = null ; data = null ; synchronized (sPoolSync) { if (sPoolSize < MAX_POOL_SIZE) { next = sPool; sPool = this ; sPoolSize++; } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public static Message obtain () { synchronized (sPoolSync) { if (sPool != null ) { Message m = sPool; sPool = m.next; m.next = null ; m.flags = 0 ; sPoolSize--; return m; } } return new Message (); }
总结
Java 层:
Handler 通过 sendMessage
,将 Message 通过 MessageQueue.enqueueMessage
添加到队列中
Looper 通过 loop
提取需要执行的 Message,并交与 Message.target
的 Handler 进行 dispatchMessage
分发
将 Message 添加到 MessageQueue 时,会唤醒 Looper 线程;如果 MessageQueue 中没有 Message 时,并处于 Idle 状态,则会执行 IdelHandler
JNI 层:
线程在进入循环之前,会在 JNI 创建管道(Pipe) ,当消息队列为空时,线程处于空闲等待状态
通过 epoll 机制监听 EPOLLIN
事件,当有新事件进入消息队列时,并且当前线程处于空闲状态,通过向管道写入数据,来唤醒线程
消息分发的优先级:
Message.callback.run()
Handler.mCallback.handleMessage()
Handler.handleMessage()
EPOLL:Linux 内核的可扩展 I/O 事件通知机制
PIPE:管道是一系列将标准输入输出链接起来的进程,其中每个进程的输出被直接作为下一个进程的输入
文件描述符(File descriptor):用于表述指向文件 的引用的抽象化概念