利用netty中的future获取异步执行的结果

标签: 利用 netty future | 发表时间:2015-04-06 00:43 | 作者:dong_ming
出处:http://www.iteye.com
      前段时间使用netty3,感受到其对于future的设计在写异步操作时的高效与便捷,通过future与futurelistener的组合实现异步的通知。这个在平时写异步执行代码的中经常用到。

      其实JDK也有Future这个接口,是active object模式的一种实现。最主要的思想就是让任务的调度和任务的执行分离。在一个主线程中发起一个任务,将这个任务有另一个线程去异步的执行,主线程继续执行其他的逻辑,当需要那个异步执行的结果的时候从Future中去get()这个结果。
public class FutureTest {
	public static void main(String[] args) throws InterruptedException {

		// 执行异步任务的线程池
		ExecutorService executor = Executors.newCachedThreadPool();
		Future<String> future = executor.submit(new Callable<String>() {
			@Override
			public String call() throws Exception {
				return "hello, world!";
			}
		});
		// 做其他的任务balabalbala...

		//需要刚刚执行的那个结果,从future中获取结果
		try {
			String result = future.get();
			System.out.println("got the resulit:" + result);
		} catch (ExecutionException e) {
			e.printStackTrace();
		}finally{
			executor.shutdown();
		}
	}
}

      上面的代码举个不恰当的例子,是这样一种情况:你在逛街的时候买奶茶,和营业员说我要一杯奶茶,营业员收到你的钱,然后给你一个小票,等奶茶好了你到时候来取。你觉得可以利用这个时间走开又去隔壁的面包店逛逛。有一点要注意的是当奶茶好的时候营业员不会打电话叫你,而是你自己去询问营业员奶茶好了没有,如果好了了你就可以立马拿走,如果没有就要麻烦你稍微等一下了,更糟糕的情况是制作奶茶的机器出了问题(产生异常了)。
       netty的future主要是配合FutureListener的使用,来达到异步通知的功能。也就是你买奶茶的时候,如果奶茶制作完成,营业员会通过电话来通知你。代码对直接吧Nettty直接拿了过来,做了少许的改动。
ComputeFuture接口
public interface ComputeFuture {

	boolean isDone();

	boolean isCancelled();

	boolean isSuccess();

	Throwable getCause();

	boolean cancel();

	boolean setSuccess(Object result);

	boolean setFailure(Throwable cause);

	void addListener(ComputeFutureListener listener);

	void removeListener(ComputeFutureListener listener);

	ComputeFuture sync() throws InterruptedException;

	ComputeFuture syncUninterruptibly();

	ComputeFuture await() throws InterruptedException;
	
	ComputeFuture awaitUninterruptibly();

	boolean await(long timeout, TimeUnit unit) throws InterruptedException;

	boolean await(long timeoutMillis) throws InterruptedException;

	boolean awaitUninterruptibly(long timeout, TimeUnit unit);

	boolean awaitUninterruptibly(long timeoutMillis);
	
	 Object getResult() ;

}

ComputeFuture实现类
public class DefaultComputeFutureImpl implements ComputeFuture {

	private static final Throwable CANCELLED = new Throwable();

	private final boolean cancellable;

	private ComputeFutureListener firstListener;
	private List<ComputeFutureListener> otherListeners;
	private boolean done;
	private Throwable cause;
	private int waiters;
	
	private Object result;

	public Object getResult() {
		return result;
	}

	public DefaultComputeFutureImpl(boolean cancellable) {
		this.cancellable = cancellable;
	}

	@Override
	public synchronized boolean isDone() {
		return done;
	}

	@Override
	public synchronized boolean isSuccess() {
		return done && cause == null;
	}

	@Override
	public synchronized Throwable getCause() {
		if (cause != CANCELLED) {
			return cause;
		} else {
			return null;
		}
	}

	@Override
	public synchronized boolean isCancelled() {
		return cause == CANCELLED;
	}

