node.js下进行mqtt实践 | 看看俺 – KanKanAn.com

标签: | 发表时间:2018-09-25 14:33 | 作者:
出处:http://blog.kankanan.com

通过 mqtt 可以将设备连接在一起,能够实现将消息(可能来自服务器也可能来自其它设备)推送到设备,如果设备离线, 服务器可以暂存消息,在设备上线时再推送,有一些特性很关键:

  • offline

    允许设备暂时离线。

    即使是使用固定宽带,有些用户也会因为各种原因无法保持稳定的长连接,可能是上级路由设备有限制,或者是带宽被其它应用抢占而导致长连接不稳定。 将设备的在线状态与 TCP 长连接状态耦合在一起是不明智的。

  • bridge

    设备连接在不同的 broker 上,通过 bridge 实现互通。

    支持几万台设备在线,估计一台 broker 就够了,但是一旦达到数十万、百万甚至上亿,肯定需要搭建 broker 集群,参见  The C10K problem

简单起见, node.js 服务器端使用  mosca, 客户端使用  MQTT.js ,由于  mosca 不支持 bridge,本文不涉及 bridge 特性。

客户端与服务器通信

  • 客户端通过服务器给自已发个消息

    server.js

    var mosca = require('mosca');
    
    var settings = {
        port: 1883
    };
    
    var server = new mosca.Server(settings);
    server.on('ready', function () {
        console.log('mosca server running');
    }).on('clientConnected', function (client) {
        console.log('client(' + client.id + ') connected');
    }).on('published', function (packet, client) {
        console.log('client(' + (client ? client.id : 'internal') + ') published topic(' + packet.topic + '): ' + packet.payload);
    }).on('subscribed', function (topic, client) {
        console.log('client(' + client.id + ') subscribed topic(' + topic + ')');
    }).on('unsubscribed', function (topic, client) {
        console.log('client(' + client.id + ') unsubscribed topic(' + topic + ')');
    }).on('clientDisconnecting', function (client) {
        console.log('client(' + client.id + ') disconnecting');
    }).on('clientDisconnected', function (client) {
        console.log('client(' + client.id + ') disconnected');
    });
    

    client.js

    var mqtt = require('mqtt');
    
    
    var client = mqtt.connect('mqtt://127.0.0.1:1883');
    client.on('connect', function () {
        client.subscribe('presence');
        client.publish('presence', 'a message from myself');
    }).on('message', function (topic, message) {
        console.log(topic + ': ' + message.toString());
        client.end();
    });
    

    运行  server.js

    $ node server.js
    mosca server running
    client(mqttjs_a423c0af) connected
    client(internal) published topic($SYS/41TXEHPDe/new/clients): mqttjs_a423c0af
    client(mqttjs_a423c0af) subscribed topic(presence)
    client(internal) published topic($SYS/41TXEHPDe/new/subscribes): {"clientId":"mqttjs_a423c0af","topic":"presence"}
    client(mqttjs_a423c0af) published topic(presence): a message from myself
    client(mqttjs_a423c0af) unsubscribed topic(presence)
    client(mqttjs_a423c0af) disconnected
    client(internal) published topic($SYS/41TXEHPDe/new/unsubscribes): {"clientId":"mqttjs_a423c0af","topic":"presence"}
    client(internal) published topic($SYS/41TXEHPDe/disconnect/clients): mqttjs_a423c0af
    

    运行  client.js

    $ node client.js
    presence: a message from myself
    $ 
    

客户端与客户端通信

  • 客户端发送消息给另一个客户端

    下面的例子演示了客户端通过约定的  topic 互相通信。

    client_sub.js

    var mqtt = require('mqtt');
    
    var client = mqtt.connect('mqtt://127.0.0.1:1883');
    client.on('connect', function () {
        client.publish('sub', 'message from pub');
    }).on('message', function (topic, message) {
        console.log(topic + ': ' + message.toString());
        client.end();
    });
    

    client_pub.js

    var mqtt = require('mqtt');
    
    var client = mqtt.connect('mqtt://127.0.0.1:1883');
    client.on('connect', function () {
        client.publish('sub', 'message from pub');
    }).on('message', function (topic, message) {
        console.log(topic + ': ' + message.toString());
        client.end();
    });
    

    运行  server.js

    $ node server.js
    mosca server running
    client(mqttjs_ebdc9fd4) connected
    client(internal) published topic($SYS/4Jk9PBwDe/new/clients): mqttjs_ebdc9fd4
    client(mqttjs_ebdc9fd4) subscribed topic(sub)
    client(internal) published topic($SYS/4Jk9PBwDe/new/subscribes): {"clientId":"mqttjs_ebdc9fd4","topic":"sub"}
    client(mqttjs_ff000868) connected
    client(internal) published topic($SYS/4Jk9PBwDe/new/clients): mqttjs_ff000868
    client(mqttjs_ff000868) published topic(sub): message from pub
    client(mqttjs_ebdc9fd4) unsubscribed topic(sub)
    client(mqttjs_ebdc9fd4) disconnected
    client(internal) published topic($SYS/4Jk9PBwDe/new/unsubscribes): {"clientId":"mqttjs_ebdc9fd4","topic":"sub"}
    client(internal) published topic($SYS/4Jk9PBwDe/disconnect/clients): mqttjs_ebdc9fd4
    

    运行  client_sub.js

    $ node client_sub.js
    sub: message from pub
    $ 
    

    运行  client_pub.js

    $ node client_pub.js
    

客户端与客户端离线通信

离线通信需要同时满足以下条件

  • 服务器配置持久存储
  • 订阅方启用会话状态

    连接服务器时使用同样的 clientId 并指定  clean 为  false

  • 发布方发布持久消息

    发布消息时指定  qos 大于  0 以及  retain 为  true

