ZeroMQ--使用jzmq进行编程
http://my.oschina.net/cloudcoder/blog/200989
一、环境搭建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
wget http://download.zeromq.org/zeromq-2.1.7.tar.gz
tar -xzf zeromq-2.1.7.tar.gz
cd zeromq-2.1.7
./configure
make
sudo make install

git clone https://github.com/nathanmarz/jzmq.git
cd jzmq
./autogen.sh
./configure
make
sudo make install

# 如果没有安装libtool、libuuid-devel则需要先安装,否则安装失败
yum install libtool
yum install libuuid-devel |
常见问题:
出现java.lang.UnsatisfiedLinkError: /usr/local/lib/libjzmq.so.0.0.0: libzmq.so.1: cannot open shared object file: No such file or directory异常
原因是未找到zmq动态链接库。
解决方法1:export LD_LIBRARY_PATH=/usr/local/lib
解决方法2:编辑/etc/ld.so.conf文件,增加一行:/usr/local/lib。再执行sudo ldconfig命令
Exception in thread "main" java.lang.UnsatisfiedLinkError: no jzmq in java.library.path
原因是未设置native library路径(java.library.path),JVM找不到libjzmq。解决方法:
在eclipse设置native library为/usr/local/lib
或在jvm增加参数
-Djava.library.path=/usr/local/lib
或在启动脚本中增加
java -Djava.library.path=/usr/local/lib
二、使用jzmq进行编程
1.创建maven项目,pom.xml的内容参见下文的pom.xml
注意:jzmq的版本不能太高,建议使用2.1.0,目前storm也是使用这个版本的jzmq-2.1.0.jar
否则报: java.lang.UnsatisfiedLinkError: org.zeromq.ZMQ$Socket.nativeInit()V
2.编写Publisher.java、Subscriber.java,源代码见下文
Publisher.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
package com.catt.mqtest.pubsub;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
public class Publisher {
// 等待10个订阅者
private static final int SUBSCRIBERS_EXPECTED = 10 ;
// 定义一个全局的记录器,通过LoggerFactory获取
private final static Logger log = LoggerFactory.getLogger(Publisher. class );
public static void main(String[] args) throws InterruptedException{
Context context = ZMQ.context( 1 );
Socket publisher = context.socket(ZMQ.PUB);
publisher.bind( "tcp://*:5557" );
try {
// zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布
Thread.sleep( 1000 );
} catch (InterruptedException e) {
e.printStackTrace();
}
publisher.send( "send start......" .getBytes(), 0 );
for ( int i = 0 ; i < 10 ; i++) {
publisher.send(( "Hello world " +i).getBytes(), ZMQ.NOBLOCK);
}
publisher.send( "send end......" .getBytes(), 0 );
publisher.close();
context.term();
}
} |
Subscriber.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
package com.catt.mqtest.pubsub;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
public class Subscriber {
// 定义一个全局的记录器,通过LoggerFactory获取
private final static Logger log = LoggerFactory.getLogger(Subscriber. class );
public static void main(String[] args) {
Context context = ZMQ.context( 1 );
Socket subscriber = context.socket(ZMQ.SUB);
subscriber.connect( "tcp://192.168.230.128:5557" );
subscriber.subscribe( "" .getBytes());
int total = 0 ;
while ( true ) {
byte [] stringValue = subscriber.recv( 0 );
String string = new String(stringValue);
if (string.equals( "send end......" )) {
break ;
}
total++;
System.out.println( "Received " + total + " updates. :" + string);
}
subscriber.close();
context.term();
}
} |
pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.catt</groupId>
  <artifactId>mqtest</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>mqtest</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <!-- Keep jzmq at 2.1.0: per the article, newer versions fail with
         UnsatisfiedLinkError on org.zeromq.ZMQ$Socket.nativeInit(). -->
    <dependency>
      <groupId>org.zeromq</groupId>
      <artifactId>jzmq</artifactId>
      <version>2.1.0</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.1.1</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.10</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>
|
已有 0 人发表留言,猛击->> 这里<<-参与讨论
ITeye推荐