利用netty中的future获取异步执行的结果
- - Java - 编程语言 - ITeye博客 前段时间使用 Netty 3,感受到其 future 的设计在编写异步操作时的高效与便捷:通过 future 与 future listener 的组合实现异步通知。这种方式在平时编写异步执行代码时经常用到。其实 JDK 也有 Future 这个接口,它是 Active Object 模式的一种实现,最主要的思想就是让任务的调度和任务的执行分离。
/**
 * Demonstrates fetching the result of an asynchronously executed task through the JDK
 * {@code Future} returned by an {@code ExecutorService}.
 */
public class FutureTest {

    /**
     * Submits a task that yields a fixed greeting, blocks until the value is available,
     * prints it, and finally shuts the pool down.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException if the calling thread is interrupted while waiting for the result
     */
    public static void main(String[] args) throws InterruptedException {
        // Thread pool that runs the asynchronous task.
        ExecutorService pool = Executors.newCachedThreadPool();
        Callable<String> greetingTask = () -> "hello, world!";
        Future<String> pending = pool.submit(greetingTask);

        // ... other work could be done here while the task runs ...

        try {
            // The result is needed now, so pull it out of the future (blocks until ready).
            System.out.println("got the resulit:" + pending.get());
        } catch (ExecutionException taskFailure) {
            taskFailure.printStackTrace();
        } finally {
            pool.shutdown();
        }
    }
}
/**
 * Handle on the outcome of a computation that may complete later.
 *
 * <p>The handle can be completed (success, failure, cancellation), queried, waited on,
 * and observed through {@link ComputeFutureListener}s.
 */
public interface ComputeFuture {

    // ---- state queries ----

    /** @return {@code true} once the future has been completed in any way */
    boolean isDone();

    /** @return {@code true} if the future was completed by {@link #cancel()} */
    boolean isCancelled();

    /** @return {@code true} if the future is done and recorded neither a failure nor a cancellation */
    boolean isSuccess();

    /** @return the failure recorded via {@link #setFailure(Throwable)}, or {@code null} if there is none */
    Throwable getCause();

    /** @return the value handed to {@link #setSuccess(Object)}, or {@code null} if none has been set */
    Object getResult();

    // ---- completion ----

    /**
     * Attempts to cancel the computation.
     *
     * @return {@code true} if this call completed the future as cancelled
     */
    boolean cancel();

    /**
     * Completes the future successfully.
     *
     * @param result the value to expose through {@link #getResult()}
     * @return {@code true} if this call completed the future, {@code false} if it was already done
     */
    boolean setSuccess(Object result);

    /**
     * Completes the future with a failure.
     *
     * @param cause the reason for the failure
     * @return {@code true} if this call completed the future, {@code false} if it was already done
     */
    boolean setFailure(Throwable cause);

    // ---- listeners ----

    /**
     * Registers a listener to be told when the future completes.
     *
     * @param listener the listener to register
     */
    void addListener(ComputeFutureListener listener);

    /**
     * Unregisters a previously added listener.
     *
     * @param listener the listener to remove
     */
    void removeListener(ComputeFutureListener listener);

    // ---- blocking waits ----

    /**
     * Waits for completion and surfaces a recorded failure to the caller.
     *
     * @return this future
     * @throws InterruptedException if the waiting thread is interrupted
     */
    ComputeFuture sync() throws InterruptedException;

    /**
     * Same as {@link #sync()} but does not abort the wait on interruption.
     *
     * @return this future
     */
    ComputeFuture syncUninterruptibly();

    /**
     * Waits until the future is done.
     *
     * @return this future
     * @throws InterruptedException if the waiting thread is interrupted
     */
    ComputeFuture await() throws InterruptedException;

    /**
     * Waits until the future is done without aborting on interruption.
     *
     * @return this future
     */
    ComputeFuture awaitUninterruptibly();