下面的例子演示了客户端接收离线消息

client_sub.js

var mqtt = require('mqtt');

var client = mqtt.connect('mqtt://127.0.0.1:1883', {clientId: 'sub', clean: false});
client.on('connect', function () {
    client.subscribe('sub');
}).on('message', function (topic, message) {
    console.log(topic + ': ' + message.toString());
    client.end();
});

client_pub.js

var mqtt = require('mqtt');

var client = mqtt.connect('mqtt://127.0.0.1:1883', {clientId: 'pub'});
client.on('connect', function () {
    client.publish('sub', 'message from pub', {qos: 1, retain: true});
}).on('message', function (topic, message) {
    console.log(topic + ': ' + message.toString());
    client.end();
});

运行  srever.js

$ node mqtt_server.js
mosca server running
client(sub) connected
client(internal) published topic($SYS/V19OSVfix/new/clients): sub
client(sub) subscribed topic(sub)
client(internal) published topic($SYS/V19OSVfix/new/subscribes): {"clientId":"sub","topic":"sub"}
client(sub) disconnected
client(internal) published topic($SYS/V19OSVfix/disconnect/clients): sub
client(pub) connected
client(internal) published topic($SYS/V19OSVfix/new/clients): pub
client(pub) published topic(sub): message from pub
client(sub) connected
client(internal) published topic($SYS/V19OSVfix/new/clients): sub
client(sub) subscribed topic(sub)
client(internal) published topic($SYS/V19OSVfix/new/subscribes): {"clientId":"sub","topic":"sub"}
client(sub) disconnected
client(internal) published topic($SYS/V19OSVfix/disconnect/clients): sub

运行  client_sub.js 订阅消息后退出

$ node client_sub.js 
sub: message from pub
$ 

运行  client_pub.js 发布消息

$ node mqtt_client_pub.js 

运行  client_sub.js 接收离线消息后退出

$ node client_sub.js 
sub: message from pub
$ 

相关 [node js mqtt] 推荐:

node js 断点调试

- - Web前端 - ITeye博客
大部分基于 Node.js 的应用都是运行在浏览器中的,. 例如强大的调试工具 node-inspector. node-inspector 是一个完全基于 Node.js 的开源在线调试工具,提供了强大的调试功能和友好. 的用户界面,它的使用方法十分简便. 首先,使用 npm install -g node-inspector 命令安装 node-inspector,然后在终.

什么是Node?

- We_Get - 博客园新闻频道
译者按:前不久Oreilly出了一本小册子“What is Node?”,扼要的讲解了Node的身世和所适用的场景,作者文笔轻松流畅、内容充实,是非常难得的学习资料.   译文全文:http://jayli.github.com/whatisnode/index.html.   作者:Brett McLaughlin ,原文:What is Node?.

Node入门

- - CSDN博客编程语言推荐文章
作者:  Manuel Kiessling. 翻译:  goddyzhao &  GrayZhang &  MondayChen. 本书致力于教会你如何用Node.js来开发应用,过程中会传授你所有所需的“高级”JavaScript知识. 本书绝不是一本“Hello World”的教程. 你正在阅读的已经是本书的最终版.

GitHub - GruppoFilippetti/vertx-mqtt-broker: Vert.x based MQTT Broker

- -

浅析Hadoop Secondary NameNode,CheckPoint Node,Backup Node

- - CSDN博客云计算推荐文章
Hadoop SecondaryNameNode并不是Hadoop 第二个NameNode,它不提供NameNode服务,而仅仅是NameNode的一个工具. 这个工具帮助NameNode管理Metadata数据. NameNode的HDFS文件信息(即Metadata)记录在内存中,client的文件写操作直接修改内存中的Metadata,同时也会记录到硬盘的Edits文件,这是一个Log文件.

[译]什么是Node?

- blacktulip - Taobao UED Team
译者按:前不久Oreilly出了一本小册子“What is Node?”,扼要的讲解了Node的身世和所适用的场景,作者文笔轻松流畅、内容充实,是非常难得的学习资料. 译文全文:http://jayli.github.com/whatisnode/index.html. 作者:Brett McLaughlin ,原文:What is Node?.

MQTT协议 - 安全问题

- - ITeye博客
        物联网的核心是连接万物,通过交换并分析数据使得生活更舒适与便捷. 不过,敏感数据泄露或者设备被非法控制可不是闹着玩的. 比如前段时间国内某著名家电企业的智能洗衣机,使用了某著名电商基于XMPP协议的物联网平台,不费吹灰之力便被黑客攻破并远程遥控,给智能家居的发展带来了一些阴影. 究其本质,并不是物联网技术本身有缺陷,而是在物联网系统的设计中最基本的安全设计被工程师轻视了,才导致整个系统的崩塌.

用node作桌面开发

- InterMa - CNode社区
node的定位是,server-side javascript. 但程序员最爱做的事,就是把一个东西用在不该用的地方. 那么,可以把node用在桌面开发上吗. 把Javascript用在桌面开发上,早有先例,比如GTK+的gjs,还有Qt的QML(顺带一提,QML代表着桌面开发的另一个方向,a promising way),GNOME3中,也用javascript作为桌面插件的开发语言.

Vercel 部署 Node 服务

- - 掘金 前端
之前在写 面试常客:HTTP 缓存时,曾经就强缓存和协商缓存写过两个demo,但缓存要在服务端做,只能贴上代码,不能在网页上感受(虽然我贴了gif). 笔者的所有 demo 例子都放在 github page 上,其特点是不需要服务器即可部署静态资源,但它不具备部署服务端应用能力. 最近笔者在了解 CI/CD 方面的知识点,想起了 Vercel,就想着能否将服务端应用架在 vercel 上呢.