Android-MessageQueue

Android-MessageQueue

1. 认识MessageQueue

在分析 MessageQueue 之前,首先要了解到一个消息队列的误区:MessageQueue 从名字上看来是一个“队列”,主要是其处理消息的方式比较像一个队列。实际上,因为会频繁地出现 Message 的插入和删除等操作,如果是用传统的“队列”来存放 Message,插入、删除的效率会很低,因此 MessageQueue 采用单链表的形式(单链表插入、删除只需要变动两个节点对象的连接目标即可)来管理 Message。

MessageQueue 源码里的注释对其一些性质进行了说明:

1
2
3
4
5
6
7
8
/**
* Low-level class holding the list of messages to be dispatched by a
* {@link Looper}. Messages are not added directly to a MessageQueue,
* but rather through {@link Handler} objects associated with the Looper.
*
* <p>You can retrieve the MessageQueue for the current thread with
* {@link Looper#myQueue() Looper.myQueue()}.
*/

大概意思是:

MessageQueue 是一个持有 Message 的低等级类,它所持有的 Message 由 Looper 分发。Message 并不是直接被添加到 MessageQueue 中的,而是通过 Handler 的对象与 Looper 绑定起来。

你可以通过 Looper.myQueue() 方法取回当前线程的 MessageQueue。

这段注释特别关键,以至于我觉得已经足以说明整个 MessageQueue 的工作方式了,上半段概括了 MessageQueue 如何管理 Message,下半段概括了 MessageQueue、Looper 以及线程的关系。


2. MessageQueue管理Message

2.1 添加Message

(1)当一个 Message 产生并且被发送时,通过 Handler 的对象将这个 Message 和当前线程绑定,再插入到 MessageQueue 单链表中。

通过 Handler 源码可知,一个消息被发送后,Handler 最终会调用 MessageQueue.enqueueMessage(Message msg, long when)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}

synchronized (this) {
if (mQuitting) {
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}

msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}

// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}

关键点:

1
2
3
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}

这句也就是手动将 Message.target 设置为 null 会报异常的原因。其余部分也就是根据 Message 的 when 来将 Message 插入到 MessageQueue 的指定位置,并且根据情况唤醒 Native 层的消息队列。

2.2 移除Message

移除消息有两个不同参数的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// 使用默认回调初始化 Handler 时移除消息
void removeMessages(Handler h, int what, Object object) {
if (h == null) {
return;
}

synchronized (this) {
Message p = mMessages;

// Remove all messages at front.
while (p != null && p.target == h && p.what == what
&& (object == null || p.obj == object)) {
Message n = p.next;
mMessages = n;
p.recycleUnchecked();
p = n;
}

// Remove all messages after front.
while (p != null) {
Message n = p.next;
if (n != null) {
if (n.target == h && n.what == what
&& (object == null || n.obj == object)) {
Message nn = n.next;
n.recycleUnchecked();
p.next = nn;
continue;
}
}
p = n;
}
}
}

// 使用自定义回调初始化 Handler 时移除消息
void removeMessages(Handler h, Runnable r, Object object) {
if (h == null || r == null) {
return;
}

synchronized (this) {
Message p = mMessages;

// Remove all messages at front.
while (p != null && p.target == h && p.callback == r
&& (object == null || p.obj == object)) {
Message n = p.next;
mMessages = n;
p.recycleUnchecked();
p = n;
}

// Remove all messages after front.
while (p != null) {
Message n = p.next;
if (n != null) {
if (n.target == h && n.callback == r
&& (object == null || n.obj == object)) {
Message nn = n.next;
n.recycleUnchecked();
p.next = nn;
continue;
}
}
p = n;
}
}
}

其实对应的是初始化 Handler 时,一个是使用默认回调,一个是使用自定义回调的情况,具体移除消息的过程几乎是一样的,都是单链表从中移除一个节点的逻辑。

2.3 移除BarrierMessage

