MQTT 客户机应用程序中的回调和同步

标签: | 发表时间:2019-07-23 18:13 | 作者:
出处:https://www.ibm.com

使用Paho订阅mqtt消息时,不需要明确确认收到消息。如果回调正常返回,Paho认为它成功消费并向服务器发送确认。

如果回调抛出 异常,则客户端将被关闭。 请注意,这将导致丢失任何QoS级别为0的消息。

一旦重新连接客户端并再次订阅该主题,服务器将重新发送具有QoS级别1或2的消息。

MQTT 客户机编程模型广泛地使用线程。线程将尽可能降低 MQTT 客户机应用程序与服务器之间传输消息时的延迟。 发布、传递令牌和连接断开事件被传递至用于实现 MqttCallback 的回调类中的方法。

回调

MqttCallback 接口有三种回调方法;请参阅  Callback.java 中的示例实现。connectionLost(java.lang.Throwable cause)
  • 当通信错误导致连接断开时,就会调用 connectionLost。在已经建立连接之后,如果由于服务器上发生错误而导致服务器断开连接,那么也会调用此方法。服务器错误被记录到队列管理器错误日志中。服务器将断开与客户机的连接,并且客户机将调用 MqttCallback.connectionLost。
  • 在客户机应用程序所在的同一线程上作为异常抛出的唯一远程错误就是 MqttClient.connect 产生的异常。在建立连接之后,由服务器检测到的错误将作为可抛出异常向 MqttCallback.connectionLost 回调方法报告。
  • 导致 connectionLost 的典型服务器错误是权限错误。例如,遥测服务器试图代表一个未被授权发布主题的客户机来发布主题。导致将 MQCC_FAIL 条件代码返回到遥测服务器的任何情况都会导致断开连接。
deliveryComplete(MqttDeliveryToken token)
  • deliveryComplete 由 MQTT 客户机调用以将传递令牌传回客户机应用程序;请参阅 传递令牌。通过使用传递令牌,回调可以使用 token.getMessage 方法来访问已发布的消息。
  • 当应用程序回调在由 deliveryComplete 方法调用后将控制返回到 MQTT 客户机时,将完成传递。 在完成传递之前,持久类将保留服务质量为 QoS 1 或 QoS 2 的消息。
  • 调用 deliveryComplete 是应用程序与持久性类之间的同步点。决不会对同一条消息两次调用 deliveryComplete 方法。
  • 当应用程序回调从 deliveryComplete 返回到 MQTT 客户机时,该客户机会针对具有 QoS 1 或 2 的消息调用 MqttClientPersistence.remove。MqttClientPersistence.remove 会删除所发布消息在本地存储的副本。
  • 从事务处理角度来说,调用 deliveryComplete 是用于落实传递的单阶段事务。如果在回调期间处理失败,那么在重新启动客户机时,将再次调用 MqttClientPersistence.remove,以删除已发布的消息的本地副本。不会再次调用此回调。如果要使用回调来存储已传递的消息的日志,那么将无法使该日志与 MQTT 客户机同步。如果您想可靠地存储日志,那么请更新 MqttClientPersistence 类中的日志。
  • 传递令牌和消息由主应用程序线程和 MQTT 客户机引用。完成传递后,MQTT 客户机将取消引用 MqttMessage 对象;客户机断开连接时,它将取消引用传递令牌对象。在完成传递之后,如果客户机应用程序取消对 MqttMessage 对象的引用,那么可以对此对象进行垃圾回收。在会话断开连接之后,可以对传递令牌进行垃圾回收。
  • 在已经发布消息之后,可以获取 MqttDeliveryToken 和 MqttMessage 属性。如果您在已经发布消息之后尝试设置任何 MqttMessage 属性,那么结果是不确定的。
  • 如果客户机重新连接到具有同一 ClientIdentifier 的先前的会话,那么 MQTT 客户机将继续处理传递应答;请参阅 清除会话。MQTT 客户机应用程序必须针对先前的会话将 MqttClient.CleanSession 设置为 false,并在新会话中将其设置为 false。MQTT 客户机可在新会话中创建新的传递令牌和消息对象,以供暂挂的传递使用。它将使用 MqttClientPersistence 类来恢复对象。如果应用程序客户机仍然引用了旧的传递令牌和消息,那么请取消对它们的引用。对于在先前会话中启动的以及在此会话中完成的任何传递,会在这些传递的新会话中调用应用程序回调。
  • 在应用程序客户机连接之后,当完成暂挂的传递时,就会调用应用程序回调。在应用程序客户机连接之前,它可以使用 MqttClient.getPendingDeliveryTokens 方法来检索暂挂的传递。
  • 请注意,客户机应用程序最初创建了已发布的消息对象及其有效内容字节数组。 MQTT 客户机将引用这些对象。由 token.getMessage 方法中的传递令牌返回的消息对象不一定是客户机所创建的同一消息对象。如果新的 MQTT 客户机实例重新创建了传递令牌,那么 MqttClientPersistence 类将重新创建 MqttMessage 对象。为了保持一致性,无论消息对象是由应用程序客户机还是由 MqttClientPersistence 类创建的,如果 token.isCompleted 为 true,那么 token.getMessage 将返回 null。
