[原]基于netty的心跳检测

标签: | 发表时间:2015-03-12 04:45 | 作者:asd13141718
出处:http://blog.csdn.net/asd13141718

这两天由于要给android系统的设备写一个心跳功能,所以在这里写一个基于netty的心跳检测功能。

实现的功能:

1.客户端网络空闲5秒没有进行写操作是,进行发送一次ping心跳给服务端;

2.客户端如果在下一个发送ping心跳周期来临时,还没有收到服务端pong的心跳应答,则失败心跳计数器加1;

3.每当客户端收到服务端的pong心跳应答后,失败心跳计数器清零;

4.如果连续超过3次没有收到服务端的心跳回复,则断开当前连接,在5秒后进行重连操作,直到重连成功,否则每隔5秒又会进行重连;

5.服务端网络空闲状态到达6秒后,服务端心跳失败计数器加1;

6.只要收到客户端的ping消息,服务端心跳失败计数器清零;

7.服务端连续3次没有收到客户端的ping消息后,将关闭链路,释放资源,等待客户端重连;


服务端代码:

package com.kg.netty.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

import com.kg.netty.msg.KeepAliveMessage;
import com.kg.utils.Constants;
import com.kg.utils.Utils;

public class KeepAliveServer {

	// 端口
	private int port ;

	public KeepAliveServer(int port) {
		this.port = port;
	}
	
	ChannelFuture f ;
	
	ServerBootstrap b ;
	
	// 设置6秒检测chanel是否接受过心跳数据
	private static final int READ_WAIT_SECONDS = 6;
	
	// 定义客户端没有收到服务端的pong消息的最大次数
	private static final int MAX_UN_REC_PING_TIMES = 3;

	public void startServer() {
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			b = new ServerBootstrap();
			b.group(bossGroup, workerGroup);
			b.channel(NioServerSocketChannel.class);
			b.childHandler(new KeepAliveServerInitializer());
			// 服务器绑定端口监听
			f = b.bind(port).sync();
			// 监听服务器关闭监听,此方法会阻塞
			f.channel().closeFuture().sync();

			// 可以简写为
			/* b.bind(portNumber).sync().channel().closeFuture().sync(); */
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}

	/**
	 * 消息处理器
	 * @author cullen edward
	 */
	private class KeepAliveServerInitializer extends ChannelInitializer<SocketChannel> {

		@Override
		protected void initChannel(SocketChannel ch) throws Exception {
			ChannelPipeline pipeline = ch.pipeline();
			
			/*
			 * 使用ObjectDecoder和ObjectEncoder
			 * 因为双向都有写数据和读数据,所以这里需要两个都设置
			 * 如果只读,那么只需要ObjectDecoder即可
			 */
			pipeline.addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
			pipeline.addLast("encoder", new ObjectEncoder());
			
			/*
			 * 这里只监听读操作
			 * 可以根据需求,监听写操作和总得操作
			 */
			pipeline.addLast("pong", new IdleStateHandler(READ_WAIT_SECONDS, 0, 0,TimeUnit.SECONDS));
			
			pipeline.addLast("handler", new Heartbeat());
		}
	}
	
	private class Heartbeat extends SimpleChannelInboundHandler<KeepAliveMessage> { 
		
		// 失败计数器:未收到client端发送的ping请求
		private int unRecPingTimes = 0 ;
		
		// 每个chanel对应一个线程,此处用来存储对应于每个线程的一些基础数据,此处不一定要为KeepAliveMessage对象
		ThreadLocal<KeepAliveMessage> localMsgInfo = new ThreadLocal<KeepAliveMessage>(); 
		
		@Override
		protected void channelRead0(ChannelHandlerContext ctx, KeepAliveMessage msg) throws Exception {
			System.out.println(ctx.channel().remoteAddress() + " Say : sn=" + msg.getSn()+",reqcode="+msg.getReqCode());
	        // 收到ping消息后,回复
			if(Utils.notEmpty(msg.getSn())&&msg.getReqCode()==1){
				msg.setReqCode(Constants.RET_CODE);
				ctx.channel().writeAndFlush(msg);
				// 失败计数器清零
				unRecPingTimes = 0;
				if(localMsgInfo.get()==null){
					KeepAliveMessage localMsg = new KeepAliveMessage();
					localMsg.setSn(msg.getSn());
					localMsgInfo.set(localMsg);
					/*
					 * 这里可以将设备号放入一个集合中进行统一管理
					 */
					// TODO
				}
			}else{
				ctx.channel().close();
			}
		}
		