源码是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* Removes a synchronization barrier.
*
* @param token The synchronization barrier token that was returned by
* {@link #postSyncBarrier}.
*
* @throws IllegalStateException if the barrier was not found.
*
* @hide
*/
@TestApi
public void removeSyncBarrier(int token) {
// Remove a sync barrier token from the queue.
// If the queue is no longer stalled by a barrier then wake it.
synchronized (this) {
Message prev = null;
Message p = mMessages;
while (p != null && (p.target != null || p.arg1 != token)) {
prev = p;
p = p.next;
}
if (p == null) {
throw new IllegalStateException("The specified message queue synchronization "
+ " barrier token has not been posted or has already been removed.");
}
final boolean needWake;
if (prev != null) {
prev.next = p.next;
needWake = false;
} else {
mMessages = p.next;
needWake = mMessages == null || mMessages.target != null;
}
p.recycleUnchecked();

// If the loop is quitting then it is already awake.
// We can assume mPtr != 0 when mQuitting is false.
if (needWake && !mQuitting) {
nativeWake(mPtr);
}
}
}

和发送一个障栅消息一样,这个方法也加了 @hide 注解,移除的过程和移除普通消息比较类似,也是单链表移除节点的逻辑。


3. MessageQueue和线程的交互

MessageQueue 中有一个接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Callback interface for discovering when a thread is going to block
* waiting for more messages.
*/
public static interface IdleHandler {
/**
* Called when the message queue has run out of messages and will now
* wait for more. Return true to keep your idle handler active, false
* to have it removed. This may be called if there are still messages
* pending in the queue, but they are all scheduled to be dispatched
* after the current time.
*/
boolean queueIdle();
}

通过注释可以知道:

  • 这个接口的作用是:找出一个线程什么时候阻塞并等待更多(新的)消息。
  • queueIdle() 的作用是:当一个消息队列处理完全部消息,并且将进入等待的状态时调用。返回 true 则激活 Idle Handler,返回 false 则在执行完后将其移除。如果一个消息队列中还存在消息,但在当前时间之后的所有消息,均已经被计划好了分发的时间(也即这些消息都属于 Delay 型的消息,因此在执行到这些消息之前,线程也处在没有消息需要处理的状态),则该方法仍然可能被调用。

具体的意义如下:

Handler 本身除了可以用来发送消息之外,另一个很重要的功能就是去处理接收到的消息,也就是 handleMessage 方法,普通的 Handler 只会处理人为指定的消息,这些消息对应的事务优先级比较高,因此当消息队列中存在 Message 时,线程会优先处理。但是不论是 Dalvik 还是 JVM,都有一些自己的管理事务,这些事务的优先级不是特别高,或者是不可预期的,但依然需要占用线程才能处理(毕竟线程是 CPU 调度的基本单位),比如 GC,内存碎片整理等,这些事务在非紧急情况下,只会在线程中没有消息时处理,此时处理这些事务的就是 Idle Handler。Idle Handler 并不是一个独立的类型,只需要实现 IdleHandler 接口即可。

3.1 添加IdleHandler

既然 IdleHandler 只是一个接口,那如何让 MessageQueue 在空闲时可以唤起呢?以下方法即可添加一个 IdleHandler 到 IdleHandler 的 ArrayList 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Add a new {@link IdleHandler} to this message queue. This may be
* removed automatically for you by returning false from
* {@link IdleHandler#queueIdle IdleHandler.queueIdle()} when it is
* invoked, or explicitly removing it with {@link #removeIdleHandler}.
*
* <p>This method is safe to call from any thread.
*
* @param handler The IdleHandler to be added.
*/
public void addIdleHandler(@NonNull IdleHandler handler) {
if (handler == null) {
throw new NullPointerException("Can't add a null IdleHandler");
}
synchronized (this) {
mIdleHandlers.add(handler);
}
}

注释的大致意思是:将一个 IdleHandler 添加到 MessageQueue 中,如果 IdleHandler.queueIdle() 返回 false,或者直接调用 removeIdleHandler() 方法,则会移除这个 IdleHandler。

3.2 移除IdleHandler

移除 IdleHandler 的方法也很简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Remove an {@link IdleHandler} from the queue that was previously added
* with {@link #addIdleHandler}. If the given object is not currently
* in the idle list, nothing is done.
*
* <p>This method is safe to call from any thread.
*
* @param handler The IdleHandler to be removed.
*/
public void removeIdleHandler(@NonNull IdleHandler handler) {
synchronized (this) {
mIdleHandlers.remove(handler);
}
}

3.3 调用IdleHandler

添加和移除 Idle Handler 都已经清楚了,Idle Handler 具体使用的地方,在 MessageQueue 实际处理消息的方法 next() 里,在默认情况下,每一次调用 next 就会返回一个 Message,这个 Message 就是下一个需要分发的消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
@UnsupportedAppUsage
Message next() {
// Return here if the message loop has already quit and been disposed.
// This can happen if the application tries to restart a looper after quit
// which is not supported.


// mPtr 是通过调用 nativeInit() 获取的返回值
// 可以认为这个是 Native 层 MessageQueue 初始化后的指针地址
final long ptr = mPtr;
if (ptr == 0) {
return null;
}

int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}

nativePollOnce(ptr, nextPollTimeoutMillis);

synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;


// 重点是 msg.target == null,如果满足说明这个 msg 是一个障栅消息
// 那么就要阻塞这条消息之后的所有同步消息,而放行异步消息
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.


// 循环结束后有两种情况:
// 1. 整个消息队列中都没有异步消息,则终止条件为:msg == null
// 2. 找到了异步消息,则终止条件为:msg.isAsynchronous() == true
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
// msg != null 有两种可能:
// 1. 消息队列没阻塞,msg 是一条同步消息
// 2. 消息队列被障栅消息阻塞了,但是找到了异步消息


if (now < msg.when) {
// 说明还没到需要分发消息的时候,
// 则计算到需要分发消息中间的间隔,
// 在间隔时间后再唤醒消息队里进行分发


// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// 说明到了需要分发消息的时候


// Got a message.


// 开始分发消息了,不是阻塞状态
mBlocked = false;


// 由于将这条消息分发出去了,相当于要从链表里去掉这个消息
// 需要判断当前消息是不是第一个消息
// prevMsg 表示该 msg 的前一个消息
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}


// 由于该消息即将被分发出去,因此该消息的下一条就没用了
// 置为空,去掉无用引用,防止内存溢出
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);


// 把该 msg 标识为“使用中”
msg.markInUse();


// 返回这条 msg 作为需要处理的消息
return msg;
}
} else {
// 说明 msg == null,此时有两种可能:
// 1. 已经没有消息需要分发
// 2. 当前队列被障栅消息阻塞了,却又没有异步消息(不论是否有同步消息)


// No more messages.
nextPollTimeoutMillis = -1;
}

// Process the quit message now that all pending messages have been handled.
if (mQuitting) {
// 如果当前消息队列是正在退出的状态
// 则调用 dispose() 销毁 Native 层的消息队列
dispose();
return null;
}


// ======================
// 以下为 IdleHandler 部分
// ======================


// If first time idle, then get the number of idlers to run.
// Idle handles only run if the queue is empty or if the first message
// in the queue (possibly a barrier) is due to be handled in the future.
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue;
}