    /**
     * Waits for completion for at most the given time.
     *
     * @param timeout maximum time to wait
     * @param unit unit of {@code timeout}
     * @return {@code true} if the future was done when the wait ended
     * @throws InterruptedException if the waiting thread is interrupted
     */
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;

    /**
     * Waits for completion for at most the given number of milliseconds.
     *
     * @param timeoutMillis maximum time to wait, in milliseconds
     * @return {@code true} if the future was done when the wait ended
     * @throws InterruptedException if the waiting thread is interrupted
     */
    boolean await(long timeoutMillis) throws InterruptedException;

    /**
     * Timed wait that does not abort on interruption.
     *
     * @param timeout maximum time to wait
     * @param unit unit of {@code timeout}
     * @return {@code true} if the future was done when the wait ended
     */
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);

    /**
     * Timed wait, in milliseconds, that does not abort on interruption.
     *
     * @param timeoutMillis maximum time to wait, in milliseconds
     * @return {@code true} if the future was done when the wait ended
     */
    boolean awaitUninterruptibly(long timeoutMillis);
}
/**
 * Default {@link ComputeFuture} implementation.
 *
 * <p>All mutable state is read and written while holding this instance's monitor.
 * Waiting threads block on that monitor with {@code wait()} and are woken with
 * {@code notifyAll()} when the future completes. Listeners are always invoked
 * outside the monitor, on the thread that completes the future (or on the thread
 * calling {@link #addListener} when the future is already done).
 */
public class DefaultComputeFutureImpl implements ComputeFuture {

    /** Sentinel stored in {@code cause} to mark cancellation; {@link #getCause()} never returns it. */
    private static final Throwable CANCELLED = new Throwable();

    private final boolean cancellable;

    // The first listener is kept in its own field so the list is only allocated
    // when a second listener is registered.
    private ComputeFutureListener firstListener;
    private List<ComputeFutureListener> otherListeners;

    private boolean done;
    private Throwable cause;
    /** Number of threads currently blocked in {@code wait()} on this future. */
    private int waiters;
    private Object result;

    /**
     * Creates a new, not yet completed future.
     *
     * @param cancellable whether {@link #cancel()} is allowed to complete this future
     */
    public DefaultComputeFutureImpl(boolean cancellable) {
        this.cancellable = cancellable;
    }

    /**
     * Returns the value passed to {@link #setSuccess(Object)}.
     *
     * <p>Synchronized like the other state getters so the value written by the
     * completing thread is visible to any reader.
     *
     * @return the result, or {@code null} if none has been set
     */
    @Override
    public synchronized Object getResult() {
        return result;
    }

    /** @return {@code true} once the future has succeeded, failed or been cancelled */
    @Override
    public synchronized boolean isDone() {
        return done;
    }

    /** @return {@code true} if the future is done and no failure or cancellation was recorded */
    @Override
    public synchronized boolean isSuccess() {
        return done && cause == null;
    }

    /** @return the failure cause, or {@code null} if there is none or the future was cancelled */
    @Override
    public synchronized Throwable getCause() {
        if (cause != CANCELLED) {
            return cause;
        } else {
            return null;
        }
    }

    /** @return {@code true} if the future was completed through {@link #cancel()} */
    @Override
    public synchronized boolean isCancelled() {
        return cause == CANCELLED;
    }

    /**
     * Registers a listener. If the future is already done the listener is invoked
     * immediately on the calling thread instead of being stored.
     *
     * @param listener the listener to register
     * @throws NullPointerException if {@code listener} is {@code null}
     */
    @Override
    public void addListener(ComputeFutureListener listener) {
        if (listener == null) {
            throw new NullPointerException("listener");
        }
        boolean notifyNow = false;
        synchronized (this) {
            if (done) {
                notifyNow = true;
            } else {
                if (firstListener == null) {
                    firstListener = listener;
                } else {
                    if (otherListeners == null) {
                        otherListeners = new ArrayList<>(1);
                    }
                    otherListeners.add(listener);
                }
            }
        }
        // Invoke outside the monitor so listener code never runs while the lock is held.
        if (notifyNow) {
            notifyListener(listener);
        }
    }

