activeMQ 推送之mqtt客户端

标签: activemq 推送 mqtt | 发表时间:2014-05-17 23:01 | 作者:
出处:http://www.iteye.com

使用activeMQ进行android推送

activeMQ下载地址:http://activemq.apache.org/download.html

下载后是一个压缩包:apache-activemq-5.9.0-bin.zip

启动方式:

解压缩,进入apache-activemq-5.9.0-bin\apache-activemq-5.9.0\bin,双击activemq.bat,即可启动activeMQ服务

 启动之后:

 android客户端推送采用mqtt(paho-mqtt-client-1.0.1.jar),依赖包见附件

 

但是为了测试,我写了一个swing图形界面,充当手机客户端,依赖的jar包仍然是paho-mqtt-client-1.0.1.jar.界面如下:

使用方法:点击 [启动]按钮,开始接收推送消息
 对应的主类是:MqttSwing,用于接收推送消息.

 

我还写了一个发送推送消息的swing图形界面,充当推送后管系统,界面如下:

使用方法:点击[连接]按钮,才可以发送推送消息
 对应的主类:PusherApp,用于发送推送消息.

核心代码介绍如下.

客户端连接activeMQ,建立连接(只有建立连接,才能接收到推送消息)

方法名:connect,做了两件事:(1)建立连接;(2)订阅主题(topic)

 

/***
	 * 客户端和activeMQ服务器建立连接
	 * @param BROKER_URL
	 * @param clientId : 用于标识客户端,相当于ios中的device token
	 * @param TOPIC
	 * @param isCleanSession :false--可以接受离线消息;
	 * @return 是否启动成功 
	 */
	private boolean connect(String BROKER_URL,String clientId,String TOPIC,boolean isCleanSession){
		try {
			ComponentUtil.appendResult(resultTextPane, "connect time:"+TimeHWUtil.getCurrentMiniuteSecond(), true);
            mqttClient = new MqttClient(BROKER_URL, clientId, new MemoryPersistence());
            MqttConnectOptions options= new MqttConnectOptions();
            options.setCleanSession(isCleanSession);//mqtt receive offline message
            ComponentUtil.appendResult(resultTextPane, "isCleanSession:"+isCleanSession, true);
            options.setKeepAliveInterval(30);
            //推送回调类,在此类中处理消息,用于消息监听
            mqttClient.setCallback(new MyCallBack(MqttSwing.this));
            boolean isSuccess=false;
            try {
				mqttClient.connect(options);//CLIENT ID CAN NOT BE SAME
				isSuccess=true;
			} catch (Exception e) {
				if(isPrintException){
					e.printStackTrace();
				}
			}
            if(!isSuccess){
            	String message="连接失败,请检查client id是否重复了 或者activeMQ是否启动";
            	ComponentUtil.appendResult(resultTextPane, message, true);
            	GUIUtil23.warningDialog(message);
            	return false;
            }else{
            //Subscribe to topics 
	            mqttClient.subscribe(new String[]{TOPIC,clientId});
	           
	            System.out.println("topic:"+TOPIC+",  "+(clientId));
	            ComponentUtil.appendResult(resultTextPane, "TOPIC:"+TOPIC+",  "+(clientId), true);
            }

        } catch (MqttException e) {
        	if(isPrintException){
            e.printStackTrace();}
            GUIUtil23.errorDialog(e.getMessage());
            return false;
        }
		return true;
	}

 

 

推送消息到来时的回调类:MyCallBack

 

package com.mqtt.hw.callback;

import org.apache.commons.lang.StringEscapeUtils;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;

import com.mqtt.hw.MqttSwing;
import com.time.util.TimeHWUtil;

public class MyCallBack implements MqttCallback {
	private MqttSwing mqttSwing;
	
	
	
	public MyCallBack(MqttSwing mqttSwing) {
		super();
		this.mqttSwing = mqttSwing;
	}

	@Override
	public void connectionLost(Throwable cause) {
		
	}

	@Override
	public void messageArrived(MqttTopic topic, MqttMessage message)
			throws Exception {
		System.out.println("messageArrived...."+TimeHWUtil.getCurrentMiniuteSecond());
		String messageStr=StringEscapeUtils.unescapeHtml(new String(message.getPayload()));
		System.out.println("message:"+messageStr);
		this.mqttSwing.receiveMessage(messageStr);
		//使窗口处于激活状态

	}

	@Override
	public void deliveryComplete(MqttDeliveryToken token) {
		
	}

}

 

 

推送者与activeMQ建立连接:

 

/**
	 * 初始化connection和session
	 * 
	 * @throws Exception
	 */
	private void init(/* String mqIp,boolean transacted */) throws Exception {
		if (!DialogUtil.verifyTFEmpty(serverIpTextField, "服务器ip")) {
			return;
		}
		String transactedStr = transactedTextField.getText();
		boolean transacted = false;
		if (ValueWidget.isNullOrEmpty(transactedStr)) {
			transacted = false;
		} else {
			transacted = Boolean.parseBoolean(transactedStr);
		}
		String message = "transacted:" + transacted;
		ComponentUtil.appendResult(resultTextArea, message, true);
		System.out.println(message);
		String brokerUrl = String.format(BROKER_URL,
				serverIpTextField.getText());
		// 创建链接工厂
		TopicConnectionFactory factory = new ActiveMQConnectionFactory(
				ActiveMQConnection.DEFAULT_USER,
				ActiveMQConnection.DEFAULT_PASSWORD, brokerUrl);
		ComponentUtil.appendResult(resultTextArea, "url:" + brokerUrl, true);
		// 通过工厂创建一个连接
		
		connection = factory.createTopicConnection();
		// 启动连接
		connection.start();
		ComponentUtil.appendResult(resultTextArea, "启动connection 成功", true);
		// 创建一个session会话 transacted
		session = connection.createTopicSession(
				transacted /* Boolean.FALSE */, Session.AUTO_ACKNOWLEDGE);

	}

 

 