	@Override
	public void addListener(ComputeFutureListener listener) {
		if (listener == null) {
			throw new NullPointerException("listener");
		}

		boolean notifyNow = false;
		synchronized (this) {
			if (done) {
				notifyNow = true;
			} else {
				if (firstListener == null) {
					firstListener = listener;
				} else {
					if (otherListeners == null) {
						otherListeners = new ArrayList<ComputeFutureListener>(1);
					}
					otherListeners.add(listener);
				}
			}
		}

		if (notifyNow) {
			notifyListener(listener);
		}
	}

	@Override
	public void removeListener(ComputeFutureListener listener) {
		if (listener == null) {
			throw new NullPointerException("listener");
		}

		synchronized (this) {
			if (!done) {
				if (listener == firstListener) {
					if (otherListeners != null && !otherListeners.isEmpty()) {
						firstListener = otherListeners.remove(0);
					} else {
						firstListener = null;
					}
				} else if (otherListeners != null) {
					otherListeners.remove(listener);
				}
			}
		}
	}

	@Override
	public ComputeFuture sync() throws InterruptedException {
		await();
		rethrowIfFailed0();
		return this;
	}

	@Override
	public ComputeFuture syncUninterruptibly() {
		awaitUninterruptibly();
		rethrowIfFailed0();
		return this;
	}

	private void rethrowIfFailed0() {
		Throwable cause = getCause();
		if (cause == null) {
			return;
		}

		if (cause instanceof RuntimeException) {
			throw (RuntimeException) cause;
		}

		if (cause instanceof Error) {
			throw (Error) cause;
		}

		throw new RuntimeErrorException((Error) cause);
	}

	@Override
	public ComputeFuture await() throws InterruptedException {
		if (Thread.interrupted()) {
			throw new InterruptedException();
		}

		synchronized (this) {
			while (!done) {
				waiters++;
				try {
					wait();
				} finally {
					waiters--;
				}
			}
		}
		return this;
	}

	@Override
	public boolean await(long timeout, TimeUnit unit)
			throws InterruptedException {
		return await0(unit.toNanos(timeout), true);
	}

	public boolean await(long timeoutMillis) throws InterruptedException {
		return await0(MILLISECONDS.toNanos(timeoutMillis), true);
	}
	@Override
	public ComputeFuture awaitUninterruptibly() {
		boolean interrupted = false;
		synchronized (this) {
			while (!done) {
				waiters++;
				try {
					wait();
				} catch (InterruptedException e) {
					interrupted = true;
				} finally {
					waiters--;
				}
			}
		}

		if (interrupted) {
			Thread.currentThread().interrupt();
		}

		return this;
	}
	@Override
	public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
		try {
			return await0(unit.toNanos(timeout), false);
		} catch (InterruptedException e) {
			throw new InternalError();
		}
	}
	@Override
	public boolean awaitUninterruptibly(long timeoutMillis) {
		try {
			return await0(MILLISECONDS.toNanos(timeoutMillis), false);
		} catch (InterruptedException e) {
			throw new InternalError();
		}
	}

	private boolean await0(long timeoutNanos, boolean interruptable)
			throws InterruptedException {
		if (interruptable && Thread.interrupted()) {
			throw new InterruptedException();
		}

		long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();
		long waitTime = timeoutNanos;
		boolean interrupted = false;

		try {
			synchronized (this) {
				if (done || waitTime <= 0) {
					return done;
				}

				waiters++;
				try {
					for (;;) {
						try {
							wait(waitTime / 1000000, (int) (waitTime % 1000000));
						} catch (InterruptedException e) {
							if (interruptable) {
								throw e;
							} else {
								interrupted = true;
							}
						}

						if (done) {
							return true;
						} else {
							waitTime = timeoutNanos
									- (System.nanoTime() - startTime);
							if (waitTime <= 0) {
								return done;
							}
						}
					}
				} finally {
					waiters--;
				}
			}
		} finally {
			if (interrupted) {
				Thread.currentThread().interrupt();
			}
		}
	}
	@Override
	public boolean setSuccess(Object result) {
		synchronized (this) {
			// Allow only once.
			if (done) {
				return false;
			}

			done = true;
			this.result = result;
			if (waiters > 0) {
				notifyAll();
			}
		}

		notifyListeners();
		return true;
	}
	@Override
	public boolean setFailure(Throwable cause) {
		if (cause == null) {
			throw new NullPointerException("cause");
		}

		synchronized (this) {
			// Allow only once.
			if (done) {
				return false;
			}

			this.cause = cause;
			done = true;
			if (waiters > 0) {
				notifyAll();
			}
		}

		notifyListeners();
		return true;
	}
	@Override
	public boolean cancel() {
		if (!cancellable) {
			return false;
		}

		synchronized (this) {
			// Allow only once.
			if (done) {
				return false;
			}

			cause = CANCELLED;
			done = true;
			if (waiters > 0) {
				notifyAll();
			}
		}

		notifyListeners();
		return true;
	}

	private void notifyListeners() {
		if (firstListener != null) {
			notifyListener(firstListener);
			firstListener = null;

			if (otherListeners != null) {
				for (ComputeFutureListener l : otherListeners) {
					notifyListener(l);
				}
				otherListeners = null;
			}
		}
	}

	private void notifyListener(ComputeFutureListener l) {
		try {
			l.operationComplete(this);
		} catch (Throwable t) {
			t.printStackTrace();
		}
	}

}


