基于Netty的异步Rpc调用的小框架

标签: netty 异步 rpc | 发表时间:2016-05-06 19:01 | 作者:xulang
出处:http://www.iteye.com

基于netty写的一个异步Rpc调用小框架,欢迎拍砖,新手。

 

客户端与服务端通信的类

package cc.ymsoft.Framework;

import java.io.Serializable;

@SuppressWarnings("serial")
public class MethodAndArgs implements Serializable{
	private String methodName;//调用的方法名称
	private Class<?>[] types;//参数类型
	private Object[] objects;//参数列表
	public String getMethodName() {
		return methodName;
	}
	public void setMethodName(String methodName) {
		this.methodName = methodName;
	}
	public Class<?>[] getTypes() {
		return types;
	}
	public void setTypes(Class<?>[] types) {
		this.types = types;
	}
	public Object[] getObjects() {
		return objects;
	}
	public void setObjects(Object[] objects) {
		this.objects = objects;
	}
	public MethodAndArgs() {
		super();
		// TODO Auto-generated constructor stub
	}
	public MethodAndArgs(String methodName, Class<?>[] types, Object[] objects) {
		
		this.methodName = methodName;
		this.types = types;
		this.objects = objects;
	}
	
	
}

 

 框架类,有两个静态方法,regist(在服务器上注册服务)和getobjt(获得接口的代理类)

/**
 * @author xulang
 */
package cc.ymsoft.Framework;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
/**
 * 服务端处理
 * @author hadoop
 *
 */
 class TcpServerHandler extends ChannelInboundHandlerAdapter {
    
    private Object obj;
    private Object response;
    
    public TcpServerHandler(Object obj) {
		super();
		this.obj = obj;
		
	}
	@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        // TODO Auto-generated method stub
		MethodAndArgs methodAndArgs=(MethodAndArgs) msg;
		Method method=obj.getClass().getMethod(methodAndArgs.getMethodName(), methodAndArgs.getTypes());
		ctx.writeAndFlush(method.invoke(obj, methodAndArgs.getObjects()));
        ctx.close();
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    	// TODO Auto-generated method stub
    	System.out.println("client die");
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // TODO Auto-generated method stub
        System.out.println("channelActive>>>>>>>>");
        ctx.writeAndFlush("调用异常");
        ctx.close();
    }
      @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("服务器异常");
    }
}
/**
 * 客户端处理
 * @author hadoop
 *
 */
 class TcpClientHander extends ChannelInboundHandlerAdapter {
	 private Object response;
	 
		public Object getResponse() {
		return response;
	}

		@Override
	    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
			response=msg;
	        System.out.println("client接收到服务器返回的消息:" + msg);
	    }
	    
		@Override
	    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
	        System.out.println("client exception is general");
	    }
	}
 
public class RpcFramework {

