node.js下进行mqtt实践 | 看看俺 – KanKanAn.com
通过 mqtt 可以将设备连接在一起,能够实现将消息(可能来自服务器也可能来自其它设备)推送到设备,如果设备离线, 服务器可以暂存消息,在设备上线时再推送,有一些特性很关键:
-
offline
允许设备暂时离线。
即使是使用固定宽带,有些用户也会因为各种原因无法保持稳定的长连接,可能是上级路由设备有限制,或者是带宽被其它应用抢占而导致长连接不稳定。 将设备的在线状态与 TCP 长连接状态耦合在一起是不明智的。
-
bridge
设备连接在不同的 broker 上,通过 bridge 实现互通。
支持几万台设备在线,估计一台 broker 就够了,但是一旦达到数十万、百万甚至上亿,肯定需要搭建 broker 集群,参见 The C10K problem。
简单起见, node.js 服务器端使用 mosca, 客户端使用 MQTT.js ,由于 mosca 不支持 bridge,本文不涉及 bridge 特性。
客户端与服务器通信
-
客户端通过服务器给自已发个消息
server.js
var mosca = require('mosca'); var settings = { port: 1883 }; var server = new mosca.Server(settings); server.on('ready', function () { console.log('mosca server running'); }).on('clientConnected', function (client) { console.log('client(' + client.id + ') connected'); }).on('published', function (packet, client) { console.log('client(' + (client ? client.id : 'internal') + ') published topic(' + packet.topic + '): ' + packet.payload); }).on('subscribed', function (topic, client) { console.log('client(' + client.id + ') subscribed topic(' + topic + ')'); }).on('unsubscribed', function (topic, client) { console.log('client(' + client.id + ') unsubscribed topic(' + topic + ')'); }).on('clientDisconnecting', function (client) { console.log('client(' + client.id + ') disconnecting'); }).on('clientDisconnected', function (client) { console.log('client(' + client.id + ') disconnected'); });
client.js
var mqtt = require('mqtt'); var client = mqtt.connect('mqtt://127.0.0.1:1883'); client.on('connect', function () { client.subscribe('presence'); client.publish('presence', 'a message from myself'); }).on('message', function (topic, message) { console.log(topic + ': ' + message.toString()); client.end(); });
运行
server.js
$ node server.js mosca server running client(mqttjs_a423c0af) connected client(internal) published topic($SYS/41TXEHPDe/new/clients): mqttjs_a423c0af client(mqttjs_a423c0af) subscribed topic(presence) client(internal) published topic($SYS/41TXEHPDe/new/subscribes): {"clientId":"mqttjs_a423c0af","topic":"presence"} client(mqttjs_a423c0af) published topic(presence): a message from myself client(mqttjs_a423c0af) unsubscribed topic(presence) client(mqttjs_a423c0af) disconnected client(internal) published topic($SYS/41TXEHPDe/new/unsubscribes): {"clientId":"mqttjs_a423c0af","topic":"presence"} client(internal) published topic($SYS/41TXEHPDe/disconnect/clients): mqttjs_a423c0af
运行
client.js
$ node client.js presence: a message from myself $
客户端与客户端通信
-
客户端发送消息给另一个客户端
下面的例子演示了客户端通过约定的
topic
互相通信。client_sub.js
var mqtt = require('mqtt'); var client = mqtt.connect('mqtt://127.0.0.1:1883'); client.on('connect', function () { client.publish('sub', 'message from pub'); }).on('message', function (topic, message) { console.log(topic + ': ' + message.toString()); client.end(); });
client_pub.js
var mqtt = require('mqtt'); var client = mqtt.connect('mqtt://127.0.0.1:1883'); client.on('connect', function () { client.publish('sub', 'message from pub'); }).on('message', function (topic, message) { console.log(topic + ': ' + message.toString()); client.end(); });
运行
server.js
$ node server.js mosca server running client(mqttjs_ebdc9fd4) connected client(internal) published topic($SYS/4Jk9PBwDe/new/clients): mqttjs_ebdc9fd4 client(mqttjs_ebdc9fd4) subscribed topic(sub) client(internal) published topic($SYS/4Jk9PBwDe/new/subscribes): {"clientId":"mqttjs_ebdc9fd4","topic":"sub"} client(mqttjs_ff000868) connected client(internal) published topic($SYS/4Jk9PBwDe/new/clients): mqttjs_ff000868 client(mqttjs_ff000868) published topic(sub): message from pub client(mqttjs_ebdc9fd4) unsubscribed topic(sub) client(mqttjs_ebdc9fd4) disconnected client(internal) published topic($SYS/4Jk9PBwDe/new/unsubscribes): {"clientId":"mqttjs_ebdc9fd4","topic":"sub"} client(internal) published topic($SYS/4Jk9PBwDe/disconnect/clients): mqttjs_ebdc9fd4
运行
client_sub.js
$ node client_sub.js sub: message from pub $
运行
client_pub.js
$ node client_pub.js
客户端与客户端离线通信
离线通信需要同时满足以下条件
- 服务器配置持久存储
-
订阅方启用会话状态
连接服务器时使用同样的 clientId 并指定
clean
为false
-
发布方发布持久消息
发布消息时指定
qos
大于0
以及retain
为true
下面的例子演示了客户端接收离线消息
client_sub.js
var mqtt = require('mqtt'); var client = mqtt.connect('mqtt://127.0.0.1:1883', {clientId: 'sub', clean: false}); client.on('connect', function () { client.subscribe('sub'); }).on('message', function (topic, message) { console.log(topic + ': ' + message.toString()); client.end(); });
client_pub.js
var mqtt = require('mqtt'); var client = mqtt.connect('mqtt://127.0.0.1:1883', {clientId: 'pub'}); client.on('connect', function () { client.publish('sub', 'message from pub', {qos: 1, retain: true}); }).on('message', function (topic, message) { console.log(topic + ': ' + message.toString()); client.end(); });
运行 srever.js
$ node mqtt_server.js mosca server running client(sub) connected client(internal) published topic($SYS/V19OSVfix/new/clients): sub client(sub) subscribed topic(sub) client(internal) published topic($SYS/V19OSVfix/new/subscribes): {"clientId":"sub","topic":"sub"} client(sub) disconnected client(internal) published topic($SYS/V19OSVfix/disconnect/clients): sub client(pub) connected client(internal) published topic($SYS/V19OSVfix/new/clients): pub client(pub) published topic(sub): message from pub client(sub) connected client(internal) published topic($SYS/V19OSVfix/new/clients): sub client(sub) subscribed topic(sub) client(internal) published topic($SYS/V19OSVfix/new/subscribes): {"clientId":"sub","topic":"sub"} client(sub) disconnected client(internal) published topic($SYS/V19OSVfix/disconnect/clients): sub
运行 client_sub.js
订阅消息后退出
$ node client_sub.js sub: message from pub $
运行 client_pub.js
发布消息
$ node mqtt_client_pub.js
运行 client_sub.js
接收离线消息后退出
$ node client_sub.js sub: message from pub $