再来一个listener接口
public interface ComputeFutureListener {
	
	void operationComplete(ComputeFuture future) throws Exception;
}


一个最简单的计算接口,实现一个int的加法
public interface BasicCompute {
	
	ComputeFuture add(int a ,int b);
}

计算接口的实现类
public class BasicComputeImpl implements BasicCompute {

	private ComputeFuture future;
	// 执行异步任务的线程池
	private ExecutorService executor = Executors.newCachedThreadPool();

	public BasicComputeImpl(ComputeFuture future) {
		this.future = future;
	}

	@Override
	public ComputeFuture add(int a, int b) {
		
		executor.execute(new Runnable() {
			
			@Override
			public void run() {
				try {
					Thread.sleep(10000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println(Thread.currentThread().getName() + ",start to compute......");
				int result = a+b;
				System.out.println(Thread.currentThread().getName()  +",got the result:" + result);
				future.setSuccess(result);
			}
		});
		return future;
	}
}


测试类
	public static void main(String[] args) {

		BasicCompute computeActor = new BasicComputeImpl(
				new DefaultComputeFutureImpl(false));
		ComputeFuture future = computeActor.add(2, 3);
		future.addListener(new ComputeFutureListener() {

			@Override
			public void operationComplete(ComputeFuture future)
					throws Exception {

				if (future.isSuccess()) {
					System.out.println(Thread.currentThread().getName()
							+ ",the result is:" + future.getResult());
				}
			}
		});
		
		System.out.println(Thread.currentThread().getName()+ ",let's wait the result...");
	}
}