if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}

// Run the idle handlers.
// We only ever reach this code block during the first iteration.
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler

boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}

if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}

// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0;

// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
nextPollTimeoutMillis = 0;
}
}

基本的解读都在代码注释中了,还是比较通俗易懂的。重点注意专门加的这句注释:// 以下为 IdleHandler 部分,可以看看这句注释以上的部分,首先是判断了下一条消息,然后判断了当前消息队列是否正在退出,这个 next() 方法基本上要么返回了下一条需要分发的 Message,要么返回了 null,只有以下几种情况时才能执行到这句注释的地方:

  1. 当前消息队列没有任何消息了,且不在退出状态。
  2. 当前消息队列没有被障栅消息阻塞,且没有同步消息,也没有异步消息,且不在退出状态。
  3. 当前消息队列没有被障栅消息阻塞,没有同步消息,虽然有异步消息,但还没有到需要分发的时间,且不在退出状态。
  4. 当前消息队列被障栅消息阻塞了,但没有异步消息(不论是否有同步消息),且不在退出状态。
  5. 当前消息队列被障栅消息阻塞了,虽然有异步消息(不论是否有同步消息),但还没有到需要分发的时间,且不在退出状态。

以上几种情况中,(2)和(3)其实可以归到一起,(3)和(4)也可以归到一起,再进一步合并逻辑,其实就代表了这么一种状态:消息队列当前没有任务,而且也不在退出状态。只有这个状态下,才会调用 IdleHandler.queueIdle,具体的任务则由 IdleHandler 接口的具体实现决定了。消息队列中对 IdleHandler 的处理比较简单,注释也已经讲得比较详细,就不另行分析了。


4. MessageQueue总结

通过以上的源码分析,MessageQueue 的工作流程也就清楚了:

  1. 初始化 MessageQueue,同时初始化 Native 层的 MessageQueue。
  2. 通过 Handler 添加或移除 Message。
  3. 线程执行的过程中,不断调用 next() 方法来获取需要分发处理的消息。
  4. 如果线程中有需要分发处理的消息,则正常分发。
  5. 如果线程中已经没有或暂时没有需要分发处理的普通消息,则去处理 Idle 事务。
  6. 如果既没有普通消息需要分发处理,也没有 Idle 事务,则阻塞消息队列,使其进入等待新消息的状态。
  7. 如果需要退出消息队列,则调用 dispose() 方法同步销毁 Native 层的 MessageQueue。