利用netty中的future获取异步执行的结果
- - Java - 编程语言 - ITeye博客 前段时间使用Netty 3,感受到其Future的设计在编写异步操作时的高效与便捷:通过Future与FutureListener的组合即可实现异步通知。这种方式在平时编写异步执行的代码时经常用到。其实JDK也有Future这个接口,它是Active Object模式的一种实现,最主要的思想就是让任务的调度和任务的执行分离。
/**
 * Minimal demonstration of the JDK {@link Future}: a task is handed to an
 * executor, the caller carries on, and later blocks on {@link Future#get()}
 * to collect the value the task produced.
 */
public class FutureTest {

    public static void main(String[] args) throws InterruptedException {
        // Thread pool that runs the asynchronous task.
        ExecutorService pool = Executors.newCachedThreadPool();

        // The unit of work: simply yields a greeting.
        Callable<String> greeting = () -> "hello, world!";
        Future<String> pending = pool.submit(greeting);

        // ... the caller is free to do other work here ...

        // The value is needed now, so fetch it from the future (blocks until ready).
        try {
            String value = pending.get();
            System.out.println("got the resulit:" + value);
        } catch (ExecutionException taskFailure) {
            taskFailure.printStackTrace();
        } finally {
            // Always release the pool, whether or not the task succeeded.
            pool.shutdown();
        }
    }
}
/**
 * Handle on the outcome of an asynchronous computation.
 *
 * <p>The producer completes the future through {@link #setSuccess(Object)},
 * {@link #setFailure(Throwable)} or {@link #cancel()}; consumers either
 * register a {@link ComputeFutureListener} or block with one of the
 * {@code await}/{@code sync} methods.
 */
public interface ComputeFuture {

    // ---- state queries ----

    /** @return {@code true} once the computation has been completed in any way */
    boolean isDone();

    /** @return {@code true} if the computation completed without a failure or cancellation */
    boolean isSuccess();

    /** @return {@code true} if the computation was cancelled */
    boolean isCancelled();

    /** @return the failure recorded through {@link #setFailure(Throwable)}, or {@code null} if there is none */
    Throwable getCause();

    /** @return the value recorded through {@link #setSuccess(Object)}; may be {@code null} */
    Object getResult();

    // ---- completion (producer side) ----

    /**
     * Marks the computation as successfully completed with the given value.
     *
     * @return {@code false} if the future was already completed
     */
    boolean setSuccess(Object result);

    /**
     * Marks the computation as failed.
     *
     * @return {@code false} if the future was already completed
     */
    boolean setFailure(Throwable cause);

    /**
     * Attempts to cancel the computation.
     *
     * @return {@code true} if this call cancelled the future
     */
    boolean cancel();

    // ---- listeners ----

    /** Registers a listener to be notified when the computation completes. */
    void addListener(ComputeFutureListener listener);

    /** Unregisters a previously added listener. */
    void removeListener(ComputeFutureListener listener);

    // ---- blocking waits ----

    /** Waits for completion. */
    ComputeFuture await() throws InterruptedException;

    /** Waits for completion without throwing {@link InterruptedException}. */
    ComputeFuture awaitUninterruptibly();

    /** Waits for completion for at most the given time; the return value reports whether it completed. */
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;

    /** Same as {@link #await(long, TimeUnit)} with the timeout given in milliseconds. */
    boolean await(long timeoutMillis) throws InterruptedException;

