MQTT 客户机应用程序中的回调和同步
使用Paho订阅mqtt消息时,不需要明确确认收到消息。如果回调正常返回,Paho认为它成功消费并向服务器发送确认。
如果回调抛出 异常,则客户端将被关闭。 请注意,这将导致丢失任何QoS级别为0的消息。
一旦重新连接客户端并再次订阅该主题,服务器将重新发送具有QoS级别1或2的消息。
MQTT 客户机编程模型广泛地使用线程。线程将尽可能降低 MQTT 客户机应用程序与服务器之间传输消息时的延迟。 发布、传递令牌和连接断开事件被传递至用于实现 MqttCallback 的回调类中的方法。
回调
MqttCallback 接口有三种回调方法;请参阅 Callback.java 中的示例实现。connectionLost(java.lang.Throwable cause)- 当通信错误导致连接断开时,就会调用 connectionLost。在已经建立连接之后,如果由于服务器上发生错误而导致服务器断开连接,那么也会调用此方法。服务器错误被记录到队列管理器错误日志中。服务器将断开与客户机的连接,并且客户机将调用 MqttCallback.connectionLost。
- 在客户机应用程序所在的同一线程上作为异常抛出的唯一远程错误就是 MqttClient.connect 产生的异常。在建立连接之后,由服务器检测到的错误将作为可抛出异常向 MqttCallback.connectionLost 回调方法报告。
- 导致 connectionLost 的典型服务器错误是权限错误。例如,遥测服务器试图代表一个未被授权发布主题的客户机来发布主题。导致将 MQCC_FAIL 条件代码返回到遥测服务器的任何情况都会导致断开连接。
- deliveryComplete 由 MQTT 客户机调用以将传递令牌传回客户机应用程序;请参阅 传递令牌。通过使用传递令牌,回调可以使用 token.getMessage 方法来访问已发布的消息。
- 当应用程序回调在由 deliveryComplete 方法调用后将控制返回到 MQTT 客户机时,将完成传递。 在完成传递之前,持久类将保留服务质量为 QoS 1 或 QoS 2 的消息。
- 调用 deliveryComplete 是应用程序与持久性类之间的同步点。决不会对同一条消息两次调用 deliveryComplete 方法。
- 当应用程序回调从 deliveryComplete 返回到 MQTT 客户机时,该客户机会针对具有 QoS 1 或 2 的消息调用 MqttClientPersistence.remove。MqttClientPersistence.remove 会删除所发布消息在本地存储的副本。
- 从事务处理角度来说,调用 deliveryComplete 是用于落实传递的单阶段事务。如果在回调期间处理失败,那么在重新启动客户机时,将再次调用 MqttClientPersistence.remove,以删除已发布的消息的本地副本。不会再次调用此回调。如果要使用回调来存储已传递的消息的日志,那么将无法使该日志与 MQTT 客户机同步。如果您想可靠地存储日志,那么请更新 MqttClientPersistence 类中的日志。
- 传递令牌和消息由主应用程序线程和 MQTT 客户机引用。完成传递后,MQTT 客户机将取消引用 MqttMessage 对象;客户机断开连接时,它将取消引用传递令牌对象。在完成传递之后,如果客户机应用程序取消对 MqttMessage 对象的引用,那么可以对此对象进行垃圾回收。在会话断开连接之后,可以对传递令牌进行垃圾回收。
- 在已经发布消息之后,可以获取 MqttDeliveryToken 和 MqttMessage 属性。如果您在已经发布消息之后尝试设置任何 MqttMessage 属性,那么结果是不确定的。
- 如果客户机重新连接到具有同一 ClientIdentifier 的先前的会话,那么 MQTT 客户机将继续处理传递应答;请参阅 清除会话。MQTT 客户机应用程序必须针对先前的会话将 MqttClient.CleanSession 设置为 false,并在新会话中将其设置为 false。MQTT 客户机可在新会话中创建新的传递令牌和消息对象,以供暂挂的传递使用。它将使用 MqttClientPersistence 类来恢复对象。如果应用程序客户机仍然引用了旧的传递令牌和消息,那么请取消对它们的引用。对于在先前会话中启动的以及在此会话中完成的任何传递,会在这些传递的新会话中调用应用程序回调。
- 在应用程序客户机连接之后,当完成暂挂的传递时,就会调用应用程序回调。在应用程序客户机连接之前,它可以使用 MqttClient.getPendingDeliveryTokens 方法来检索暂挂的传递。
- 请注意,客户机应用程序最初创建了已发布的消息对象及其有效内容字节数组。 MQTT 客户机将引用这些对象。由 token.getMessage 方法中的传递令牌返回的消息对象不一定是客户机所创建的同一消息对象。如果新的 MQTT 客户机实例重新创建了传递令牌,那么 MqttClientPersistence 类将重新创建 MqttMessage 对象。为了保持一致性,无论消息对象是由应用程序客户机还是由 MqttClientPersistence 类创建的,如果 token.isCompleted 为 true,那么 token.getMessage 将返回 null。
- 当客户机的与预订主题相匹配的预订到达时,就会调用 messageArrived。topic 是发布主题,而不是预订过滤器。如果过滤器中包含通配符,那么这两者可能不同。
- 如果此主题与客户机所创建的多个预订相匹配,那么客户机将接收到此发布的多个副本。如果客户机发布至它也预订了的某个主题,那么它将接收到它自己的发布的副本。
- 如果使用值为 1 或 2 的 QoS 发送消息,那么该消息将由 MqttClientPersistence 类存储,之后 MQTT 客户机将调用 messageArrived。messageArrived 的运行方式类似 deliveryComplete:它只能针对发布调用一次,当 messageArrived返回到 MQTT 客户机时,此发布的本地副本将由 MqttClientPersistence.remove 除去。当 messageArrived 返回到 MQTT客户机时,MQTT 客户机会中断其对主题和消息的引用。如果应用程序客户机尚不具备对对象的引用,那么会对主题和消息对象进行垃圾回收。
回调、线程技术和客户机应用程序同步
MQTT 客户机会将独立线程上的回调方法调用到主应用程序线程。客户机应用程序不会创建回调线程,它由 MQTT 客户机创建。
MQTT 客户机将同步回调方法。一次只有回调方法的一个实例运行。通过同步,很容易更新一个用于清点已经传递了哪些发布的对象。一次只运行 MqttCallback.deliveryComplete 的一个实例,因此,更新记录而不进一步进行同步是安全的。一次只有一个发布到达也是这种情况。messageArrived 方法中的代码可以更新对象而不使此对象同步。如果您正在另一个线程中引用记录或者要更新的对象,那么使此记录或对象同步。
传递令牌提供了主应用程序线程与发布的传递之间的同步机制。token.waitForCompletion 方法将一直等到完成了传递特定发布,或者等到可选超时已到期。可以使用 token.waitForCompletion 并通过两种简单的方法来一次处理一个发布:
- 暂停应用程序客户机,直到完成了发布的传递;请参阅 PubSync.java。
- 与 MqttCallback.deliveryComplete 方法同步。只有当 MqttCallback.deliveryComplete 返回到 MQTT 客户机时,token.waitForCompletion 才会恢复。 通过使用此机制,在代码运行于主应用程序线程中之前,可以使 MqttCallback.deliveryComplete 中正在运行的代码同步。
如果您想进行发布而不等待传递每个发布,但您想确认何时已经传递了所有发布,情况会怎样呢?如果您在单个线程上发布,那么要发送的最后一个发布也是要传递的最后一个发布。
使发送至服务器的请求同步
方法 | 同步 | 超时时间间隔 |
---|---|---|
MqttClient.Connect | 等待与服务器建立连接。 | 缺省设置为 30 秒,或者由参数进行设置。 |
MqttClient.Disconnect | 等待 MQTT 客户机完成所有必需的工作,并等待 TCP/IP 会话断开连接。 | 缺省设置为 30 秒,或者由参数进行设置。 |
MqttClient.Subscribe | 等待订阅请求完成。 | 缺省设置为 30 秒,或者由参数进行设置。 |
MqttClient.UnSubscribe | 等待取消预订请求完成。 | 缺省设置为 30 秒,或者由参数进行设置。 |
MqttClient.Publish | 将请求传递到 MQTT 客户机后,立即返回到应用程序线程。 | 无。 |
MqttDeliveryToken.waitForCompletion | 等待返回传递令牌。 | 缺省设置无限期,或者由参数进行设置。 |