异步处理一

异步处理

Handler

A Handler allows you to send and process Message and Runnable objects associated with a thread's MessageQueue. Each Handler instance is associated with a single thread and that thread's message queue.

从描述可以总结出Handler一些特性:

  1. 每个Handler实例都会与一个线程以及线程的messageQueue关联.
  2. 发送消息.
  3. 处理消息,处理Runnable.

Handler的处理机制示意图
Handler处理机制

从上图可以看出其实,Handler机制就是:Looper不断从MessageQueue中获取Message,然后Hanlder处理消息
从上图可以看出要分析的问题:

  1. looper如何从MessageQueue中获取需要处理的message
  2. Handler如何处理消息
  3. Handler如何发送消息

再讲前面的问题之前还是要先了解一个问题,就是Handler实例如何与线程,线程的messageQueue关联的?

Handler与线程,MessageQueue关联

下面以默认构造函数来说明.

public Handler() {
        this(null, false);
    }

	public Handler(Callback callback, boolean async) {
		...
        mLooper = Looper.myLooper();  //获取looper
        if (mLooper == null) {
            throw new RuntimeException(
                "Can't create handler inside thread that has not called Looper.prepare()");
        }
        mQueue = mLooper.mQueue;   //从looper中获取MessageQueue
        mCallback = callback;
		...
    }

从上面看能看出是looper中获取MessageQueue,并没有与什么线程关联? 其实玄机在于Looper.myLooper()中. 下面是Looper类用于说明原因.

public final class Looper {
	static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();

	private static void prepare(boolean quitAllowed) {
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new Looper(quitAllowed));
    }

	public static Looper myLooper() {
        return sThreadLocal.get();   //从sThreadLocal获取中looper,而添加是在prepare中.
    }
}

其实每个Thread只能对应一个Looper,也可以不对应Looper,而每个Handler必须对应一个Looper.

Looper如何从MessageQueue中获取需要处理的message

对于Looper从MessageQueue是在looper类的loope()方法中,loope()函数一直从MessageQueue中获取Message,获取到了message然后调用message.target.dispatchMessage(),message.target其实就是Handler本身,message.target的设置是在发送消息时设置的.

Handler如何处理消息

其实Handler机制图中已经很清楚了,就是dispatchMessage()分发消息,调用恰当的函数处理.

public void dispatchMessage(Message msg) {
        if (msg.callback != null) {    //基本是通过Handler的post的相关的函数来发送的消息时调用
            handleCallback(msg);
        } else {
            if (mCallback != null) {   //通过基本的Handler的构造函数
                if (mCallback.handleMessage(msg)) {
                    return;
                }
            }
            handleMessage(msg);       //上述的都为null,最后交有handleMessage()方法处理
        }
    }
Handler如何发送消息

通过两种方式post()相关的函数,sendMessage()相关的函数.而这两种方式最终都会调用到sendMessageAtTime()发送message.

  • post()传入的是一个Runnable,然后有调用sendMessageAtTime()并把Runnable封装成message,message.callback为Runnable,有Handler消息处理机制可以知道,处理消息时优先调用message.callback,从而Runnable得以处理.
  • sendMessage()的方法最终会调用sendMessageAtTime()方法,然后调用enqueueMessage()方法,而在enqueueMessage()方法中会设置message.target为此Handler,这样looper获取到的message可以调用message.target来处理了.

Thread

Thread类对于开发者并不陌生,两种实现方式,然后调用start()方法就开启了线程.而在Android中Thread经常与Looper的结合使用.下面以Android中的HandlerThread为例:
主要看run()方法.

public void run() {
		...
        Looper.prepare();      //1. 做好准备工作 主要是创建looper,并创建MssageQueue,获取当前线程
        synchronized (this) {
            mLooper = Looper.myLooper();
            notifyAll();
        }
        onLooperPrepared();
        Looper.loop();  //2. 开始循环,获取MessageQueue中的message
		...
    }

在Android创建线程,如果涉及到长期处理消息,就使用HandlerThread,然后使用Handler处理消息,效果很佳,对于休眠,退出的设计都不需要自己处理.

Notices:

  • 不需要线程时,要调用quit()相关的方法.
  • 一定要调用start()方法以后,再获取线程中的looper,不然获取的looper为null,因为创建looper是在run()方法中.

AsyncQueryHandler

AsyncQueryHandler主要用途是使ContentProvider查询等操作更容易.
AsyncQueryHandler继承自Handler,并且内部还有工作线程WorkHandler,工作原理就是AsyncQueryHandler(主线程中) 发送消息到WorkHandler(工作线程中)查询,查询完成,发送消息到AsyncQueryHandler(主线程中)从而更新界面等操作. 现在从源码分析原理.
先分析构造函数

public AsyncQueryHandler(ContentResolver cr) {
        super();
        synchronized (AsyncQueryHandler.class) {
        //启动线程
            if (sLooper == null) {
                HandlerThread thread = new HandlerThread("AsyncQueryWorker");
                thread.start();

                sLooper = thread.getLooper();
            }
        }
        mWorkerThreadHandler = createHandler(sLooper);
    }

启动一个工作线程(HandlerThread上面有说过),获取looper,然后通过createHandler()创建一个Handler对象. 分析createHandler()方法

protected Handler createHandler(Looper looper) {
        return new WorkerHandler(looper);
    }

只是创建了一个WorkerHandler对象,并且关联获取的looper,这样handler处理消息,就在工作线程里了.对于WorkerHandler比较简单,就不分析了.
那startQuery(),startInsert(),如何工作的呢?
下面已startQuery()分析为例.