    /** Timed wait that does not throw {@link InterruptedException}. */
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);

    /** Same as {@link #awaitUninterruptibly(long, TimeUnit)} with the timeout given in milliseconds. */
    boolean awaitUninterruptibly(long timeoutMillis);

    /** Waits for completion; the visible implementation also rethrows a recorded failure. */
    ComputeFuture sync() throws InterruptedException;

    /** Variant of {@link #sync()} that does not throw {@link InterruptedException}. */
    ComputeFuture syncUninterruptibly();
}
public class DefaultComputeFutureImpl implements ComputeFuture { private static final Throwable CANCELLED = new Throwable(); private final boolean cancellable; private ComputeFutureListener firstListener; private List<ComputeFutureListener> otherListeners; private boolean done; private Throwable cause; private int waiters; private Object result; public Object getResult() { return result; } public DefaultComputeFutureImpl(boolean cancellable) { this.cancellable = cancellable; } @Override public synchronized boolean isDone() { return done; } @Override public synchronized boolean isSuccess() { return done && cause == null; } @Override public synchronized Throwable getCause() { if (cause != CANCELLED) { return cause; } else { return null; } } @Override public synchronized boolean isCancelled() { return cause == CANCELLED; } @Override public void addListener(ComputeFutureListener listener) { if (listener == null) { throw new NullPointerException("listener"); } boolean notifyNow = false; synchronized (this) { if (done) { notifyNow = true; } else { if (firstListener == null) { firstListener = listener; } else { if (otherListeners == null) { otherListeners = new ArrayList<ComputeFutureListener>(1); } otherListeners.add(listener); } } } if (notifyNow) { notifyListener(listener); } } @Override public void removeListener(ComputeFutureListener listener) { if (listener == null) { throw new NullPointerException("listener"); } synchronized (this) { if (!done) { if (listener == firstListener) { if (otherListeners != null && !otherListeners.isEmpty()) { firstListener = otherListeners.remove(0); } else { firstListener = null; } } else if (otherListeners != null) { otherListeners.remove(listener); } } } } @Override public ComputeFuture sync() throws InterruptedException { await(); rethrowIfFailed0(); return this; } @Override public ComputeFuture syncUninterruptibly() { awaitUninterruptibly(); rethrowIfFailed0(); return this; } private void rethrowIfFailed0() { Throwable cause = 
getCause(); if (cause == null) { return; } if (cause instanceof RuntimeException) { throw (RuntimeException) cause; } if (cause instanceof Error) { throw (Error) cause; } throw new RuntimeErrorException((Error) cause); } @Override public ComputeFuture await() throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } synchronized (this) { while (!done) { waiters++; try { wait(); } finally { waiters--; } } } return this; } @Override public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return await0(unit.toNanos(timeout), true); } public boolean await(long timeoutMillis) throws InterruptedException { return await0(MILLISECONDS.toNanos(timeoutMillis), true); } @Override public ComputeFuture awaitUninterruptibly() { boolean interrupted = false; synchronized (this) { while (!done) { waiters++; try { wait(); } catch (InterruptedException e) { interrupted = true; } finally { waiters--; } } } if (interrupted) { Thread.currentThread().interrupt(); } return this; } @Override public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { try { return await0(unit.toNanos(timeout), false); } catch (InterruptedException e) { throw new InternalError(); } } @Override public boolean awaitUninterruptibly(long timeoutMillis) { try { return await0(MILLISECONDS.toNanos(timeoutMillis), false); } catch (InterruptedException e) { throw new InternalError(); } } private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException { if (interruptable && Thread.interrupted()) { throw new InterruptedException(); } long startTime = timeoutNanos <= 0 ? 
0 : System.nanoTime(); long waitTime = timeoutNanos; boolean interrupted = false; try { synchronized (this) { if (done || waitTime <= 0) { return done; } waiters++; try { for (;;) { try { wait(waitTime / 1000000, (int) (waitTime % 1000000)); } catch (InterruptedException e) { if (interruptable) { throw e; } else { interrupted = true; } } if (done) { return true; } else { waitTime = timeoutNanos - (System.nanoTime() - startTime); if (waitTime <= 0) { return done; } } } } finally { waiters--; } } } finally { if (interrupted) { Thread.currentThread().interrupt(); } } } @Override public boolean setSuccess(Object result) { synchronized (this) { // Allow only once. if (done) { return false; } done = true; this.result = result; if (waiters > 0) { notifyAll(); } } notifyListeners(); return true; } @Override public boolean setFailure(Throwable cause) { if (cause == null) { throw new NullPointerException("cause"); } synchronized (this) { // Allow only once. if (done) { return false; } this.cause = cause; done = true; if (waiters > 0) { notifyAll(); } } notifyListeners(); return true; } @Override public boolean cancel() { if (!cancellable) { return false; } synchronized (this) { // Allow only once. if (done) { return false; } cause = CANCELLED; done = true; if (waiters > 0) { notifyAll(); } } notifyListeners(); return true; } private void notifyListeners() { if (firstListener != null) { notifyListener(firstListener); firstListener = null; if (otherListeners != null) { for (ComputeFutureListener l : otherListeners) { notifyListener(l); } otherListeners = null; } } } private void notifyListener(ComputeFutureListener l) { try { l.operationComplete(this); } catch (Throwable t) { t.printStackTrace(); } } }
/**
 * Callback that is told when a {@link ComputeFuture} has been completed.
 */
public interface ComputeFutureListener {

    /**
     * Called with the future this listener was registered on once that
     * future is done.
     *
     * @param future the completed future
     * @throws Exception if the listener fails while handling the completion
     */
    void operationComplete(ComputeFuture future) throws Exception;
}
/**
 * Arithmetic service whose operations report their outcome through a
 * {@link ComputeFuture} instead of a direct return value.
 */
public interface BasicCompute {

    /**
     * Requests the addition of two integers.
     *
     * @param a first operand
     * @param b second operand
     * @return the future through which the outcome is reported
     */
    ComputeFuture add(int a, int b);
}
public class BasicComputeImpl implements BasicCompute { private ComputeFuture future; // 执行异步任务的线程池 private ExecutorService executor = Executors.newCachedThreadPool(); public BasicComputeImpl(ComputeFuture future) { this.future = future; } @Override public ComputeFuture add(int a, int b) { executor.execute(new Runnable() { @Override public void run() { try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + ",start to compute......"); int result = a+b; System.out.println(Thread.currentThread().getName() +",got the result:" + result); future.setSuccess(result); } }); return future; } }
/**
 * Demo entry point: starts an asynchronous addition, registers a listener
 * that prints the result once it is available, and returns without blocking.
 */
public static void main(String[] args) {
    // Non-cancellable future shared between the caller and the compute service.
    ComputeFuture pending = new BasicComputeImpl(new DefaultComputeFutureImpl(false)).add(2, 3);

    ComputeFutureListener printer = new ComputeFutureListener() {
        @Override
        public void operationComplete(ComputeFuture completed) throws Exception {
            if (!completed.isSuccess()) {
                return;
            }
            System.out.println(Thread.currentThread().getName() + ",the result is:" + completed.getResult());
        }
    };
    pending.addListener(printer);

    // Printed by the calling thread straight away; it does not wait for the worker.
    System.out.println(Thread.currentThread().getName() + ",let's wait the result...");
}
}