当前位置: 移动技术网 > IT编程>移动开发>Android > Android7.0 MessageQueue详解

Android7.0 MessageQueue详解

2019年07月24日  | 移动技术网IT编程  | 我要评论

最美的时光歌曲,k446,民和牧业

android中的消息处理机制大量依赖于handler。每个handler都有对应的looper,用于不断地从对应的messagequeue中取出消息处理。

一直以来,觉得messagequeue应该是java层的抽象,然而事实上messagequeue的主要部分在native层中。
自己对messagequeue在native层的工作不太熟悉,借此机会分析一下。

一、messagequeue的创建

当需要使用looper时,我们会调用looper的prepare函数:

public static void prepare() {
 prepare(true);
}

private static void prepare(boolean quitallowed) {
 if (sthreadlocal.get() != null) {
 throw new runtimeexception("only one looper may be created per thread");
 }
 //sthreadlocal为线程本地存储区;每个线程仅有一个looper
 sthreadlocal.set(new looper(quitallowed));
}

private looper(boolean quitallowed) {
 //创建出messagequeue
 mqueue = new messagequeue(quitallowed);
 mthread = thread.currentthread();
}

1 nativemessagequeue

我们看看messagequeue的构造函数:

messagequeue(boolean quitallowed) {
 mquitallowed = quitallowed;
 //mptr的类型为long?
 mptr = nativeinit();
}

messagequeue的构造函数中就调用了native函数,我们看看android_os_messagequeue.cpp中的实现:

static jlong android_os_messagequeue_nativeinit(jnienv* env, jclass clazz) {
 //messagequeue的native层实体
 nativemessagequeue* nativemessagequeue = new nativemessagequeue();
 ............
 //这里应该类似与将指针转化成long类型,放在java层保存;估计java层使用时,会在native层将long变成指针,就可以操作队列了
 return reinterpret_cast<jlong>(nativemessagequeue);
}

我们跟进nativemessagequeue的构造函数:

nativemessagequeue::nativemessagequeue() :
 mpollenv(null), mpollobj(null), mexceptionobj(null) {
 //创建一个native层的looper,也是线程唯一的
 mlooper = looper::getforthread();
 if (mlooper == null) {
 mlooper = new looper(false);
 looper::setforthread(mlooper);
 }
}

从代码来看,native层和java层均有looper对象,应该都是操作messagequeue的。messagequeue在java层和native层有各自的存储结构,分别存储java层和native层的消息。

2 native层的looper

我们看看native层looper的构造函数:

looper::looper(bool allownoncallbacks) :
 mallownoncallbacks(allownoncallbacks), msendingmessage(false),
 mpolling(false), mepollfd(-1), mepollrebuildrequired(false),
 mnextrequestseq(0), mresponseindex(0), mnextmessageuptime(llong_max) {
 //此处创建了个fd
 mwakeeventfd = eventfd(0, efd_nonblock | efd_cloexec);
 .......
 rebuildepolllocked();
}

在native层中,messagequeue中的looper初始化时,还调用了rebuildepolllocked函数,我们跟进一下:

void looper::rebuildepolllocked() {
 // close old epoll instance if we have one.
 if (mepollfd >= 0) {
 close(mepollfd);
 }

 // allocate the new epoll instance and register the wake pipe.
 mepollfd = epoll_create(epoll_size_hint);
 ............
 struct epoll_event eventitem;
 memset(& eventitem, 0, sizeof(epoll_event)); // zero out unused members of data field union
 eventitem.events = epollin;
 eventitem.data.fd = mwakeeventfd;
 //在mepollfd上监听mwakeeventfd上是否有数据到来
 int result = epoll_ctl(mepollfd, epoll_ctl_add, mwakeeventfd, & eventitem);
 ...........
 for (size_t i = 0; i < mrequests.size(); i++) {
 const request& request = mrequests.valueat(i);
 struct epoll_event eventitem;
 request.initeventitem(&eventitem);
 //监听request对应fd上数据的到来
 int epollresult = epoll_ctl(mepollfd, epoll_ctl_add, request.fd, & eventitem);
 ............
 }
}