    /**
     * Unregisters a listener. Has no effect once the future is done.
     *
     * @param listener the listener to remove
     * @throws NullPointerException if {@code listener} is {@code null}
     */
    @Override
    public void removeListener(ComputeFutureListener listener) {
        if (listener == null) {
            throw new NullPointerException("listener");
        }
        synchronized (this) {
            if (!done) {
                if (listener == firstListener) {
                    // Promote the next listener so firstListener is non-null whenever any listener exists.
                    if (otherListeners != null && !otherListeners.isEmpty()) {
                        firstListener = otherListeners.remove(0);
                    } else {
                        firstListener = null;
                    }
                } else if (otherListeners != null) {
                    otherListeners.remove(listener);
                }
            }
        }
    }

    /**
     * Waits for completion and rethrows a recorded failure.
     *
     * @return this future
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws RuntimeException the failure cause itself if it is unchecked, otherwise a wrapper around it
     */
    @Override
    public ComputeFuture sync() throws InterruptedException {
        await();
        rethrowIfFailed0();
        return this;
    }

    /**
     * Same as {@link #sync()} but keeps waiting when interrupted.
     *
     * @return this future
     * @throws RuntimeException the failure cause itself if it is unchecked, otherwise a wrapper around it
     */
    @Override
    public ComputeFuture syncUninterruptibly() {
        awaitUninterruptibly();
        rethrowIfFailed0();
        return this;
    }

    /**
     * Throws the recorded failure, if any. Runtime exceptions and errors are rethrown
     * as they are; any other throwable is wrapped in a {@link RuntimeException} with the
     * original as its cause, because this method declares no checked exceptions.
     */
    private void rethrowIfFailed0() {
        Throwable failure = getCause();
        if (failure == null) {
            return;
        }
        if (failure instanceof RuntimeException) {
            throw (RuntimeException) failure;
        }
        if (failure instanceof Error) {
            throw (Error) failure;
        }
        // Only checked throwables reach this point; casting them to Error (as was done
        // before) always failed with ClassCastException and hid the real failure.
        throw new RuntimeException(failure);
    }