项目源代码见附件 mqtt_swing.zip

手机android客户端(测试推送)见附件 android-mqtt-push-master.zip

也可以从   https://github.com/tokudu/AndroidPushNotificationsDemo

 

  下载

详细配置参阅附件 mqtt推送详解.zip

 





已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [activemq 推送 mqtt] 推荐:

activeMQ 推送之mqtt客户端

- - ITeye博客
使用activeMQ进行android推送. activeMQ下载地址:http://activemq.apache.org/download.html. 下载后是一个压缩包:apache-activemq-5.9.0-bin.zip. 解压缩,进入apache-activemq-5.9.0-bin\apache-activemq-5.9.0\bin,双击activemq.bat,即可启动activeMQ服务.

使用ActiveMQ+MQTT实现Android点对点消息通知-转载

- - 开源软件 - ITeye博客
ActiveMQ使用MQTT协议,加上android上的paho包,即可简单实现消息通知功能,但是mqtt协议只支持topic,而且不能用selector,使得点对点的消息投递变成问题. 1、每个clientId,建一个topic...这个办法对解决消息点对点投递非常有效,但是有两个大问题:. 随着用户数增多,topic数量增多,对管理性要求增大,对内存的管理也有问题.

Android推送方案分析(MQTT/XMPP/GCM)

- - 移动开发 - ITeye博客
本文主旨在于,对目前Android平台上最主流的几种消息推送方案进行分析和对比,比较客观地反映出这些推送方案的优缺点,帮助大家选择最合适的实施方案. 方案1、使用GCM服务(Google Cloud Messaging). 简介:Google推出的云消息服务,即第二代的C2DM. 优点:Google提供的服务、原生、简单,无需实现和部署服务端.

互联网推送服务原理:长连接+心跳机制(MQTT协议)

- - 移动开发 - ITeye博客
互联网推送消息的方式很常见,特别是移动互联网上,手机每天都能收到好多推送消息,经过研究发现,这些推送服务的原理都是维护一个长连接(要不不可能达到实时效果),但普通的socket连接对服务器的消耗太大了,所以才会出现像MQTT这种轻量级低消耗的协议来维护长连接,那么要如何维护长连接呢:.        在写之前,我们首先了解一下为什么android维护长连接需要心跳机制,首先我们知道,维护任何一个长连接都需要心跳机制,客户端发送一个心跳给服务器,服务器给客户端一个心跳应答,这样就形成客户端服务器的一次完整的握手,这个握手是让双方都知道他们之间的连接是没有断开,客户端是在线的.

Android APP必备高级功能,消息推送之MQTT - CSDN博客

- -
本文已授权微信公众号《鸿洋》原创首发,转载请务必注明出处. Android端实现消息推送的几种方式. 轮询:客户端定时向服务器请求数据. 服务器需要向客户端发通知时,发送一条短信,客户端收到特定短信之后,先获取信息,然后拦截短信. 缺点:贵而且短信可能被安全软件拦截. 持久连接(Push)方式:客户端和服务器之间建立长久连接.

MQTT(使用mosquitto做broker)做Android推送部分总结 - Scholer的个人页面 - 开源中国

- -
我觉得这句话用在程序员的工作中就是:在网络中找一万篇资料,在实践中做一万种尝试.
**2014-09-17:** **在本文中,由于作者事先不了解,设计不合理,使每个设备采用prefix+CLIENT_ID的方式作为topic,导致需要给每个设备的topic单独推送,才产生了一些问题,特别是推送的时间上的问题,是PHP循环往每个topic写入消息的时间.

【ActiveMQ Tuning】Prefetch Limit

- - 博客园_首页
   摘要:ActiveMQ优化 客户端优化 预取限制. 原文: http://fusesource.com/docs/broker/5.4/tuning/GenTuning-Consumer-Prefetch.html. Overview:图列4.1阐明了Broker在等待之前发送给客户端消息的反馈的行为.

【ActiveMQ Tuning】Serializing to Disk

- - 博客园_首页
     翻译自: http://fusesource.com/docs/broker/5.4/tuning/PersTuning-SerialToDisk.html.      KahaDB message store:KahaDB 是ActiveMQ Broker 为了高性能而推荐使用的消息存储机制.

ActiveMQ 桥接

- - CSDN博客互联网推荐文章
使用目的:将本地产生的消息转发到远程,通过远程服务器来处理消息,处理完成后,再启动消费者处理本地服务器消息(验证消息是否被转走,本地无消息可处理为正常). 消息在下面的地址被消费,无需任何特别配置,采用默认的配置即可. 生产消息地址为localhost:7001,需要做如下配置. 注意: 表示只有这个队列的会进行桥接转发.

GitHub - GruppoFilippetti/vertx-mqtt-broker: Vert.x based MQTT Broker

- -