从native层的looper来看,我们知道native层依赖于epoll来驱动事件处理。此处我们先保留一下大致的映像,后文详细分析。

二、使用messagequeue

1 写入消息
 android中既可以在java层向messagequeue写入消息,也可以在native层向messagequeue写入消息。我们分别看一下对应的操作流程。

1.1 java层写入消息
java层向messagequeue写入消息,依赖于enqueuemessage函数:

boolean enqueuemessage(message msg, long when) {
 if (msg.target == null) {
 throw new illegalargumentexception("message must have a target.");
 }
 if (msg.isinuse()) {
 throw new illegalstateexception(msg + " this message is already in use.");
 }

 synchronized (this) {
 if (mquitting) {
  .....
  return false;
 }

 msg.markinuse();
 msg.when = when;
 message p = mmessages;
 boolean needwake;
 if (p == null || when == 0 || when < p.when) {
  // new head, wake up the event queue if blocked.
  msg.next = p;
  mmessages = msg;
  //在头部插入数据,如果之前messagequeue是阻塞的,那么现在需要唤醒
  needwake = mblocked;
 } else {
  // inserted within the middle of the queue. usually we don't have to wake
  // up the event queue unless there is a barrier at the head of the queue
  // and the message is the earliest asynchronous message in the queue.
  needwake = mblocked && p.target == null && msg.isasynchronous();
  message prev;
  for (;;) {
  prev = p;
  p = p.next;
  if (p == null || when < p.when) {
   break;
  }
  //不是第一个异步消息时,needwake置为false
  if (needwake && p.isasynchronous()) {
   needwake = false;
  }
  }
  msg.next = p; // invariant: p == prev.next
  prev.next = msg;
 }
 // we can assume mptr != 0 because mquitting is false.
 if (needwake) {
  nativewake(mptr);
 }
 }
 return true;
}

上述代码比较简单,主要就是将新加入的message按执行时间插入到原有的队列中,然后根据情况调用nativeawake函数。

我们跟进一下nativeawake:

void nativemessagequeue::wake() {
 mlooper->wake();
}

void looper::wake() {
 uint64_t inc = 1;
 //就是向mwakeeventfd写入数据
 ssize_t nwrite = temp_failure_retry(write(mwakeeventfd, &inc, sizeof(uint64_t)));
 .............
}

在native层的looper初始化时,我们提到过native层的looper将利用epoll来驱动事件,其中构造出的epoll句柄就监听了mwakeeventfd。
实际上从messagequeue中取出数据时,若没有数据到来,就会利用epoll进行等待;因此当java层写入消息时,将会将唤醒处于等待状态的messagequeue。
在后文介绍从messagequeue中提取消息时,将再次分析这个问题。

1.2 native层写入消息
native层写入消息,依赖于native层looper的sendmessage函数:

void looper::sendmessage(const sp<messagehandler>& handler, const message& message) {
 nsecs_t now = systemtime(system_time_monotonic);
 sendmessageattime(now, handler, message);
}

void looper::sendmessageattime(nsecs_t uptime, const sp<messagehandler>& handler,
 const message& message) {
 size_t i = 0;
 {
 automutex _l(mlock);

 //同样需要按时间插入
 size_t messagecount = mmessageenvelopes.size();
 while (i < messagecount && uptime >= mmessageenvelopes.itemat(i).uptime) {
  i += 1;
 }

 //将message包装成一个messageenvelope对象
 messageenvelope messageenvelope(uptime, handler, message);
 mmessageenvelopes.insertat(messageenvelope, i, 1);

 // optimization: if the looper is currently sending a message, then we can skip
 // the call to wake() because the next thing the looper will do after processing
 // messages is to decide when the next wakeup time should be. in fact, it does
 // not even matter whether this code is running on the looper thread.
 if (msendingmessage) {
  return;
 }
 }
 // wake the poll loop only when we enqueue a new message at the head.
 if (i == 0) {
 //若插入在队列头部,同样利用wake函数触发epoll唤醒
 wake();
 }
}

