ZeroMQ(java)中监控Socket
基本上ZeroMQ(java)中基本的代码都算是过了一遍了吧,不过觉得它在日志这一块貌似基本没有做什么工作,也就是我们通过日志来知道ZeroMQ都发生了什么事情。。
而且由于ZeroMQ中将连接的建立和重连接都进行了隔离,用户不需要做什么事情来维护连接,当然这样做的好处是使程序员的编码工作变少了,但是当然也有不好的地方,那就是用户失去了对ZeroMQ整个运行阶段的控制。。
例如,当我们主动去连接一个远程地址,或者连接中断之后,没有日志告诉我们都这些事情发生了。。。当时对于一个消息通信系统来说,这些日志,或者说事件的监控都是至关重要的。
当然没有日志来记录这些内容,并不代表我们就无法知道当前运行情况都发生了什么事情,ZeroMQ中采用了另外一种方法来监控Socket的情况,不过这种方法比较恶心,需要建立额外的Socket来监控自己感兴趣的Socket。。。
这里先不说这些闲话了,来看看ZeroMQ中都定义了哪些监听事件:
public static final int EVENT_CONNECTED = zmq.ZMQ.ZMQ_EVENT_CONNECTED; //当主动建立连接建立成功之后的事件 public static final int EVENT_DELAYED = zmq.ZMQ.ZMQ_EVENT_CONNECT_DELAYED; //连接延迟 public static final int EVENT_RETRIED = zmq.ZMQ.ZMQ_EVENT_CONNECT_RETRIED; //尝试重新连接 public static final int EVENT_CONNECT_FAILED = zmq.ZMQ.ZMQ_EVENT_CONNECT_FAILED; //连接失败 public static final int EVENT_LISTENING = zmq.ZMQ.ZMQ_EVENT_LISTENING; //建立了监听 public static final int EVENT_BIND_FAILED = zmq.ZMQ.ZMQ_EVENT_BIND_FAILED; //bind失败 public static final int EVENT_ACCEPTED = zmq.ZMQ.ZMQ_EVENT_ACCEPTED; //接收到accept事件 public static final int EVENT_ACCEPT_FAILED = zmq.ZMQ.ZMQ_EVENT_ACCEPT_FAILED; //accept出错的事件 public static final int EVENT_CLOSED = zmq.ZMQ.ZMQ_EVENT_CLOSED; //关闭事件 public static final int EVENT_CLOSE_FAILED = zmq.ZMQ.ZMQ_EVENT_CLOSE_FAILED; //关闭失败 public static final int EVENT_DISCONNECTED = zmq.ZMQ.ZMQ_EVENT_DISCONNECTED; //连接断开 public static final int EVENT_ALL = zmq.ZMQ.ZMQ_EVENT_ALL; //所有的事件
上面是定义的所有可能发生的事件,具体每一种事件代表什么意思后面的注释都已经说明了,当然这里面我觉得最重要的事件有连接的断开,连接的建立,以及重连接等事件。。。
接下来我们来看看如何在ZeroMQ(java)中来监控这些事件吧,直接上代码了:
import org.zeromq.ZMQ; import zmq.ZMQ.Event; public class Request { public static void main (String args[]) { ZMQ.Context context = ZMQ.context(1); ZMQ.Socket req = context.socket(ZMQ.REQ); req.monitor("inproc://reqmoniter", ZMQ.EVENT_CONNECTED | ZMQ.EVENT_DISCONNECTED); //这段代码会创建一个pair类型的socket,专门来接收当前socket发生的事件 final ZMQ.Socket moniter = context.socket(ZMQ.PAIR); //这里创建一个pair类型的socket,用于与上面建立的moniter建立连接 moniter.connect("inproc://reqmoniter"); //连接当前socket的监听 new Thread(new Runnable(){ public void run() { // TODO Auto-generated method stub while (true) { Event event = Event.read(moniter.base()); //从当前moniter里面读取event System.out.println(event.event + " " + event.addr); } } }).start(); req.connect("tcp://127.0.0.1:5000"); while (true) { req.send("aaa"); req.recv(); } } } package monit; import org.zeromq.ZMQ; public class Response { public static void main(String args[]) { final ZMQ.Context context = ZMQ.context(1); ZMQ.Socket response = context.socket(ZMQ.REP); response.bind("tcp://*:5000"); while (!Thread.currentThread().isInterrupted()) { response.recv(); response.send("hello".getBytes()); try { Thread.currentThread().sleep(100); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } response.close(); context.term(); } }
这里代码用req/rep来举例子的,其实用起来还算是比较的简单,当连接建立之后,将会输出:
1 tcp://127.0.0.1:5000
1就是代表连接建立的事件,后面是建立连接的地址,
这个时候如果关闭response,那么将会输出如下:
512 tcp://127.0.0.1:5000
512代表的就是连接断开的事件。。。
嗯,至于说监控的源码是怎么实现的,这里就不细说了,其实还是非常的简单的,如果有兴趣可以自己去看看。。
到此ZeroMQ(java)的代码就算是读的差不多了,算是告一段落吧,接下来好好看看书。。。另外有想 再搞搞C/C++方面的东西。。。初步计划看看Redis的源码吧。。。。