	/**
	 * 服务注册
	 * @param obj 需要注册的服务对象
	 * @param port 端口
	 * @param ip 地址
	 * @throws InterruptedException 
	 */
	public static void regist(final Object obj,int port,String ip) throws InterruptedException {
		int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors()*2;
		    
		int BIZTHREADSIZE = 100;
		 EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE);
		  EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE);
		if (obj == null)
			throw new IllegalArgumentException("对象不能为null");
		if (port <= 0 || port > 65535)
			throw new IllegalArgumentException("错误的端口" + port);
		 ServerBootstrap bootstrap = new ServerBootstrap();
	        bootstrap.group(bossGroup, workerGroup);
	        bootstrap.channel(NioServerSocketChannel.class);
	        bootstrap.childHandler(new ChannelInitializer<Channel>() {

	            @Override
	            protected void initChannel(Channel ch) throws Exception {
	                // TODO Auto-generated method stub
	                ChannelPipeline pipeline = ch.pipeline();
	                 pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
	                    pipeline.addLast(new LengthFieldPrepender(4));
	                    pipeline.addLast("encoder", new ObjectEncoder());  
	                    pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
	                  //  pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
	                   // pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
	                    pipeline.addLast(new TcpServerHandler(obj));
	            }       
	        });
	          ChannelFuture f = bootstrap.bind(ip, port).sync();
	          f.channel().closeFuture().sync();
	          System.out.println("TCP服务器已启动");
	}
	@SuppressWarnings("unchecked")
	public static <T>T getObj(Class<T> interfaceClass,final String host,final int port) {
		if (interfaceClass == null)
			throw new IllegalArgumentException("接口类型不能为空");
		if (!interfaceClass.isInterface())
			throw new IllegalArgumentException("类名" + interfaceClass.getName() + "必须是接口");
		if (host == null || host.length() == 0)
			throw new IllegalArgumentException("目标主机不能为空");
		if (port <= 0 || port > 65535)
			throw new IllegalArgumentException("端口错误:" + port);
		
	return (T)	Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new InvocationHandler() {
			
			@Override
			public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
				
				MethodAndArgs mArgs=new MethodAndArgs(method.getName(), method.getParameterTypes(), args);
				 final TcpClientHander tcpClientHander=new TcpClientHander();
				  EventLoopGroup group = new NioEventLoopGroup();
		          try {
		              Bootstrap b = new Bootstrap();
		              b.group(group);
		         //     b.channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true);
		              b.channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true);
		             
		              b.handler(new ChannelInitializer<SocketChannel>() {
		                  @Override
		                  protected void initChannel(SocketChannel ch) throws Exception {
		                      ChannelPipeline pipeline = ch.pipeline();
		                      pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
		                      pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
		                      pipeline.addLast("encoder", new ObjectEncoder());  
		                      pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
		                      pipeline.addLast("handler",tcpClientHander);
		                  }
		              });
		              
		                   ChannelFuture f = b.connect(host, port).sync();    
		                
		                  f.channel().writeAndFlush(mArgs).sync();
		                   f.channel().closeFuture().sync();
		                 
		          } catch (Exception e) {

		          } finally {
		        	  
		              group.shutdownGracefully();
		          }
				return tcpClientHander.getResponse();
		     
				
			}
		});
	}
}

  测试

接口

package cc.ymsoft.test;

interface HelloService {
	String SayHello(String name);
}

 接口实现类

package cc.ymsoft.test;

public class HelloImp implements HelloService {

	@Override
	public String SayHello(String name) {
		// TODO Auto-generated method stub
		
		return "你好:"+name;
	}


}

 客户端

package cc.ymsoft.test;

import cc.ymsoft.Framework.RpcFramework;

public class HelloInvoke {
	
	public static void main(String[] args) throws Exception {
		final HelloService helloService = RpcFramework.getObj(HelloService.class, "127.0.0.1", 1717);
	
		
					System.out.println(helloService.SayHello("XL"));
				
		
	}

}

 服务端

 

package cc.ymsoft.test;

import cc.ymsoft.Framework.RpcFramework;

public class HelloPro {
	public static void main(String[] args) throws Exception {
		HelloService hello=new HelloImp();
		RpcFramework.regist(hello, 1717, "127.0.0.1");
	}
	
}

 

完整代码在github https://github.com/xulang/NettyRpc



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [netty 异步 rpc] 推荐:

基于Netty的异步Rpc调用的小框架

- - Java - 编程语言 - ITeye博客
基于netty写的一个异步Rpc调用小框架,欢迎拍砖,新手. private String methodName;//调用的方法名称. private Class[] types;//参数类型. private Object[] objects;//参数列表.  框架类,有两个静态方法,regist(在服务器上注册服务)和getobjt(获得接口的代理类).

基于hessian和netty的RPC框架设计和实现