以上就是向messagequeue中加入消息的主要流程,接下来我们看看从messagequeue中取出消息的流程。

2、提取消息
当java层的looper对象调用loop函数时,就开始使用messagequeue提取消息了:

public static void loop() {
 final looper me = mylooper();
 .......
 for (;;) {
 message msg = queue.next(); // might block
 .......
 try {
  //调用message的处理函数进行处理
  msg.target.dispatchmessage(msg);
 }........
 }
}

此处我们看看messagequeue的next函数:

message next() {
 //mptr保存了nativemessagequeue的指针
 final long ptr = mptr;
 .......
 int pendingidlehandlercount = -1; // -1 only during first iteration
 int nextpolltimeoutmillis = 0;

 for (;;) {
 if (nextpolltimeoutmillis != 0) {
  //会调用native函数,最终调用ipcthread的talkwithdriver,将数据写入binder驱动或者读取一次数据
  //不知道在此处进行这个操作的理由?
  binder.flushpendingcommands();
 }

 //处理native层的数据,此处会利用epoll进行blocked
 nativepollonce(ptr, nextpolltimeoutmillis);

 synchronized (this) {
  final long now = systemclock.uptimemillis();
  message prevmsg = null;
  message msg = mmessages;
  //下面其实就是找出下一个异步处理类型的消息;异步处理类型的消息,才含有对应的执行函数
  if (msg != null && msg.target == null) {
  // stalled by a barrier. find the next asynchronous message in the queue.
  do {
   prevmsg = msg;
   msg = msg.next;
  } while (msg != null && !msg.isasynchronous());
  }

  if (msg != null) {
  if (now < msg.when) {
   // next message is not ready. set a timeout to wake up when it is ready.
   nextpolltimeoutmillis = (int) math.min(msg.when - now, integer.max_value);
  } else {
   // got a message.
   mblocked = false;
   //完成next记录的存储
   if (prevmsg != null) {
   prevmsg.next = msg.next;
   } else {
   mmessages = msg.next;
   }
   msg.next = null;
   if (debug) log.v(tag, "returning message: " + msg);
   msg.markinuse();
   return msg;
  }
  } else {
  // no more messages.
  nextpolltimeoutmillis = -1;
  }

  // process the quit message now that all pending messages have been handled.
  if (mquitting) {
  dispose();
  return null;
  }

  //messagequeue中引入了idlehandler接口,即当messagequeue没有数据处理时,调用idlehandler进行一些工作

  //pendingidlehandlercount表示待处理的idlehandler,初始为-1
  if (pendingidlehandlercount < 0
   && (mmessages == null || now < mmessages.when)) {
  //midlehandlers的size默认为0,调用接口addidlehandler才能增加
  pendingidlehandlercount = midlehandlers.size();
  }

  if (pendingidlehandlercount <= 0) {
  // no idle handlers to run. loop and wait some more.
  mblocked = true;
  continue;
  }

  //将待处理的idlehandler加入到pendingidlehandlers中
  if (mpendingidlehandlers == null) {
  mpendingidlehandlers = new idlehandler[math.max(pendingidlehandlercount, 4)];
  }
  //调用arraylist.toarray(t[])节省每次分配的开销;毕竟对于message.next这样调用频率较高的函数,能省一点就是一点
  mpendingidlehandlers = midlehandlers.toarray(mpendingidlehandlers);
 }

 for (int i = 0; i < pendingidlehandlercount; i++) {
  final idlehandler idler = mpendingidlehandlers[i];
  mpendingidlehandlers[i] = null; // release the reference to the handler

  boolean keep = false;
  try {
  //执行实现类的queueidle函数,返回值决定是否继续保留
  keep = idler.queueidle();
  } catch (throwable t) {
  log.wtf(tag, "idlehandler threw exception", t);
  }

  if (!keep) {
  synchronized (this) {
   midlehandlers.remove(idler);
  }
  }
 }
 pendingidlehandlercount = 0;
 nextpolltimeoutmillis = 0;
 }
}

