异步处理
Handler
A Handler allows you to send and process Message and Runnable objects associated with a thread's MessageQueue. Each Handler instance is associated with a single thread and that thread's message queue.
从描述可以总结出Handler一些特性:
- 每个Handler实例都会与一个线程以及线程的messageQueue关联.
- 发送消息.
- 处理消息,处理Runnable.
Handler的处理机制示意图
从上图可以看出其实,Handler机制就是:Looper不断从MessageQueue中获取Message,然后Hanlder处理消息
从上图可以看出要分析的问题:
- looper如何从MessageQueue中获取需要处理的message
- Handler如何处理消息
- Handler如何发送消息
再讲前面的问题之前还是要先了解一个问题,就是Handler实例如何与线程,线程的messageQueue关联的?
Handler与线程,MessageQueue关联
下面以默认构造函数来说明.
public Handler() {
this(null, false);
}
public Handler(Callback callback, boolean async) {
...
mLooper = Looper.myLooper(); //获取looper
if (mLooper == null) {
throw new RuntimeException(
"Can't create handler inside thread that has not called Looper.prepare()");
}
mQueue = mLooper.mQueue; //从looper中获取MessageQueue
mCallback = callback;
...
}
从上面看能看出是looper中获取MessageQueue,并没有与什么线程关联? 其实玄机在于Looper.myLooper()中. 下面是Looper类用于说明原因.
public final class Looper {
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
public static Looper myLooper() {
return sThreadLocal.get(); //从sThreadLocal获取中looper,而添加是在prepare中.
}
}
其实每个Thread只能对应一个Looper,也可以不对应Looper,而每个Handler必须对应一个Looper.
Looper如何从MessageQueue中获取需要处理的message
对于Looper从MessageQueue是在looper类的loope()方法中,loope()函数一直从MessageQueue中获取Message,获取到了message然后调用message.target.dispatchMessage(),message.target其实就是Handler本身,message.target的设置是在发送消息时设置的.
Handler如何处理消息
其实Handler机制图中已经很清楚了,就是dispatchMessage()分发消息,调用恰当的函数处理.
public void dispatchMessage(Message msg) {
if (msg.callback != null) { //基本是通过Handler的post的相关的函数来发送的消息时调用
handleCallback(msg);
} else {
if (mCallback != null) { //通过基本的Handler的构造函数
if (mCallback.handleMessage(msg)) {
return;
}
}
handleMessage(msg); //上述的都为null,最后交有handleMessage()方法处理
}
}
Handler如何发送消息
通过两种方式post()相关的函数,sendMessage()相关的函数.而这两种方式最终都会调用到sendMessageAtTime()发送message.
- post()传入的是一个Runnable,然后有调用sendMessageAtTime()并把Runnable封装成message,message.callback为Runnable,有Handler消息处理机制可以知道,处理消息时优先调用message.callback,从而Runnable得以处理.
- sendMessage()的方法最终会调用sendMessageAtTime()方法,然后调用enqueueMessage()方法,而在enqueueMessage()方法中会设置message.target为此Handler,这样looper获取到的message可以调用message.target来处理了.
Thread
Thread类对于开发者并不陌生,两种实现方式,然后调用start()方法就开启了线程.而在Android中Thread经常与Looper的结合使用.下面以Android中的HandlerThread为例:
主要看run()方法.
public void run() {
...
Looper.prepare(); //1. 做好准备工作 主要是创建looper,并创建MssageQueue,获取当前线程
synchronized (this) {
mLooper = Looper.myLooper();
notifyAll();
}
onLooperPrepared();
Looper.loop(); //2. 开始循环,获取MessageQueue中的message
...
}
在Android创建线程,如果涉及到长期处理消息,就使用HandlerThread,然后使用Handler处理消息,效果很佳,对于休眠,退出的设计都不需要自己处理.
Notices:
- 不需要线程时,要调用quit()相关的方法.
- 一定要调用start()方法以后,再获取线程中的looper,不然获取的looper为null,因为创建looper是在run()方法中.
AsyncQueryHandler
AsyncQueryHandler主要用途是使ContentProvider查询等操作更容易.
AsyncQueryHandler继承自Handler,并且内部还有工作线程WorkHandler,工作原理就是AsyncQueryHandler(主线程中) 发送消息到WorkHandler(工作线程中)查询,查询完成,发送消息到AsyncQueryHandler(主线程中)从而更新界面等操作. 现在从源码分析原理.
先分析构造函数
public AsyncQueryHandler(ContentResolver cr) {
super();
synchronized (AsyncQueryHandler.class) {
//启动线程
if (sLooper == null) {
HandlerThread thread = new HandlerThread("AsyncQueryWorker");
thread.start();
sLooper = thread.getLooper();
}
}
mWorkerThreadHandler = createHandler(sLooper);
}
启动一个工作线程(HandlerThread上面有说过),获取looper,然后通过createHandler()创建一个Handler对象. 分析createHandler()方法
protected Handler createHandler(Looper looper) {
return new WorkerHandler(looper);
}
只是创建了一个WorkerHandler对象,并且关联获取的looper,这样handler处理消息,就在工作线程里了.对于WorkerHandler比较简单,就不分析了.
那startQuery(),startInsert(),如何工作的呢?
下面已startQuery()分析为例.
public void startQuery(int token, Object cookie, Uri uri,
String[] projection, String selection, String[] selectionArgs,
String orderBy) {
...
mWorkerThreadHandler.sendMessage(msg);
}
很简单,使用mWorkerThreadHandler发送消息.
然后mWorkerThreadHandler关联的looper取出消息,交由mWorkerThreadHandler处理.
protected class WorkerHandler extends Handler {
@Override
public void handleMessage(Message msg) {
WorkerArgs args = (WorkerArgs) msg.obj;
......
switch (event) {
case EVENT_ARG_QUERY:
Cursor cursor;
......
cursor = resolver.query(args.uri, args.projection,
args.selection, args.selectionArgs,
args.orderBy);
......
args.result = cursor;
break;
}
Message reply = args.handler.obtainMessage(token);
reply.obj = args;
reply.arg1 = msg.arg1;
reply.sendToTarget();
}
}
查询数据库,组织数据,从AsyncQueueHandler中获取message,函数中设置了message.target,然后通过sendToTarget()发送消息.这样就轮到AsyncQueueHandler处理消息了.
public void handleMessage(Message msg) {
switch (event) {
case EVENT_ARG_QUERY:
onQueryComplete(token, args.cookie, (Cursor) args.result);
break;
}
只是调用了onQueryComplete()方法,而onQueryComplete()什么都没做.可以覆写,实现自己的逻辑.
AsyncTask
对于AsyncTask也是Thread,Handler执行task,只是线程的创建方式,返回值做了修改。
Thread如何获取返回值
一般我们创建线程覆写Thread的run方法,或者是通过runnable接口创建线程,这样不会获取Thread的返回值,而想要获取Thread的返回值,就需要使用Future,callable接口。
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class CallableTest {
public static void main(String[] args){
FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
@Override
public String call() throws Exception {
return "Test";
}
});
Thread thread = new Thread(futureTask);
thread.start();
try {
String test = futureTask.get();
System.out.print(test);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
FutureTask实现Future接口 FutureTask中的get()这个方法会产生阻塞,会一直等到任务执行完毕才返回
使用Executor创建线程
Executor只是Java1.5引入的Executor框架的一个接口。
public interface Executor {
void execute(Runnable command);
}
可以在execute中构造线程池进行创建线程。
AsyncTask分析
- FutureTask
public AsyncTask() {
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
return postResult(doInBackground(mParams));
}
};
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
......
postResultIfNotInvoked(get());
......
};
mWork是WorkerRunnable的实现的对象,WorkerRunnable实现了Callable接口,在call()方法中,会调用 doInBackground()处理,然后通过postResult()通知InnerHandler处理消息。
private void finish(Result result) {
if (isCancelled()) {
onCancelled(result);
} else {
onPostExecute(result);
}
mStatus = Status.FINISHED;
}
private static class InternalHandler extends Handler {
public InternalHandler() {
super(Looper.getMainLooper());
}
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
对于MESSAGE_POST_RESULT消息,会调用AsyncTask的finish()方法,在finish()中会调用onPostExecute(result),可以覆写此方法,来更新界面等操作。
- Executor
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
默认的Executor是sDefaultExecutor, sDefaultExecutor其实是SERIAL_EXECUTOR,而SERIAL_EXECUTOR是SerialExecutor对象,SerialExecutor的executor()方法中会使用THREAD_POOL_EXECUTOR线程池来创建线程。
THREAD_POOL_EXECUTOR
public static final Executor THREAD_POOL_EXECUTOR
= new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);
创建线程池,其中指出了最大的线程数,最大的队列数,线程工厂对象等。
AsyncTask运行
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);
}
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
.......
onPreExecute();
mWorker.mParams = params;
exec.execute(mFuture);
return this;
}
调用onPreExecute(),可以覆写此方法,然后在线程运行前做些准备,比如进度条运行等。
然后调用Executor的execute()方法来开始运行线程。