		public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
	        if (evt instanceof IdleStateEvent) {
	            IdleStateEvent event = (IdleStateEvent) evt;
	            if (event.state() == IdleState.READER_IDLE) {
	                /*读超时*/
	                System.out.println("===服务端===(READER_IDLE 读超时)");
	                // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连
	                if(unRecPingTimes >= MAX_UN_REC_PING_TIMES){
	                	System.out.println("===服务端===(读超时,关闭chanel)");
	                	// 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连
	                	ctx.channel().close();
	                }else{
	                	// 失败计数器加1
	                	unRecPingTimes++;
	                }
	            } else if (event.state() == IdleState.WRITER_IDLE) {
	                /*写超时*/   
	                System.out.println("===服务端===(WRITER_IDLE 写超时)");
	            } else if (event.state() == IdleState.ALL_IDLE) {
	                /*总超时*/
	                System.out.println("===服务端===(ALL_IDLE 总超时)");
	            }
	        }
	    }
		
	    @Override
		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
	    	System.out.println("错误原因:"+cause.getMessage());
	    	if(localMsgInfo.get()!=null){
				/*
				 * 从管理集合中移除设备号等唯一标示,标示设备离线
				 */
	    		// TODO
		    }
	    	ctx.channel().close();
		}

		@Override
	    public void channelActive(ChannelHandlerContext ctx) throws Exception {
	        System.out.println("Client active ");
	        super.channelActive(ctx);
	    }
		
		@Override
		public void channelInactive(ChannelHandlerContext ctx) throws Exception {
			// 关闭,等待重连
		    ctx.close();
			if(localMsgInfo.get()!=null){
				/*
				 * 从管理集合中移除设备号等唯一标示,标示设备离线
				 */
	    		// TODO
		    }
		    System.out.println("===服务端===(客户端失效)");
		}
	}
	
	public void stopServer(){
		if(f!=null){
			f.channel().close();
		}
	}

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		KeepAliveServer keepAliveServer = new KeepAliveServer(1666);
		keepAliveServer.startServer();
	}
}



客户端代码:

package com.kg.netty.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.kg.netty.msg.KeepAliveMessage;
import com.kg.utils.Constants;

public class KeepAliveClient {

	private String host ;
	private int port ;
	
	private EventLoopGroup group ;
	
	private Bootstrap b ;
	
	private Channel ch ;
	
	// 定义客户端没有收到服务端的pong消息的最大次数
	private static final int MAX_UN_REC_PONG_TIMES = 3;
	
	// 多长时间未请求后,发送心跳
	private static final int WRITE_WAIT_SECONDS = 5;
	
	// 隔N秒后重连
	private static final int RE_CONN_WAIT_SECONDS = 5;
	
	// 客户端连续N次没有收到服务端的pong消息  计数器
	private int unRecPongTimes = 0 ;
	
	private ScheduledExecutorService executorService ;
	
	// 是否停止
	private boolean isStop = false ;

	public KeepAliveClient(String host, int port) {
		this.host = host ;
		this.port = port ;
		group = new NioEventLoopGroup();
		b = new Bootstrap();
		b.group(group).channel(NioSocketChannel.class).handler(new HeartbeatInitializer());
	}

	public void start() {
		connServer();
	}
	
	private void connServer(){
		
		isStop = false;
		
		if(executorService!=null){
			executorService.shutdown();
		}
		executorService = Executors.newScheduledThreadPool(1);
		executorService.scheduleWithFixedDelay(new Runnable() {
			
			boolean isConnSucc = true;
			
			@Override
			public void run() {
				try {
					// 重置计数器
					unRecPongTimes = 0;
					// 连接服务端
					if(ch!=null&&ch.isOpen()){
						ch.close();
					}
					ch = b.connect(host, port).sync().channel();
					// 此方法会阻塞
//					ch.closeFuture().sync();
					System.out.println("connect server finish");
				} catch (Exception e) {
					e.printStackTrace();
					isConnSucc = false ;
				} finally{
					if(isConnSucc){
						if(executorService!=null){
							executorService.shutdown();
						}
					}
				}
			}
		}, RE_CONN_WAIT_SECONDS, RE_CONN_WAIT_SECONDS, TimeUnit.SECONDS);
	}
	