整个提取消息的过程,大致上如上图所示。
可以看到在java层,looper除了要取出messagequeue的消息外,还会在队列空闲期执行idlehandler定义的函数。

2.1 nativepollonce
现在唯一的疑点是nativepollonce是如何处理native层数据的,我们看看对应的native函数:

static void android_os_messagequeue_nativepollonce(jnienv* env, jobject obj,
 jlong ptr, jint timeoutmillis) {
 //果然java层调用native层messagequeue时,将long类型的ptr变为指针
 nativemessagequeue* nativemessagequeue = reinterpret_cast<nativemessagequeue*>(ptr);
 nativemessagequeue->pollonce(env, obj, timeoutmillis);
}

void nativemessagequeue::pollonce(jnienv* env, jobject pollobj, int timeoutmillis) {
 mpollenv = env;
 mpollobj = pollobj;
 //最后还是进入到native层looper的pollonce函数
 mlooper->pollonce(timeoutmillis);
 mpollobj = null;
 mpollenv = null;

 if (mexceptionobj) {
 .........
 }
}

看看native层looper的pollonce函数:

//timeoutmillis为超时等待时间。值为-1时,表示无限等待直到有事件到来;值为0时,表示无需等待
//outfd此时为null,含义是:存储产生事件的文件句柄
//outevents此时为null,含义是:存储outfd上发生了哪些事件,包括可读、可写、错误和中断
//outdata此时为null,含义是:存储上下文数据,其实调用时传入的参数
int looper::pollonce(int timeoutmillis, int* outfd, int* outevents, void** outdata) {
 int result = 0;
 for (;;) {
 //处理response,目前我们先不关注response的内含
 while (mresponseindex < mresponses.size()) {
  const response& response = mresponses.itemat(mresponseindex++);
  int ident = response.request.ident;
  if (ident >= 0) {
  int fd = response.request.fd;
  int events = response.events;
  void* data = response.request.data;

  if (outfd != null) *outfd = fd;
  if (outevents != null) *outevents = events;
  if (outdata != null) *outdata = data;
  return ident;
  }
 }

 //根据pollinner的结果,进行操作
 if (result != 0) {
  if (outfd != null) *outfd = 0;
  if (outevents != null) *outevents = 0;
  if (outdata != null) *outdata = null;
  return result;
 }

 //主力还是靠pollinner
 result = pollinner(timeoutmillis);
 }
}

跟进一下pollinner函数:

