基于Netty的异步Rpc调用的小框架
- - Java - 编程语言 - ITeye博客基于netty写的一个异步Rpc调用小框架,欢迎拍砖,新手. private String methodName;//调用的方法名称. private Class>[] types;//参数类型. private Object[] objects;//参数列表. 框架类,有两个静态方法,regist(在服务器上注册服务)和getobjt(获得接口的代理类).
基于netty写的一个异步Rpc调用小框架,欢迎拍砖,新手。
客户端与服务端通信的类
package cc.ymsoft.Framework; import java.io.Serializable; @SuppressWarnings("serial") public class MethodAndArgs implements Serializable{ private String methodName;//调用的方法名称 private Class<?>[] types;//参数类型 private Object[] objects;//参数列表 public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Class<?>[] getTypes() { return types; } public void setTypes(Class<?>[] types) { this.types = types; } public Object[] getObjects() { return objects; } public void setObjects(Object[] objects) { this.objects = objects; } public MethodAndArgs() { super(); // TODO Auto-generated constructor stub } public MethodAndArgs(String methodName, Class<?>[] types, Object[] objects) { this.methodName = methodName; this.types = types; this.objects = objects; } }
框架类,有两个静态方法,regist(在服务器上注册服务)和getobjt(获得接口的代理类)
/** * @author xulang */ package cc.ymsoft.Framework; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.HashMap; import java.util.Map; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; /** * 服务端处理 * @author hadoop * */ class TcpServerHandler extends ChannelInboundHandlerAdapter { private Object obj; private Object response; public TcpServerHandler(Object obj) { super(); this.obj = obj; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // TODO Auto-generated method stub MethodAndArgs methodAndArgs=(MethodAndArgs) msg; Method method=obj.getClass().getMethod(methodAndArgs.getMethodName(), methodAndArgs.getTypes()); ctx.writeAndFlush(method.invoke(obj, methodAndArgs.getObjects())); ctx.close(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub System.out.println("client die"); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub System.out.println("channelActive>>>>>>>>"); ctx.writeAndFlush("调用异常"); ctx.close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("服务器异常"); } } /** * 客户端处理 * @author hadoop * */ class TcpClientHander extends ChannelInboundHandlerAdapter { private Object response; public Object getResponse() { return response; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { response=msg; System.out.println("client接收到服务器返回的消息:" + msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("client exception is general"); } } public class RpcFramework { /** * 服务注册 * @param obj 需要注册的服务对象 * @param port 端口 * @param ip 地址 * @throws InterruptedException */ public static void regist(final Object obj,int port,String ip) throws InterruptedException { int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors()*2; int BIZTHREADSIZE = 100; EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE); EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE); if (obj == null) throw new IllegalArgumentException("对象不能为null"); if (port <= 0 || port > 65535) throw new IllegalArgumentException("错误的端口" + port); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { // TODO Auto-generated method stub ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast(new LengthFieldPrepender(4)); pipeline.addLast("encoder", new ObjectEncoder()); pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))); // pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); // pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new TcpServerHandler(obj)); } }); ChannelFuture f = bootstrap.bind(ip, port).sync(); f.channel().closeFuture().sync(); System.out.println("TCP服务器已启动"); } @SuppressWarnings("unchecked") public static <T>T getObj(Class<T> interfaceClass,final String host,final int port) { if (interfaceClass == null) throw new IllegalArgumentException("接口类型不能为空"); if (!interfaceClass.isInterface()) throw new IllegalArgumentException("类名" + interfaceClass.getName() + "必须是接口"); if (host == null || host.length() == 0) throw new IllegalArgumentException("目标主机不能为空"); if (port <= 0 || port > 65535) throw new IllegalArgumentException("端口错误:" + port); return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { MethodAndArgs mArgs=new MethodAndArgs(method.getName(), method.getParameterTypes(), args); final TcpClientHander tcpClientHander=new TcpClientHander(); EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group); // b.channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true); b.channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true); b.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); pipeline.addLast("encoder", new ObjectEncoder()); pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))); pipeline.addLast("handler",tcpClientHander); } }); ChannelFuture f = b.connect(host, port).sync(); f.channel().writeAndFlush(mArgs).sync(); f.channel().closeFuture().sync(); } catch (Exception e) { } finally { group.shutdownGracefully(); } return tcpClientHander.getResponse(); } }); } }
测试
接口
package cc.ymsoft.test; interface HelloService { String SayHello(String name); }
接口实现类
package cc.ymsoft.test; public class HelloImp implements HelloService { @Override public String SayHello(String name) { // TODO Auto-generated method stub return "你好:"+name; } }
客户端
package cc.ymsoft.test; import cc.ymsoft.Framework.RpcFramework; public class HelloInvoke { public static void main(String[] args) throws Exception { final HelloService helloService = RpcFramework.getObj(HelloService.class, "127.0.0.1", 1717); System.out.println(helloService.SayHello("XL")); } }
服务端
package cc.ymsoft.test; import cc.ymsoft.Framework.RpcFramework; public class HelloPro { public static void main(String[] args) throws Exception { HelloService hello=new HelloImp(); RpcFramework.regist(hello, 1717, "127.0.0.1"); } }
完整代码在github https://github.com/xulang/NettyRpc