	public class HeartbeatInitializer extends ChannelInitializer<SocketChannel> {
		 
	    @Override
	    protected void initChannel(SocketChannel ch) throws Exception {
	        ChannelPipeline pipeline = ch.pipeline();
	 
			pipeline.addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
	        pipeline.addLast("encoder", new ObjectEncoder());
	 
	        pipeline.addLast("ping", new IdleStateHandler(0, WRITE_WAIT_SECONDS, 0,TimeUnit.SECONDS));
	        // 客户端的逻辑
	        pipeline.addLast("handler", new ClientHandler());
	    }
	}

	public class ClientHandler extends SimpleChannelInboundHandler<KeepAliveMessage> {
		 
	    @Override
	    protected void channelRead0(ChannelHandlerContext ctx, KeepAliveMessage msg)
	            throws Exception {
	    	System.out.println("Server say : sn=" + msg.getSn()+",reqcode="+msg.getReqCode());
	        if (Constants.RET_CODE == msg.getReqCode()) {
	        	// 计数器清零
	        	unRecPongTimes = 0;
	        }
	    }
	    
	    @Override
	    public void channelActive(ChannelHandlerContext ctx) throws Exception {
	        System.out.println("Client active ");
	        super.channelActive(ctx);
	    }

	    @Override
	    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
	        System.out.println("Client close ");
	        super.channelInactive(ctx);
	        /*
	         * 重连
	         */
	        if(!isStop){
	        	connServer();
	        }
	    }
	    
	    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
	        if (evt instanceof IdleStateEvent) {
	            IdleStateEvent event = (IdleStateEvent) evt;
	            if (event.state() == IdleState.READER_IDLE) {
	                /*读超时*/
	                System.out.println("===服务端===(READER_IDLE 读超时)");
	            } else if (event.state() == IdleState.WRITER_IDLE) {
	                /*写超时*/   
	                System.out.println("===服务端===(WRITER_IDLE 写超时)");
	                if(unRecPongTimes < MAX_UN_REC_PONG_TIMES){
	                	ctx.channel().writeAndFlush(getSrcMsg()) ;
	                	unRecPongTimes++;
	                }else{
	                	ctx.channel().close();
	                }
	            } else if (event.state() == IdleState.ALL_IDLE) {
	                /*总超时*/
	                System.out.println("===服务端===(ALL_IDLE 总超时)");
	            }
	        }
	    }
	}
	
	private KeepAliveMessage getSrcMsg(){
		KeepAliveMessage keepAliveMessage = new KeepAliveMessage();
		// 设备码
        keepAliveMessage.setSn("sn_123456abcdfef");
        keepAliveMessage.setReqCode(Constants.REQ_CODE);
        return keepAliveMessage ;
	}
	
	public void stop(){
		isStop = true;
		if(ch!=null&&ch.isOpen()){
			ch.close();
		}
		if(executorService!=null){
			executorService.shutdown();
		}
	}
	
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		KeepAliveClient keepAliveServer = new KeepAliveClient("127.0.0.1",1666);
		keepAliveServer.start();
	}
}

参考网站:

http://coder.beitown.com/archives/1180


下载工程,请猛戳

http://download.csdn.net/detail/asd13141718/8492741

作者:asd13141718 发表于2015/3/11 20:45:29 原文链接
阅读:474 评论:0 查看评论

相关 [netty 心跳] 推荐:

[原]基于netty的心跳检测

- - 一切没有被记录的,终将被遗忘
这两天由于要给android系统的设备写一个心跳功能,所以在这里写一个基于netty的心跳检测功能. 1.客户端网络空闲5秒没有进行写操作是,进行发送一次ping心跳给服务端;. 2.客户端如果在下一个发送ping心跳周期来临时,还没有收到服务端pong的心跳应答,则失败心跳计数器加1;. 3.每当客户端收到服务端的pong心跳应答后,失败心跳计数器清零;.