public void startQuery(int token, Object cookie, Uri uri,
            String[] projection, String selection, String[] selectionArgs,
            String orderBy) {
		...
        mWorkerThreadHandler.sendMessage(msg);
    }

很简单,使用mWorkerThreadHandler发送消息.
然后mWorkerThreadHandler关联的looper取出消息,交由mWorkerThreadHandler处理.

protected class WorkerHandler extends Handler {
        @Override
        public void handleMessage(Message msg) {
            WorkerArgs args = (WorkerArgs) msg.obj;
			......
            switch (event) {
                case EVENT_ARG_QUERY:
                    Cursor cursor;
                    ......
                    cursor = resolver.query(args.uri, args.projection,
                            args.selection, args.selectionArgs,
                            args.orderBy);
					......
                    args.result = cursor;
                    break;
			}
			Message reply = args.handler.obtainMessage(token);
            reply.obj = args;
            reply.arg1 = msg.arg1;
            reply.sendToTarget();
		}
}

查询数据库,组织数据,从AsyncQueueHandler中获取message,函数中设置了message.target,然后通过sendToTarget()发送消息.这样就轮到AsyncQueueHandler处理消息了.

 public void handleMessage(Message msg) {
        switch (event) {
            case EVENT_ARG_QUERY:
                onQueryComplete(token, args.cookie, (Cursor) args.result);
                break;
    }

只是调用了onQueryComplete()方法,而onQueryComplete()什么都没做.可以覆写,实现自己的逻辑.

AsyncTask

对于AsyncTask也是Thread,Handler执行task,只是线程的创建方式,返回值做了修改。
Thread如何获取返回值
一般我们创建线程覆写Thread的run方法,或者是通过runnable接口创建线程,这样不会获取Thread的返回值,而想要获取Thread的返回值,就需要使用Future,callable接口。

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class CallableTest {
    public static void main(String[] args){
        FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "Test";
            }
        });
        Thread thread = new Thread(futureTask);
        thread.start();
        try {
            String test = futureTask.get();
            System.out.print(test);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

FutureTask实现Future接口 FutureTask中的get()这个方法会产生阻塞,会一直等到任务执行完毕才返回

使用Executor创建线程
Executor只是Java1.5引入的Executor框架的一个接口。

 public interface Executor {
	     void execute(Runnable command);
 }

可以在execute中构造线程池进行创建线程。

AsyncTask分析

  • FutureTask
 public AsyncTask() {
        mWorker = new WorkerRunnable<Params, Result>() {
            public Result call() throws Exception {
                mTaskInvoked.set(true);

                Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                //noinspection unchecked
                return postResult(doInBackground(mParams));
            }
        };

        mFuture = new FutureTask<Result>(mWorker) {
            @Override
            protected void done() {
	           ......
               postResultIfNotInvoked(get());
			   ......
        };

mWork是WorkerRunnable的实现的对象,WorkerRunnable实现了Callable接口,在call()方法中,会调用 doInBackground()处理,然后通过postResult()通知InnerHandler处理消息。

    private void finish(Result result) {
        if (isCancelled()) {
            onCancelled(result);
        } else {
            onPostExecute(result);
        }
        mStatus = Status.FINISHED;
    }

    private static class InternalHandler extends Handler {
        public InternalHandler() {
            super(Looper.getMainLooper());
        }

        public void handleMessage(Message msg) {
            AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
            switch (msg.what) {
                case MESSAGE_POST_RESULT:
                    // There is only one result
                    result.mTask.finish(result.mData[0]);
                    break;
                case MESSAGE_POST_PROGRESS:
                    result.mTask.onProgressUpdate(result.mData);
                    break;
            }
        }
    }

对于MESSAGE_POST_RESULT消息,会调用AsyncTask的finish()方法,在finish()中会调用onPostExecute(result),可以覆写此方法,来更新界面等操作。

  • Executor
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;  

public static final Executor SERIAL_EXECUTOR = new SerialExecutor();  

private static class SerialExecutor implements Executor {
    final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
    Runnable mActive;

    public synchronized void execute(final Runnable r) {
        mTasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        });
        if (mActive == null) {
            scheduleNext();
        }
    }
    protected synchronized void scheduleNext() {
        if ((mActive = mTasks.poll()) != null) {
            THREAD_POOL_EXECUTOR.execute(mActive);
        }
    }
}

默认的ExecutorsDefaultExecutorsDefaultExecutor其实是SERIAL_EXECUTOR,而SERIAL_EXECUTORSerialExecutor对象,SerialExecutor的executor()方法中会使用THREAD_POOL_EXECUTOR线程池来创建线程。

THREAD_POOL_EXECUTOR

    public static final Executor THREAD_POOL_EXECUTOR
            = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
                    TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);

创建线程池,其中指出了最大的线程数,最大的队列数,线程工厂对象等。

AsyncTask运行

public final AsyncTask<Params, Progress, Result> execute(Params... params) {
        return executeOnExecutor(sDefaultExecutor, params);
    }  

public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
        Params... params) {
        .......
        onPreExecute();

        mWorker.mParams = params;
        exec.execute(mFuture);

        return this;
    }

调用onPreExecute(),可以覆写此方法,然后在线程运行前做些准备,比如进度条运行等。
然后调用Executor的execute()方法来开始运行线程。

updatedupdated2020-06-132020-06-13