messageArrived(MqttTopic topic, MqttMessage message)
  • 当客户机的与预订主题相匹配的预订到达时,就会调用 messageArrived。topic 是发布主题,而不是预订过滤器。如果过滤器中包含通配符,那么这两者可能不同。
  • 如果此主题与客户机所创建的多个预订相匹配,那么客户机将接收到此发布的多个副本。如果客户机发布至它也预订了的某个主题,那么它将接收到它自己的发布的副本。
  • 如果使用值为 1 或 2 的 QoS 发送消息,那么该消息将由 MqttClientPersistence 类存储,之后 MQTT 客户机将调用 messageArrived。messageArrived 的运行方式类似 deliveryComplete:它只能针对发布调用一次,当 messageArrived返回到 MQTT 客户机时,此发布的本地副本将由 MqttClientPersistence.remove 除去。当 messageArrived 返回到 MQTT客户机时,MQTT 客户机会中断其对主题和消息的引用。如果应用程序客户机尚不具备对对象的引用,那么会对主题和消息对象进行垃圾回收。

回调、线程技术和客户机应用程序同步

MQTT 客户机会将独立线程上的回调方法调用到主应用程序线程。客户机应用程序不会创建回调线程,它由 MQTT 客户机创建。

MQTT 客户机将同步回调方法。一次只有回调方法的一个实例运行。通过同步,很容易更新一个用于清点已经传递了哪些发布的对象。一次只运行 MqttCallback.deliveryComplete 的一个实例,因此,更新记录而不进一步进行同步是安全的。一次只有一个发布到达也是这种情况。messageArrived 方法中的代码可以更新对象而不使此对象同步。如果您正在另一个线程中引用记录或者要更新的对象,那么使此记录或对象同步。

传递令牌提供了主应用程序线程与发布的传递之间的同步机制。token.waitForCompletion 方法将一直等到完成了传递特定发布,或者等到可选超时已到期。可以使用 token.waitForCompletion 并通过两种简单的方法来一次处理一个发布:

  1. 暂停应用程序客户机,直到完成了发布的传递;请参阅  PubSync.java
  2. 与 MqttCallback.deliveryComplete 方法同步。只有当 MqttCallback.deliveryComplete 返回到 MQTT 客户机时,token.waitForCompletion 才会恢复。 通过使用此机制,在代码运行于主应用程序线程中之前,可以使 MqttCallback.deliveryComplete 中正在运行的代码同步。

如果您想进行发布而不等待传递每个发布,但您想确认何时已经传递了所有发布,情况会怎样呢?如果您在单个线程上发布,那么要发送的最后一个发布也是要传递的最后一个发布。

使发送至服务器的请求同步

表 1. 导致将请求发送至服务器的方法的同步行为.

该表列出了发送请求到服务器的 MQTT Java™ 客户机中的方法。对于每一种方法,该表描述了一些条件,在这些条件下,方法或者等待或者返回,并描述了方法的等待时间。

方法 同步 超时时间间隔
MqttClient.Connect

等待与服务器建立连接。

缺省设置为 30 秒,或者由参数进行设置。

MqttClient.Disconnect

等待 MQTT 客户机完成所有必需的工作,并等待 TCP/IP 会话断开连接。

缺省设置为 30 秒,或者由参数进行设置。

MqttClient.Subscribe

等待订阅请求完成。

缺省设置为 30 秒,或者由参数进行设置。

MqttClient.UnSubscribe

等待取消预订请求完成。

缺省设置为 30 秒,或者由参数进行设置。

MqttClient.Publish

将请求传递到 MQTT 客户机后,立即返回到应用程序线程。

无。

MqttDeliveryToken.waitForCompletion

等待返回传递令牌。

缺省设置无限期,或者由参数进行设置。


时间戳记图标 最近一次更新时间: 2018 年 10 月 26 日,星期五
https://www.ibm.com/support/knowledgecenter/zh/SSFKSJ_7.5.0/com.ibm.mm.tc.doc/tc60390_.htm