     执行的结果如下:
     main,let's wait the result...
     pool-1-thread-1,start to compute......
     pool-1-thread-1,got the result:5
     pool-1-thread-1,the result is:5
     这里之所以把线程的名称打出来,是为了搞清楚这个监听器的执行是在哪个线程中执行的。当我们需要开发异步的程序的时候就可以使用这个类来完成通知获取结果的操作。


已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [利用 netty future] 推荐:

利用netty中的future获取异步执行的结果

- - Java - 编程语言 - ITeye博客
      前段时间使用netty3,感受到其对于future的设计在写异步操作时的高效与便捷,通过future与futurelistener的组合实现异步的通知. 这个在平时写异步执行代码的中经常用到.       其实JDK也有Future这个接口,是active object模式的一种实现. 最主要的思想就是让任务的调度和任务的执行分离.

Future 与 FutureTask

- - 码蜂笔记
来自 Java DOC 文档:Future 表示异步计算的结果. 它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果. 计算完成后只能使用 get 方法来获取结果,如有必要,计算完成前可以阻塞此方法. 取消则由 cancel 方法来执行. 还提供了其他方法,以确定任务是正常完成还是被取消了.

java异步计算Future

- - 互联网 - ITeye博客
从jdk1.5开始我们可以利用Future来跟踪异步计算的结果. 在此之前主线程要想获得工作线程(异步计算线程)的结果是比较麻烦的事情,需要我们进行特殊的程序结构设计,比较繁琐而且容易出错. 有了Future我们就可以设计出比较优雅的异步计算程序结构模型:根据分而治之的思想,我们可以把异步计算的线程按照职责分为3类:.

Netty系列之Netty高性能之道

- - CSDN博客推荐文章
最近一个圈内朋友通过私信告诉我,通过使用Netty4 + Thrift压缩二进制编解码技术,他们实现了10W TPS(1K的复杂POJO对象)的跨节点远程服务调用. 相比于传统基于Java序列化+BIO(同步阻塞IO)的通信框架,性能提升了8倍多. 事实上,我对这个数据并不感到惊讶,根据我5年多的NIO编程经验,通过选择合适的NIO框架,加上高性能的压缩二进制编解码技术,精心的设计Reactor线程模型,达到上述性能指标是完全有可能的.

拍摄回你的前半生:Back to the Future

- 仙人掌 - cnBeta.COM
每逢看看自己的旧照片,不禁为到岁月飞逝而怀缅过去. 阿根廷摄影师Irina Werning 组织了以Back to the Future(回到未来)为主题的拍摄项目,摄影师要求被拍者找到当年的故人,并且重现几十年前的场景.

java 线程池使用 Runnable&Callable&Future

- - Java - 编程语言 - ITeye博客
执行一次线程,调用Runnable接口实现.  当线程池执行Runnable后,返回的Future.get()总是null. DefaultRunnable代码如下:. 执行一次线程,调用Callable接口实现.  当线程池执行Callable时,返回的Future.get() 会返回Callable的返回值.

Java中的Runnable、Callable、Future、FutureTask的区别

- - IT瘾-geek
Java中存在Runnable、Callable、Future、FutureTask这几个与线程相关的类或者接口,在Java中也是比较重要的几个概念,我们通过下面的简单示例来了解一下它们的作用于区别. 其中Runnable应该是我们最熟悉的接口,它只有一个run()函数,用于将耗时操作写在其中, 该函数没有返回值.

Netty代码分析

- LightingMan - 淘宝JAVA中间件团队博客
Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序[官方定义],整体来看其包含了以下内容:1.提供了丰富的协议编解码支持,2.实现自有的buffer系统,减少复制所带来的消耗,3.整套channel的实现,4.基于事件的过程流转以及完整的网络事件响应与扩展,5.丰富的example.

Netty 5用户指南

- - 并发编程网 - ifeve.com
原文地址: http://netty.io/wiki/user-guide-for-5.x.html     译者:光辉勇士       校对:郭蕾. 现如今我们使用通用的应用程序或者类库来实现系统之间地互相访问,比如我们经常使用一个HTTP客户端来从web服务器上获取信息,或者通过web service来执行一个远程的调用.

Netty 用户指南4.x

- - CSDN博客推荐文章
现在我们经常使用程序或者库和其他人交流信息.例如,我们经常使用http程序库去从一个web server接收信息,或者调用一个远程的web服务.然而,一个通用的传输协议或者实现有的时候不能适应我们自己的场景.例如,我们不会用http server来传输一些大的文件,Email和一些实时性的信息,例如金融方面或者有些游戏数据方面的信息.这些需要一个高度优化的协议,为了使用某一种特定的应用场景.