    /**
     * Blocks until the future is done.
     *
     * @return this future
     * @throws InterruptedException if the thread is already interrupted or is interrupted while waiting
     */
    @Override
    public ComputeFuture await() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        synchronized (this) {
            while (!done) {
                waiters++;
                try {
                    wait();
                } finally {
                    waiters--;
                }
            }
        }
        return this;
    }

    /**
     * Blocks until the future is done or the timeout elapses.
     *
     * @param timeout maximum time to wait
     * @param unit unit of {@code timeout}
     * @return {@code true} if the future was done when the wait ended
     * @throws InterruptedException if the thread is already interrupted or is interrupted while waiting
     */
    @Override
    public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
        return await0(unit.toNanos(timeout), true);
    }

    /**
     * Blocks until the future is done or the given number of milliseconds elapses.
     *
     * @param timeoutMillis maximum time to wait, in milliseconds
     * @return {@code true} if the future was done when the wait ended
     * @throws InterruptedException if the thread is already interrupted or is interrupted while waiting
     */
    @Override
    public boolean await(long timeoutMillis) throws InterruptedException {
        return await0(MILLISECONDS.toNanos(timeoutMillis), true);
    }

    /**
     * Blocks until the future is done, ignoring interruptions while waiting. If an
     * interruption occurred, the thread's interrupt status is restored before returning.
     *
     * @return this future
     */
    @Override
    public ComputeFuture awaitUninterruptibly() {
        boolean interrupted = false;
        synchronized (this) {
            while (!done) {
                waiters++;
                try {
                    wait();
                } catch (InterruptedException e) {
                    interrupted = true;
                } finally {
                    waiters--;
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return this;
    }

    /**
     * Timed wait that ignores interruptions.
     *
     * @param timeout maximum time to wait
     * @param unit unit of {@code timeout}
     * @return {@code true} if the future was done when the wait ended
     */
    @Override
    public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
        try {
            return await0(unit.toNanos(timeout), false);
        } catch (InterruptedException e) {
            // await0 does not throw InterruptedException when interruptable is false.
            throw new InternalError();
        }
    }

    /**
     * Timed wait, in milliseconds, that ignores interruptions.
     *
     * @param timeoutMillis maximum time to wait, in milliseconds
     * @return {@code true} if the future was done when the wait ended
     */
    @Override
    public boolean awaitUninterruptibly(long timeoutMillis) {
        try {
            return await0(MILLISECONDS.toNanos(timeoutMillis), false);
        } catch (InterruptedException e) {
            // await0 does not throw InterruptedException when interruptable is false.
            throw new InternalError();
        }
    }

    /**
     * Shared implementation of the timed waits.
     *
     * @param timeoutNanos maximum time to wait in nanoseconds; a non-positive value means "do not wait"
     * @param interruptable whether an interruption aborts the wait with an exception
     * @return {@code true} if the future was done when the wait ended
     * @throws InterruptedException only when {@code interruptable} is {@code true}
     */
    private boolean await0(long timeoutNanos, boolean interruptable)
            throws InterruptedException {
        if (interruptable && Thread.interrupted()) {
            throw new InterruptedException();
        }
        long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();
        long waitTime = timeoutNanos;
        boolean interrupted = false;
        try {
            synchronized (this) {
                if (done || waitTime <= 0) {
                    return done;
                }
                waiters++;
                try {
                    for (;;) {
                        try {
                            // Object.wait takes whole milliseconds plus the remaining nanoseconds.
                            wait(waitTime / 1000000, (int) (waitTime % 1000000));
                        } catch (InterruptedException e) {
                            if (interruptable) {
                                throw e;
                            } else {
                                interrupted = true;
                            }
                        }
                        if (done) {
                            return true;
                        } else {
                            // Woken without completion: recompute the time left before waiting again.
                            waitTime = timeoutNanos
                                    - (System.nanoTime() - startTime);
                            if (waitTime <= 0) {
                                return done;
                            }
                        }
                    }
                } finally {
                    waiters--;
                }
            }
        } finally {
            // Restore the interrupt status that was swallowed during an uninterruptible wait.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Completes the future successfully, wakes waiting threads and notifies listeners.
     *
     * @param result the value to expose through {@link #getResult()}
     * @return {@code true} if this call completed the future, {@code false} if it was already done
     */
    @Override
    public boolean setSuccess(Object result) {
        synchronized (this) {
            // Allow only once.
            if (done) {
                return false;
            }
            done = true;
            this.result = result;
            if (waiters > 0) {
                notifyAll();
            }
        }
        notifyListeners();
        return true;
    }

    /**
     * Completes the future with a failure, wakes waiting threads and notifies listeners.
     *
     * @param cause the reason for the failure
     * @return {@code true} if this call completed the future, {@code false} if it was already done
     * @throws NullPointerException if {@code cause} is {@code null}
     */
    @Override
    public boolean setFailure(Throwable cause) {
        if (cause == null) {
            throw new NullPointerException("cause");
        }
        synchronized (this) {
            // Allow only once.
            if (done) {
                return false;
            }
            this.cause = cause;
            done = true;
            if (waiters > 0) {
                notifyAll();
            }
        }
        notifyListeners();
        return true;
    }

    /**
     * Cancels the future if it was created as cancellable and is not yet done.
     *
     * @return {@code true} if this call cancelled the future
     */
    @Override
    public boolean cancel() {
        if (!cancellable) {
            return false;
        }
        synchronized (this) {
            // Allow only once.
            if (done) {
                return false;
            }
            cause = CANCELLED;
            done = true;
            if (waiters > 0) {
                notifyAll();
            }
        }
        notifyListeners();
        return true;
    }

    /**
     * Invokes every registered listener once and drops the references.
     *
     * <p>Runs without holding the monitor. It is only called right after a synchronized
     * block that set {@code done}, and neither {@link #addListener} nor
     * {@link #removeListener} modifies the listener fields once {@code done} is set.
     */
    private void notifyListeners() {
        if (firstListener != null) {
            notifyListener(firstListener);
            firstListener = null;
            if (otherListeners != null) {
                for (ComputeFutureListener l : otherListeners) {
                    notifyListener(l);
                }
                otherListeners = null;
            }
        }
    }

    /**
     * Invokes a single listener, printing (rather than propagating) anything it throws
     * so one failing listener does not prevent the others from being notified.
     */
    private void notifyListener(ComputeFutureListener l) {
        try {
            l.operationComplete(this);
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }
}
/**
 * Callback notified when a {@link ComputeFuture} completes.
 *
 * <p>Declared as a functional interface so that it can be supplied as a lambda and so
 * that the compiler rejects the accidental addition of a second abstract method.
 */
@FunctionalInterface
public interface ComputeFutureListener {

    /**
     * Called with the future this listener was registered on once that future is done.
     *
     * @param future the completed future
     * @throws Exception if the listener fails while handling the completion
     */
    void operationComplete(ComputeFuture future) throws Exception;
}
/**
 * Arithmetic service whose operations report their outcome through a {@link ComputeFuture}.
 */
public interface BasicCompute {

    /**
     * Requests the addition of two integers.
     *
     * @param a the first operand
     * @param b the second operand
     * @return the future through which the outcome of the addition is observed
     */
    ComputeFuture add(int a, int b);
}
/**
 * {@link BasicCompute} implementation that performs the addition on a pool thread and
 * publishes the sum through the {@link ComputeFuture} supplied at construction time.
 *
 * <p>NOTE(review): the same future instance is returned by every {@link #add} call, so
 * an instance is effectively good for a single computation; completing an already
 * completed future is reported by {@code setSuccess} returning {@code false}, which is
 * ignored here.
 */
public class BasicComputeImpl implements BasicCompute {

    /** Artificial delay, in milliseconds, applied before computing to simulate slow work. */
    private static final long SIMULATED_DELAY_MILLIS = 10000;

    private final ComputeFuture future;

    // Thread pool that runs the asynchronous tasks.
    private final ExecutorService executor = Executors.newCachedThreadPool();

    /**
     * @param future the future that will be completed with the sum and returned by {@link #add}
     */
    public BasicComputeImpl(ComputeFuture future) {
        this.future = future;
    }

    /**
     * Schedules {@code a + b} on the pool and returns immediately.
     *
     * @param a the first operand
     * @param b the second operand
     * @return the future that is completed with the boxed sum once the task has run
     */
    @Override
    public ComputeFuture add(int a, int b) {
        executor.execute(() -> {
            try {
                Thread.sleep(SIMULATED_DELAY_MILLIS);
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Keep the interruption visible to the rest of this task instead of
                // swallowing it; the computation itself still runs, as it did before.
                Thread.currentThread().interrupt();
            }
            System.out.println(Thread.currentThread().getName() + ",start to compute......");
            int result = a + b;
            System.out.println(Thread.currentThread().getName() + ",got the result:" + result);
            future.setSuccess(result);
        });
        return future;
    }
}
/**
 * Demo entry point: starts an asynchronous addition of 2 and 3, registers a listener
 * that prints the result when the future reports success, and prints a message from
 * the calling thread without waiting for the result.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    // false: the future is created as non-cancellable.
    ComputeFuture sharedFuture = new DefaultComputeFutureImpl(false);
    BasicCompute calculator = new BasicComputeImpl(sharedFuture);

    ComputeFuture pending = calculator.add(2, 3);
    pending.addListener(completed -> {
        if (!completed.isSuccess()) {
            return;
        }
        System.out.println(Thread.currentThread().getName()
                + ",the result is:" + completed.getResult());
    });

    String caller = Thread.currentThread().getName();
    System.out.println(caller + ",let's wait the result...");
}
}