相关 [mqtt 客户机 应用程序] 推荐:

MQTT 客户机应用程序中的回调和同步

- -
使用Paho订阅mqtt消息时,不需要明确确认收到消息. 如果回调正常返回,Paho认为它成功消费并向服务器发送确认.  请注意,这将导致丢失任何QoS级别为0的消息. 一旦重新连接客户端并再次订阅该主题,服务器将重新发送具有QoS级别1或2的消息. MQTT 客户机编程模型广泛地使用线程. 线程将尽可能降低 MQTT 客户机应用程序与服务器之间传输消息时的延迟.

GitHub - GruppoFilippetti/vertx-mqtt-broker: Vert.x based MQTT Broker

- -

MQTT协议 - 安全问题

- - ITeye博客
        物联网的核心是连接万物,通过交换并分析数据使得生活更舒适与便捷. 不过,敏感数据泄露或者设备被非法控制可不是闹着玩的. 比如前段时间国内某著名家电企业的智能洗衣机,使用了某著名电商基于XMPP协议的物联网平台,不费吹灰之力便被黑客攻破并远程遥控,给智能家居的发展带来了一些阴影. 究其本质,并不是物联网技术本身有缺陷,而是在物联网系统的设计中最基本的安全设计被工程师轻视了,才导致整个系统的崩塌.

activeMQ 推送之mqtt客户端

- - ITeye博客
使用activeMQ进行android推送. activeMQ下载地址:http://activemq.apache.org/download.html. 下载后是一个压缩包:apache-activemq-5.9.0-bin.zip. 解压缩,进入apache-activemq-5.9.0-bin\apache-activemq-5.9.0\bin,双击activemq.bat,即可启动activeMQ服务.

Android推送方案分析(MQTT/XMPP/GCM)

- - 移动开发 - ITeye博客
本文主旨在于,对目前Android平台上最主流的几种消息推送方案进行分析和对比,比较客观地反映出这些推送方案的优缺点,帮助大家选择最合适的实施方案. 方案1、使用GCM服务(Google Cloud Messaging). 简介:Google推出的云消息服务,即第二代的C2DM. 优点:Google提供的服务、原生、简单,无需实现和部署服务端.

Android 应用程序

- - CSDN博客推荐文章
Android 应用程序由四个模块构造而成:Activity、Intent 、Content Provider 、Service. 下面简单介绍一下如下模块的含义:. 1、Activity  "活动". 一个Activity就是单独的屏幕,每一个活动都被实现为一个独立的类,并且从活动基类中继承而来,活动类将会显示由视图控件组成的用户接口并对事件作出响应.

建立一个高可用的MQTT物联网集群How to Build an High Availability MQTT Cluster for the Internet of Things

- -
建立一个高可用的MQTT物联网集群. We were searching for a secure (auth based), customisable (communicating with our REST API) and easy to use solution (we knew Node.js).

iPhone应用程序推荐

- sylvia - 月光博客
  本文将为大家推荐一些笔者非常喜欢的iPhone应用程序. 注意,并不怎么包括游戏,因为笔者不太喜欢玩游戏,要玩也只玩小游戏. 这也有些遗憾,毕竟iPhone最大的卖点就是丰富的游戏了. 本文主要是推荐实用的软件和系统工具. 对新手应该帮助比较大,老鸟们也可以参考一下. 而且本文也不提供下载链接与安装方法.

MQTT协议笔记之mqtt.io项目TCP协议支持

- - BlogJava-首页技术区
MQTT定义了物联网传输协议,其标准倾向于原始TCP实现. 构建于TCP的上层协议堆栈,诸如HTTP等,在空间上多了一些处理路径,稍微耗费了CPU和内存,虽看似微乎其微,但对很多处理能力不足的嵌入式设备而言,选择原始的TCP却是最好的选择. 但单纯TCP不是所有物件联网的最佳选择,提供构建与TCP基础之上的传统的HTTP通信支持,尤其是浏览器、性能富裕的桌面涉及领域,还是企业最 可信赖、最可控的传输方式之一.

使用ActiveMQ+MQTT实现Android点对点消息通知-转载

- - 开源软件 - ITeye博客
ActiveMQ使用MQTT协议,加上android上的paho包,即可简单实现消息通知功能,但是mqtt协议只支持topic,而且不能用selector,使得点对点的消息投递变成问题. 1、每个clientId,建一个topic...这个办法对解决消息点对点投递非常有效,但是有两个大问题:. 随着用户数增多,topic数量增多,对管理性要求增大,对内存的管理也有问题.