int looper::pollinner(int timeoutmillis) {
 // adjust the timeout based on when the next message is due.
 //timeoutmillis是java层事件等待事件
 //native层维持了native message的等待时间
 //此处其实就是选择最小的等待时间
 if (timeoutmillis != 0 && mnextmessageuptime != llong_max) {
  nsecs_t now = systemtime(system_time_monotonic);
  int messagetimeoutmillis = tomillisecondtimeoutdelay(now, mnextmessageuptime);
  if (messagetimeoutmillis >= 0
  && (timeoutmillis < 0 || messagetimeoutmillis < timeoutmillis)) {
  timeoutmillis = messagetimeoutmillis;
 }
 }

 int result = poll_wake;
 //pollinner初始就清空response
 mresponses.clear();
 mresponseindex = 0;

 // we are about to idle.
 mpolling = true;

 //利用epoll等待mepollfd监控的句柄上事件到达
 struct epoll_event eventitems[epoll_max_events];
 int eventcount = epoll_wait(mepollfd, eventitems, epoll_max_events, timeoutmillis);

 // no longer idling.
 mpolling = false;

 // acquire lock.
 mlock.lock();

 //重新调用rebuildepolllocked时,将使得epoll句柄能够监听新加入request对应的fd
 if (mepollrebuildrequired) {
 mepollrebuildrequired = false;
 rebuildepolllocked();
 goto done;
 }

 // check for poll error.
 if (eventcount < 0) {
 if (errno == eintr) {
  goto done;
 }
 ......
 result = poll_error;
 goto done;
 }

 // check for poll timeout.
 if (eventcount == 0) {
 result = poll_timeout;
 goto done;
 }

 for (int i = 0; i < eventcount; i++) {
 if (fd == mwakeeventfd) {
  if (epollevents & epollin) {
  //前面已经分析过,当java层或native层有数据写入队列时,将写mwakeeventfd,以触发epoll唤醒
  //awoken将读取并清空mwakeeventfd上的数据
  awoken();
  } else {
  .........
  }
 } else {
  //epoll同样监听的request对应的fd
  ssize_t requestindex = mrequests.indexofkey(fd);
  if (requestindex >= 0) {
  int events = 0;
  if (epollevents & epollin) events |= event_input;
  if (epollevents & epollout) events |= event_output;
  if (epollevents & epollerr) events |= event_error;
  if (epollevents & epollhup) events |= event_hangup;
  //存储这个fd对应的response
  pushresponse(events, mrequests.valueat(requestindex));
  } else {
  ..........
  }
 }
 }

done:

 // invoke pending message callbacks.
 mnextmessageuptime = llong_max;
 //处理native层的message
 while (mmessageenvelopes.size() != 0) {
 nsecs_t now = systemtime(system_time_monotonic);
 const messageenvelope& messageenvelope = mmessageenvelopes.itemat(0);
 if (messageenvelope.uptime <= now) {
  // remove the envelope from the list.
  // we keep a strong reference to the handler until the call to handlemessage
  // finishes. then we drop it so that the handler can be deleted *before*
  // we reacquire our lock.
  {
  sp<messagehandler> handler = messageenvelope.handler;
  message message = messageenvelope.message;
  mmessageenvelopes.removeat(0);
  msendingmessage = true;
  mlock.unlock();

  //处理native message
  handler->handlemessage(message);
  }
  mlock.lock();
  msendingmessage = false;
  result = poll_callback;
 } else {
  // the last message left at the head of the queue determines the next wakeup time.
  mnextmessageuptime = messageenvelope.uptime;
  break;
 }
 }

 // release lock.
 mlock.unlock();

 //处理带回调函数的response
 for (size_t i = 0; i < mresponses.size(); i++) {
 response& response = mresponses.edititemat(i);
 if (response.request.ident == poll_callback) {
  int fd = response.request.fd;
  int events = response.events;
  void* data = response.request.data;

  //调用response的callback
  int callbackresult = response.request.callback->handleevent(fd, events, data);
  if (callbackresult == 0) {
  removefd(fd, response.request.seq);
  }

  response.request.callback.clear();
  result = poll_callback;
 }
 }
 return result;
}

说实话native层的代码写的很乱,该函数的功能比较多。
如上图所示,在nativepollonce中利用epoll监听是否有数据到来,然后处理native message、native response。

最后,我们看看如何在native层中加入request。

3 添加监控请求
native层增加request依赖于looper的接口addfd:

//fd表示需要监听的句柄
//ident的含义还没有搞明白
//events表示需要监听的事件,例如event_input、event_output、event_error和event_hangup中的一个或多个
//callback为事件发生后的回调函数
//data为回调函数对应的参数
int looper::addfd(int fd, int ident, int events, looper_callbackfunc callback, void* data) {
 return addfd(fd, ident, events, callback ? new simpleloopercallback(callback) : null, data);
}

结合上文native层轮询队列的操作,我们大致可以知道:addfd的目的,就是让native层的looper监控新加入的fd上是否有指定事件发生。
如果发生了指定的事件,就利用回调函数及参数构造对应的response。
native层的looper处理response时,就可以执行对应的回调函数了。

看看实际的代码:

