LiteGo

Additional

Language
Java
Version
v1.0 (Apr 1, 2016)
Created
Mar 24, 2016
Updated
Jul 6, 2016 (Retired)
Owner
马天宇 (litesuits)
Contributor
马天宇 (litesuits)
1
Activity
Badge
Generate
Download
Source code

Show card

LiteGo:「迷你」的Android异步并发类库

LiteGo是一款基于Java语言的「异步并发类库」,它的核心是一枚「迷你」并发器,它可以自由地设置同一时段的最大「并发」数量,等待「排队」线程数量,还可以设置「排队策略」和「超载策略」。 LiteGo可以直接投入Runnable、Callable、FutureTask 等类型的实现来运行一个任务,它的核心组件是「SmartExecutor」,它可以用来作为「App」内支持异步并发的唯一组件。 在一个App中「SmartExecutor」可以有多个实例,每个实例都有完全的「独立性」,比如独立的「核心并发」、「排队等待」指标,独立的「运行调度和满载处理」策略,但所有实例「共享一个线程池」。 这种机制既满足不同模块对线程控制和任务调度的独立需求,又共享一个池资源来节省开销,最大程度上节约资源复用线程,帮助提升性能。

官网 : litesuits.com

QQ群 : 42960650



上述代码设计了一个可同时并发「2」个线程,并发满载后等待队列可容纳「2」个线程排队,排队队列中后进的任务先执行,等待队列装满后新任务来到将抛弃队列中最老的任务。

测试多个线程并发的情况:

// 一次投入 4 个任务
for (int i = 0; i < 4; i++) {
    final int j = i;
    smallExecutor.execute(new Runnable() {
        @Override
        public void run() {
            HttpLog.i(TAG, " TASK " + j + " is running now ----------->");
            SystemClock.sleep(j * 200);
        }
    });
}

// 再投入1个可能需要取消的任务
Future future = smallExecutor.submit(new Runnable() {
    @Override
    public void run() {
        HttpLog.i(TAG, " TASK 4 will be canceled... ------------>");
        SystemClock.sleep(1000);
    }
});

// 合适的时机取消此任务
future.cancel(false);

上述代码,一次依次投入 0、1、2、3、4 五个任务,注意4任务是最后投入的,返回一个Future对象。

根据设置,0、1会立即执行,执行满载后2、3进入排队队列,排队满载后独立投入的任务4来到,队列中最老的任务2被移除,队列中为3、4 。

因为4随后被取消执行,所以最后输出:

TASK 0 is running now ----------->
TASK 1 is running now ----------->
TASK 3 is running now ----------->

LiteGO 基本原理

我们看 SmartExecutor 的几个主要方法:

public Future<?> submit(Runnable task)

public <T> Future<T> submit(Runnable task, T result)

public <T> Future<T> submit(Callable<T> task)

public <T> void submit(RunnableFuture<T> task)

public void execute(final Runnable command)

最主要的是 execute 方法,其他几个方法是将任务封装为 FutureTask 投入到 execute 方法执行。因为 FutureTask 本质就是一个 RunnableFuture 对象,兼具 Runnable 和 Future 的特性和功能。

那么重点就是看 execute 方法了:

@Override
public void execute(final Runnable command) {
    if (command == null) {
        return;
    }

    WrappedRunnable scheduler = new WrappedRunnable() {
        @Override
        public Runnable getRealRunnable() {
            return command;
        }

        public Runnable realRunnable;

        @Override
        public void run() {
            try {
                command.run();
            } finally {
                scheduleNext(this);
            }
        }
    };

    boolean callerRun = false;
    synchronized (lock) {
        if (runningList.size() < coreSize) {
            runningList.add(scheduler);
            threadPool.execute(scheduler);
        } else if (waitingList.size() < queueSize) {
            waitingList.addLast(scheduler);
        } else {
            switch (overloadPolicy) {
                case DiscardNewTaskInQueue:
                    waitingList.pollLast();
                    waitingList.addLast(scheduler);
                    break;
                case DiscardOldTaskInQueue:
                    waitingList.pollFirst();
                    waitingList.addLast(scheduler);
                    break;
                case CallerRuns:
                    callerRun = true;
                    break;
                case DiscardCurrentTask:
                    break;
                case ThrowExecption:
                    throw new RuntimeException("Task rejected from lite smart executor. " + command.toString());
                default:
                    break;
            }
        }
        //printThreadPoolInfo();
    }
    if (callerRun) {
        command.run();
    }
}

可以看到整个过程简单概括为:

  1. 把任务封装为一个类似“链表”的结构体,执行完一个,调度下一个。
  2. 加锁防止并发时抢夺资源,判断当前运行任务数量。
  3. 当前任务数少于并发最大数量则投入运行,若满载则投入等待队列尾部。
  4. 若等待队列未满新任务进入排队,若满则执行满载处理策略。
  5. 当一个任务执行完,其尾部通过“链接”的方式调度一个新任务执行。若没有任务,则结束。

其中「加锁」和将任务包装成「链表」是重点。