- - Java - 编程语言 - ITeye博客
基于hessian和netty的RPC框架设计和实现.         对系统进行服务化改造,或者构建一个分布式系统,RPC是核心的组件,目前主流的RPC框架有hessian\thrift\ avro等,如果不考虑跨语言的话thrift\ avro使用起来稍显复杂,要写IDL序列化配置,hessian又依赖servlet容器,于是使用netty和hessian构建了一个的RPC框 架.

利用netty中的future获取异步执行的结果

- - Java - 编程语言 - ITeye博客
      前段时间使用netty3,感受到其对于future的设计在写异步操作时的高效与便捷,通过future与futurelistener的组合实现异步的通知. 这个在平时写异步执行代码的中经常用到.       其实JDK也有Future这个接口,是active object模式的一种实现. 最主要的思想就是让任务的调度和任务的执行分离.

同步转异步+RPC的一个POS行业应用-业务模型介绍

- - 行业应用 - ITeye博客
最近在做一个挺有意思的POS消费项目,工作量不太大,但涉及的技术运用还挺有意思的. 可能有人奇怪,POS项目怎么用到JAVA语言了,先来简单介绍下这个项目背景:. 改造前:收银机下单,POS机下单并刷卡支付. 改造后:收银机跟POS连线,收银台直接下单并触发POS刷卡支付动作. 这里就涉及一个关键问题,POS机只能单线程工作,就是一个时刻只能干一件事情,比如打印,刷卡,跟卡主机通讯,都必须是一件件做.

同步转异步+RPC的一个POS行业应用-关键技术实现

- - 行业应用 - ITeye博客
简单回顾下业务模型:收银台<=>POS代理服务器<=>POS机,三者之间进行通讯,POS代理作为一个报文转换和同步转异步角色. 下面介绍下几个关键技术实现:. 这里的报文,指的是POS代理服务器跟POS通讯之间约定的报文. 根据POS交易类型,支付、刷卡、打印等,约定每个交易报文包含什么字段信息和长度,其中一个比较特别字段是UUID,这个字段是每个报文的关键字段,.

Hadoop RPC机制

- - 企业架构 - ITeye博客
RPC(Remote Procedure Call Protocol)远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议. Hadoop底层的交互都是通过 rpc进行的. 例如:datanode和namenode 、tasktracker和jobtracker、secondary namenode和namenode之间的通信都是通过rpc实现的.

HTTP/2 in Netty

- -
Here, we created a context for the server with a JDK SSL provider, added a couple of ciphers, and configured the Application-Layer Protocol Negotiation for HTTP/2..

Netty系列之Netty高性能之道

- - CSDN博客推荐文章
最近一个圈内朋友通过私信告诉我,通过使用Netty4 + Thrift压缩二进制编解码技术,他们实现了10W TPS(1K的复杂POJO对象)的跨节点远程服务调用. 相比于传统基于Java序列化+BIO(同步阻塞IO)的通信框架,性能提升了8倍多. 事实上,我对这个数据并不感到惊讶,根据我5年多的NIO编程经验,通过选择合适的NIO框架,加上高性能的压缩二进制编解码技术,精心的设计Reactor线程模型,达到上述性能指标是完全有可能的.

Netty代码分析

- LightingMan - 淘宝JAVA中间件团队博客
Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序[官方定义],整体来看其包含了以下内容:1.提供了丰富的协议编解码支持,2.实现自有的buffer系统,减少复制所带来的消耗,3.整套channel的实现,4.基于事件的过程流转以及完整的网络事件响应与扩展,5.丰富的example.

JAVA RPC 通讯框架

- - 经验沉淀 知识结晶
Bison 是一个JAVA 程间的通信框架,基于apache mina 实现,对mina进行了byteBuffer 缓冲区重用以及半包出处时减少拷贝. 客户端(bison-client) 功能点. 2 支持高用性:高可用的一个基本原则,可以接受快速的失败,但不能接受长时间的等待. Githup地址:https://github.com/gavenpeng/Bison.