int looper::addfd(int fd, int ident, int events, const sp<loopercallback>& callback, void* data) {
 ........
 {
 automutex _l(mlock);

 //利用参数构造一个request
 request request;
 request.fd = fd;
 request.ident = ident;
 request.events = events;
 request.seq = mnextrequestseq++;
 request.callback = callback;
 request.data = data;
 if (mnextrequestseq == -1) mnextrequestseq = 0; // reserve sequence number -1

 struct epoll_event eventitem;
 request.initeventitem(&eventitem);

 //判断之前是否已经利用该fd构造过request
 ssize_t requestindex = mrequests.indexofkey(fd);
 if (requestindex < 0) {
  //mepollfd新增一个需监听fd
  int epollresult = epoll_ctl(mepollfd, epoll_ctl_add, fd, & eventitem);
  .......
  mrequests.add(fd, request);
 } else {
  //mepollfd修改旧的fd对应的监听事件
  int epollresult = epoll_ctl(mepollfd, epoll_ctl_mod, fd, & eventitem);
  if (epollresult < 0) {
  if (errno == enoent) {
   // tolerate enoent because it means that an older file descriptor was
   // closed before its callback was unregistered and meanwhile a new
   // file descriptor with the same number has been created and is now
   // being registered for the first time. 
   epollresult = epoll_ctl(mepollfd, epoll_ctl_add, fd, & eventitem);
   .......
  }
  //发生错误重新加入时,安排epollrebuildlocked,将让epollfd重新添加一次待监听的fd
  scheduleepollrebuildlocked();
  }
  mrequests.replacevalueat(requestindex, request);
 }
 }
}

对加入监控请求的处理,在上文介绍pollinner函数时已做分析,此处不再赘述。

三、总结

1、流程总结


messagequeue的整个流程包括了java部分和native部分,从图中可以看出native层的比重还是很大的。我们结合上图回忆一下整个messagequeue对应的处理流程:
1、java层创建looper对象时,将会创建java层的messagequeue;java层的messagequeue初始化时,将利用native函数创建出native层的messagequeue。

2、native层的messagequeue初始化后,将创建对应的native looper对象。native对象初始化时,将创建对应epollfd和wakeeventfd。其中,epollfd将作为epoll的监听句柄,初始时epollfd仅监听wakeeventfd。

3、图中红色线条为looper从messagequeue中取消息时,处理逻辑的流向。
3.1、当java层的looper开始循环时,首先需要通过jni函数调用native looper进行pollonce的操作。

3.2、native looper开始运行后,需要等待epollfd被唤醒。当epollfd等待超时或监听的句柄有事件到来,native looper就可以开始处理事件了。

3.3、在native层,native looper将先处理native messagequeue中的消息,再调用response对应的回调函数。

3.4、本次循环中,native层事件处理完毕后,才开始处理java层中messagequeue的消息。若messagequeue中没有消息需要处理,并且messagequeue中存在idlehandler时,将调用idlehandler定义的处理函数。

图中蓝色部分为对应的函数调用:
在java层:
利用messagequeue的addidlehandler,可以为messagequeue增加idlehandler;
利用messagequeue的enqueuemessage,可以向messagequeue增加消息;必要时将利用native函数向native层的wakeeventfd写入消息,以唤醒epollfd。

在native层:
利用looper:sendmessage,可以为native messagequeue增加消息;同样,要时将向native层的wakeeventfd写入消息,以唤醒epollfd;
利用looper:addfd,可以向native looper注册监听请求,监听请求包含需监听的fd、监听的事件及对应的回调函数等,监听请求对应的fd将被成为epollfd监听的对象。当被监听的fd发生对应的事件后,将会唤醒epollfd,此时将生成对应response加入的response list中,等待处理。一旦response被处理,就会调用对应的回调函数。

2、注意事项
messagequeue在java层和native层有各自的存储结构,可以分别增加消息。从处理逻辑来看,会优先处理native层的message,然后处理native层生成的response,最后才是处理java层的message。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持移动技术网。

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网