image.png - 下面为代码示例,首先是执行器接口
* Copyright: Copyright (c) 2017 LanRu-Caifu
* @author xzg
* 2017年9月8日
* @ClassName: AsyncExecutor.java
* @Description: 执行器executor的三个关联的对象,1:传入的参数线程task,2:传入的保存结果状态的callback
* 3:返回值result。它也是整个模式的核心部分
* @version: v1.0.0
public interface AsyncExecutor {
// 开始执行任务,未持有callback则说明客户端不需要对返回结果做额外判断。返回异步结果
<T> AsyncResult<T> startProcess(Callable<T> task);
// 开始执行任务,持有callback则说明客户端自定义实现额外判断。返回异步结果
<T> AsyncResult<T> startProcess(Callable<T> task, AsyncCallback<T> callback);
// 结束异步任务,如果必要时阻塞当前的线程并返回结果结束任务
<T> T endProcess(AsyncResult<T> asyncResult) throws ExecutionException, InterruptedException;
- 异步执行返回结果接口
* Copyright: Copyright (c) 2017 LanRu-Caifu
* @author xzg
* 2017年9月8日
* @ClassName: AsyncResult.java
* @Description: executor执行器执行的返回结果。它应该提供执行状态、任务返回值、结果挂起
* @version: v1.0.0
public interface AsyncResult<T> {
// 线程任务是否完成
boolean isCompleted();
// 获取任务的返回值
T getValue() throws ExecutionException;
// 阻塞当前线程,直到异步任务完成,如果执行中断,抛出异常
void await() throws InterruptedException;
- 保存执行器executor执行结果(task任务状态,返回值),客户端可以进行自定义处理
* Copyright: Copyright (c) 2017 LanRu-Caifu
* @author xzg
* 2017年9月8日
* @ClassName: AsynCallback.java
* @Description: 保存执行器executor执行结果(task任务状态,返回值),可以由客户端进行自定义处理
* @version: v1.0.0
public interface AsynCallback<T> {
void onComplete(T val,Optional<Exception> ex);
- 执行器的具体实现
* Copyright: Copyright (c) 2017 LanRu-Caifu
* @author xzg
* 2017年9月8日
* @ClassName: ThreadAsyncExecutor.java
* @Description:
* @version: v1.0.0
public class ThreadAsyncExecutor implements AsyncExecutor {
// 为区别线程,为每个线程命名
private final AtomicInteger idx = new AtomicInteger(0);
public <T> AsyncResult<T> startProcess(Callable<T> task) {
return startProcess(task, null);
public <T> AsyncResult<T> startProcess(Callable<T> task, AsyncCallback<T> callback) {
// CompletableResult作为executor的返回结果,它会对callback传递参数让callback自行处理
CompletableResult<T> result = new CompletableResult<>(callback);
// 启动一个线程去处理任务线程,并将任务线程的返回结果设置到result中
new Thread(() -> {
try {
} catch (Exception ex) {
} , "executor-" + idx.incrementAndGet()).start();
return result;
// 结束任务,如果当前任务没有完成则让出cpu让其他任务使用。如果执行结束返回结果
public <T> T endProcess(AsyncResult<T> asyncResult) throws ExecutionException, InterruptedException {
if (!asyncResult.isCompleted()) {
return asyncResult.getValue();
* Simple implementation of async result that allows completing it successfully with a value or exceptionally with an
* exception. A really simplified version from its real life cousins FutureTask and CompletableFuture.
* @see java.util.concurrent.FutureTask
* @see java.util.concurrent.CompletableFuture
// 执行器executor的三个关联的对象,1:传入的参数线程task,2:传入的保存结果状态的callback,3:返回值result
// 异步执行的结果封装,持有callback对象(该对象可由客户端重写),这里是将执行的结果保存到callback中的value|exception
private static class CompletableResult<T> implements AsyncResult<T> {
// 几种执行的状态
static final int RUNNING = 1;
static final int FAILED = 2;
static final int COMPLETED = 3;
// 对象锁
final Object lock;
// Optional封装callback
final Optional<AsyncCallback<T>> callback;
// 初始状态
volatile int state = RUNNING;
// 执行结果
T value;
// 执行异常情况
Exception exception;
CompletableResult(AsyncCallback<T> callback) {
this.lock = new Object();
this.callback = Optional.ofNullable(callback);
* Sets the value from successful execution and executes callback if available. Notifies any thread waiting for
* completion.
* 封装任务的返回结果
* @param value
* value of the evaluated task
void setValue(T value) {
this.value = value;
this.state = COMPLETED;
this.callback.ifPresent(ac -> ac.onComplete(value, Optional.<Exception>empty()));
synchronized (lock) {
* Sets the exception from failed execution and executes callback if available. Notifies any thread waiting for
* completion.
* 设置异常
* @param exception
* exception of the failed task
void setException(Exception exception) {
this.exception = exception;
this.state = FAILED;
this.callback.ifPresent(ac -> ac.onComplete(null, Optional.of(exception)));
synchronized (lock) {
// 是否运行状态
public boolean isCompleted() {
return state > RUNNING;
// 取得任务结果
public T getValue() throws ExecutionException {
if (state == COMPLETED) {
return value;
} else if (state == FAILED) {
throw new ExecutionException(exception);
} else {
throw new IllegalStateException("Execution not completed yet");
// 未完成时不参与竞争
public void await() throws InterruptedException {
synchronized (lock) {
while (!isCompleted()) {
- 测试部分
public class App {
public static void main(String[] args) throws Exception {
// 新建一个executor执行器
AsyncExecutor executor = new ThreadAsyncExecutor();
// 开始执行一些任务
AsyncResult<Integer> asyncResult1 = executor.startProcess(lazyval(10, 500));
AsyncResult<String> asyncResult2 = executor.startProcess(lazyval("test", 300));
AsyncResult<Long> asyncResult3 = executor.startProcess(lazyval(50L, 700));
AsyncResult<Integer> asyncResult4 = executor.startProcess(lazyval(20, 400), callback("Callback result 4"));
AsyncResult<String> asyncResult5 = executor.startProcess(lazyval("callback", 600), callback("Callback result 5"));
// emulate processing in the current thread while async tasks are running in their own threads
Thread.sleep(350); // Oh boy I'm working hard here
log("Some hard work done");
// wait for completion of the tasks
Integer result1 = executor.endProcess(asyncResult1);
String result2 = executor.endProcess(asyncResult2);
Long result3 = executor.endProcess(asyncResult3);
// 下面的执行结果挂起
// 打印线程结果
log("Result 1: " + result1);
log("Result 2: " + result2);
log("Result 3: " + result3);
* Creates a callable that lazily evaluates to given value with artificial delay.
* 创建一个任务
* @param value
* value to evaluate
* @param delayMillis
* artificial delay in milliseconds
* @return new callable for lazy evaluation
private static <T> Callable<T> lazyval(T value, long delayMillis) {
return () -> {
log("Task completed with: " + value);
return value;
* 客户端自定义callback
private static <T> AsyncCallback<T> callback(String name) {
// 返回一个callback重写 void onComplete(T value, Optional<Exception> ex)的实现类对象
return (value, ex) -> {
if (ex.isPresent()) {
log(name + " failed: " + ex.map(Exception::getMessage).orElse(""));
} else {
log(name + ": " + value);
// 日志方法
private static void log(String msg) {
Task completed with: test
Some hard work done
Task completed with: 20
Callback result 4: 20
Task completed with: 10
Task completed with: callback
Callback result 5: callback
Task completed with: 50
Result 1: 10
Result 2: test
Result 3: 50