Netty 那些事儿 ——— 心跳机制

- - 神刀安全网
本文是Netty文集中“Netty 那些事儿”系列的文章. 主要结合在开发实战中,我们遇到的一些“奇奇怪怪”的问题,以及如何正确且更好的使用Netty框架,并会对Netty中涉及的重要设计理念进行介绍. 心跳说的是在客户端和服务端在互相建立ESTABLISH状态的时候,如何通过发送一个最简单的包来 保持连接的存活,还有 监控另一边服务的可用性等.

netty实现tcp长连接和心跳检测

- - 开源软件 - ITeye博客
       通过netty实现服务端与客户端的长连接通讯,及心跳检测.        基本思路:netty服务端通过一个Map保存所有连接上来的客户端SocketChannel,客户端的Id作为Map的key. 每次服务器端如果要向某个客户端发送消息,只需根据ClientId取出对应的SocketChannel,往里面写入message即可.

Netty 4.0 实现心跳检测和断线重连

- - ITeye博客
原理:当服务端每隔一段时间就会向客户端发送心跳包,客户端收到心跳包后同样也会回一个心跳包给服务端. 一般情况下,客户端与服务端在指定时间内没有任何读写请求,就会认为连接是idle(空闲的)的. 此时,客户端需要向服务端发送心跳消息,来维持服务端与客户端的链接. 那么怎么判断客户端在指定时间里没有任何读写请求呢.

Netty 超时机制及心跳程序实现

- - ImportNew
本文介绍了 Netty 超时机制的原理,以及如何在连接闲置时发送一个心跳来维持连接. Netty 超时机制的介绍. Netty 的超时类型 IdleState 主要分为:. ALL_IDLE : 一段时间内没有数据接收或者发送. READER_IDLE : 一段时间内没有数据接收. WRITER_IDLE : 一段时间内没有数据发送.

HTTP/2 in Netty

- -
Here, we created a context for the server with a JDK SSL provider, added a couple of ciphers, and configured the Application-Layer Protocol Negotiation for HTTP/2..

Netty系列之Netty高性能之道

- - CSDN博客推荐文章
最近一个圈内朋友通过私信告诉我,通过使用Netty4 + Thrift压缩二进制编解码技术,他们实现了10W TPS(1K的复杂POJO对象)的跨节点远程服务调用. 相比于传统基于Java序列化+BIO(同步阻塞IO)的通信框架,性能提升了8倍多. 事实上,我对这个数据并不感到惊讶,根据我5年多的NIO编程经验,通过选择合适的NIO框架,加上高性能的压缩二进制编解码技术,精心的设计Reactor线程模型,达到上述性能指标是完全有可能的.

Netty代码分析

- LightingMan - 淘宝JAVA中间件团队博客
Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序[官方定义],整体来看其包含了以下内容:1.提供了丰富的协议编解码支持,2.实现自有的buffer系统,减少复制所带来的消耗,3.整套channel的实现,4.基于事件的过程流转以及完整的网络事件响应与扩展,5.丰富的example.

Netty 5用户指南

- - 并发编程网 - ifeve.com
原文地址: http://netty.io/wiki/user-guide-for-5.x.html     译者:光辉勇士       校对:郭蕾. 现如今我们使用通用的应用程序或者类库来实现系统之间地互相访问,比如我们经常使用一个HTTP客户端来从web服务器上获取信息,或者通过web service来执行一个远程的调用.

Netty 用户指南4.x

- - CSDN博客推荐文章
现在我们经常使用程序或者库和其他人交流信息.例如,我们经常使用http程序库去从一个web server接收信息,或者调用一个远程的web服务.然而,一个通用的传输协议或者实现有的时候不能适应我们自己的场景.例如,我们不会用http server来传输一些大的文件,Email和一些实时性的信息,例如金融方面或者有些游戏数据方面的信息.这些需要一个高度优化的协议,为了使用某一种特定的应用场景.