使用MQTT协议+Redis缓存实现APP登录顶号功能 | jwcqc个人笔记

标签: | 发表时间:2018-09-08 18:21 | 作者:
出处:http://jwcqc.me

大家在玩游戏或使用QQ等IM工具时,想必都见到过弹出被顶号或者是您的账号于xx时间在另一设备登录,您已被迫下线这样的提示,然后不得不点退出按钮退出整个应用,或者点击重新登录把另一设备再顶下来。最近我参与的一个项目,正好就有这样的需求,而且,由于我们项目中已经使用到了MQTT协议进行消息推送,实现远程控制,后台用Java实现,缓存使用了Redis,因此,正好可以利用现有的技术来实现这个功能。

实现的思路大概如下:首先,登录时不仅需要账号密码,还可以将设备关键信息记录下来,如设备型号(Android|iPhone)、登录时间、登录IP、设备唯一标识(UUID)等,这就需要前台登录功能与后台接口一起配合实现,并在后台把userId已经相关设备信息保存到Redis中,当在另外一台新设备上登录同一帐号时,将userId对应的相关登录设备信息直接进行覆盖,此时如果旧设备进行重连时,因为该uuid已经不是当前服务端的uuid了,所以直接返回下线通知,为了进行友好提示,也可以将新登录设备的主要信息(设备型号、登录时间)进行返回。

下面简单介绍一下实现的方法。

软件安装

Linux下mqtt服务器Apollo的安装

下载

选择一个目录用来下载保存
下载地址: http://activemq.apache.org/apollo/download.html
官网教程: http://activemq.apache.org/apollo/documentation/getting-started.html
目前版本是 apache-apollo-1.7.1-unix-distro.tar .gz

创建broker

一个broker实例是一个文件夹,其中包含所有的配置文件及运行时的数据,不如日志和消息数
据。Apollo强烈建议不要把实例同安装文件放在一起。在linux操作系统下面,建议将实例建在
/var/lib/目录下面

首先解压:tar -zxvf apache-apollo-1.7.1-unix-distro.tar.gz
选择一个目录存放解压后的文件,我放在了/server/下,解压后的文件夹为 apache-apollo-1.7.1

开始创建broker实例:

             
1
2
             
cd /var/lib
sudo /server/apache-apollo-1.7.1/bin/apollo create mybroker

下图是Apache官方给的一些建议截图:

启动broker实例

启动broker实例可以有两种方法,如下图中所示:

可以执行

             
1
             
/var/lib/mybroker/bin/apollo-broker run

或者

             
1
2
             
sudo ln -s "/var/lib/mybroker/bin/apollo-broker-service" /etc/init.d/
/etc/init.d/apollo-broker-service start

使其作为一个service进行启动,以后系统重启后只需运行/etc/init.d/apollo-broker-service start

访问Apollo的监控页面: http://localhost:61680/默认用户名、密码为为 admin/password

Linux下Redis的安装与配置

Redis的安装非常简单,已经有现成的Makefile文件,解压后在src目录下使用make命令完成编译即可,redis-benchmark、redis-cli、redis-server、redis-stat 这四个文件,加上一个 redis.conf 就构成了整个redis的最终可用包。它们的作用如下:

redis-server:Redis服务器的daemon启动程序
redis-cli:Redis命令行操作工具。当然,你也可以用telnet根据其纯文本协议来操作
redis-benchmark:Redis性能测试工具,测试Redis在你的系统及你的配置下的读写性能
redis-stat:Redis状态检测工具,可以检测Redis当前状态参数及延迟状况

下载安装:

            
1
2
3
4
5
            
wget http://download.redis.io/redis-stable.tar.gz
tar xzf redis-stable.tar.gz
cd redis-stable
make
make install

启动

编译后生成的可执行文件:
redis-server 是Redis的服务器,启动Redis即运行redis-server
redis-cli 是Redis自带的Redis命令行客户端,学习Redis的重要工具

./redis-server & 不指定配置直接运行,这时采用默认配置,无密码
./redis-server –port 6379 仅指定端口
./redis-server ../redis.conf 指定配置文件

最好还是使用最后一种方式进行启动

如果只是在本机连接,那麽使用默认配置文件不会有什么问题,但是,如果是连接远程服务器端的Redis,则需要对配置文件进行一些修改:

             
1
2
3
             
requirepass foobared
#bind 127.0.0.1 ##注释掉
protected-mode no ##从yes改成no

至于如何将Redis设置后台服务,开机自启等,这里就不介绍了,可以去搜索一下。

功能实现

后台接口

Redis客户端使用的是Jedis,如下代码是一个对Jedis简单的封装

            
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
            
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.exceptions.JedisException;
import java.util.ResourceBundle;
/**
* Jedis Cache 工具类
*/
public class JedisUtils {
private static Logger logger = LoggerFactory.getLogger(JedisUtils.class);
private static JedisPool jedisPool;
/**
* 读取相关的配置
*/
static {
ResourceBundle resourceBundle = ResourceBundle.getBundle("redis");
int maxActive = Integer.parseInt(resourceBundle.getString("redis.pool.maxActive"));
int maxIdle = Integer.parseInt(resourceBundle.getString("redis.pool.maxIdle"));
int maxWait = Integer.parseInt(resourceBundle.getString("redis.pool.maxWait"));
int port = Integer.parseInt(resourceBundle.getString("redis.port"));
int timeout = Integer.parseInt(resourceBundle.getString("redis.timeout"));
String ip = resourceBundle.getString("redis.ip");
String auth = resourceBundle.getString("redis.auth");
JedisPoolConfig config = new JedisPoolConfig();
//设置最大连接数
config.setMaxTotal(maxActive);
//设置最大空闲数
config.setMaxIdle(maxIdle);
//设置超时时间
config.setMaxWaitMillis(maxWait);
//初始化连接池
jedisPool = new JedisPool(config, ip, port, timeout, auth);
}
/**
* 获取缓存
* @param key 键
* @return 值
*/
public static String get(String key) {
String value = null;
Jedis jedis = null;
try {
jedis = getResource();
if (jedis.exists(key)) {
value = jedis.get(key);
value = StringUtils.isNotBlank(value) && !"nil".equalsIgnoreCase(value) ? value : null;
logger.debug("get {} = {}", key, value);
}
} catch (Exception e) {
logger.warn("get {} = {}", key, value, e);
} finally {
returnResource(jedis);
}
return value;
}
/**
* 设置缓存
* @param key 键
* @param value 值
* @param cacheSeconds 超时时间,0为不超时
* @return
*/
public static String set(String key, String value, int cacheSeconds) {
String result = null;
Jedis jedis = null;
try {
jedis = getResource();
result = jedis.set(key, value);
if (cacheSeconds != 0) {
jedis.expire(key, cacheSeconds);
}
logger.debug("set {} = {}", key, value);
} catch (Exception e) {
logger.warn("set {} = {}", key, value, e);
} finally {
returnResource(jedis);
}
return result;
}
/**
* 删除缓存
* @param key 键
* @return
*/
public static long del(String key) {
long result = 0;
Jedis jedis = null;
try {
jedis = getResource();
if (jedis.exists(key)){
result = jedis.del(key);
logger.debug("del {}", key);
}else{
logger.debug("del {} not exists", key);
}
} catch (Exception e) {
logger.warn("del {}", key, e);
} finally {
returnResource(jedis);
}
return result;
}
/**
* 缓存是否存在
* @param key 键
* @return
*/
public static boolean exists(String key) {
boolean result = false;
Jedis jedis = null;
try {
jedis = getResource();
result = jedis.exists(key);
logger.debug("exists {}", key);
} catch (Exception e) {
logger.warn("exists {}", key, e);
} finally {
returnResource(jedis);
}
return result;
}
/**
* 获取资源
* @return
* @throws JedisException
*/
public static Jedis getResource() throws JedisException {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
} catch (JedisException e) {
logger.warn("getResource.", e);
returnBrokenResource(jedis);
throw e;
}
return jedis;
}
/**
* 归还资源
* @param jedis
*/
public static void returnBrokenResource(Jedis jedis) {
if (jedis != null) {
jedisPool.returnBrokenResource(jedis);
}
}
/**
* 释放资源
* @param jedis
*/
public static void returnResource(Jedis jedis) {
if (jedis != null) {
jedisPool.returnResource(jedis);
}
}
}

然后在登录接口中,当判断完登录的用户名密码正确后,可以参考如下代码的思路去实现,首先判断Redis中是否已保存有这个userId对用的值,有的话说明当前已经有登录,需要被替换到,同时使用MQTT发送消息给客户端使其退出,Redis中不存在则只需保存userId和uuidStr即可

            
1
2
3
4
5
6
7
8
9
10
11
12
            
String uuidStr = ""; //这个值从APP端传过来
// 先判断Redis中是否已经有,有的话需要替换掉
if(JedisUtils.get(userId) != null && !JedisUtils .get(userId).equals(uuidStr)) {
MqttClient client = MyMqttClient.getInstance();
String topic = "TOPIC/LOGIN_LOGOUT";
client.subscribe(topic, 1);
MyMqttClient.sendMessage("Log out", topic);
client.unsubscribe(topic);
}
JedisUtils.set(userId, uuidStr, 0);

至于MQTT协议的实现,这里使用的是Paho,如果后台项目是使用Maven构建的话,在pom.xml中加入如下几行即可:

            
1
2
3
4
5
            
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.1.0</version>
</dependency>

然后对其进行了一个简单的封装

            
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
            
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MyMqttClient {
private MyMqttClient() {}
private static MqttClient mqttClientInstance = null;
private static MqttConnectOptions options;
//静态工厂方法
public static synchronized MqttClient getInstance() {
try {
if (mqttClientInstance == null) {
mqttClientInstance = new MqttClient("tcp://125.216.242.151:61613",
MqttClient.generateClientId(), new MemoryPersistence());
options = new MqttConnectOptions();
//设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
//设置连接的用户名
options.setUserName("admin");
//设置连接的密码
options.setPassword("password".toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
mqttClientInstance.connect(options);
}
return mqttClientInstance;
}catch (Exception e){
e.printStackTrace();
return null;
}
}
public static void sendMessage(String content, String myTopic) {
MqttTopic topic = getInstance().getTopic(myTopic);
MqttMessage message = new MqttMessage();
message.setQos(1);
message.setRetained(false);
message.setPayload(content.getBytes());
try {
MqttDeliveryToken token = topic.publish(message);
} catch (MqttException e) {
e.printStackTrace();
}
}
public static MqttConnectOptions getOptions(){
return options;
}
}

app端

客户端的做法思路也很简单,由于使用了MQTT,因此客户端和服务器端其实已经保持了一个长连接,可以为客户端写一个MQTTService,随时监听服务器推送过来的消息进行处理

            
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
            
//为MTQQ client设置回调
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
//连接丢失后,一般在这里面进行重连
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
//publish后会执行到这里
}
@Override
public void messageArrived(String topicName, MqttMessage message) throws Exception {
if(message.toString().equals("Log out")) {
handler.post(new Runnable() {
@Override
public void run() {
AlertDialog.Builder builder = new AlertDialog.Builder(getApplicationContext());
builder.setMessage("被顶号了");
builder.setNegativeButton("退出", new DialogInterface.OnClickListener() {
@Override
public void onClick(DialogInterface dialog, int which) {
// TODO 退出当前账号,在这里简单粗暴的结束了应用
stopSelf();
android.os.Process.killProcess(android.os.Process.myPid());
}
});
Dialog dialog = builder.create();
dialog.setCanceledOnTouchOutside(false);
dialog.getWindow().setType(WindowManager.LayoutParams.TYPE_SYSTEM_ALERT);
dialog.show();
}
});
}
}
});

总结

上述代码可能在严谨性和可靠性上还会存在一些问题,还需要经过不断的完善,但思路是很明确的。在这里尤其要安利一下MTQQ,现在越来越多的产品都是基于这个协议进行开发,进行消息推送等。它开销很小,支持各种流行编程语言,能够适应不稳定的网络传输需求,在未来几年,相信MQTT的应用会越来越广。

相关 [mqtt 协议 redis] 推荐:

使用MQTT协议+Redis缓存实现APP登录顶号功能 | jwcqc个人笔记

- -
大家在玩游戏或使用QQ等IM工具时,想必都见到过弹出被顶号或者是您的账号于xx时间在另一设备登录,您已被迫下线这样的提示,然后不得不点退出按钮退出整个应用,或者点击重新登录把另一设备再顶下来. 最近我参与的一个项目,正好就有这样的需求,而且,由于我们项目中已经使用到了MQTT协议进行消息推送,实现远程控制,后台用Java实现,缓存使用了Redis,因此,正好可以利用现有的技术来实现这个功能.

MQTT协议 - 安全问题

- - ITeye博客
        物联网的核心是连接万物,通过交换并分析数据使得生活更舒适与便捷. 不过,敏感数据泄露或者设备被非法控制可不是闹着玩的. 比如前段时间国内某著名家电企业的智能洗衣机,使用了某著名电商基于XMPP协议的物联网平台,不费吹灰之力便被黑客攻破并远程遥控,给智能家居的发展带来了一些阴影. 究其本质,并不是物联网技术本身有缺陷,而是在物联网系统的设计中最基本的安全设计被工程师轻视了,才导致整个系统的崩塌.

XMPP协议、MQTT协议、HTTP协议、CoAP协议的基本比较

- - ITeye博客
一、先看下相关国外的专业数据对四大协议的比较:.           XML的解析对于嵌入多设备来说是比较痛苦的 ,所以在嵌入设备上做开发的时候,最好不要选择基于XML的协议.          二、四大协议的基本介绍:.    XMPP是一种基于标准通用标记语言的子集XML的协议,它继承了在XML环境中灵活的发展性.

MQTT协议笔记之mqtt.io项目TCP协议支持

- - BlogJava-首页技术区
MQTT定义了物联网传输协议,其标准倾向于原始TCP实现. 构建于TCP的上层协议堆栈,诸如HTTP等,在空间上多了一些处理路径,稍微耗费了CPU和内存,虽看似微乎其微,但对很多处理能力不足的嵌入式设备而言,选择原始的TCP却是最好的选择. 但单纯TCP不是所有物件联网的最佳选择,提供构建与TCP基础之上的传统的HTTP通信支持,尤其是浏览器、性能富裕的桌面涉及领域,还是企业最 可信赖、最可控的传输方式之一.

网络抓包学MQTT物联网协议

- -
MQTT (Message Queue Telemetry Transport),翻译成中文就是,遥测传输协议,其主要提供了订阅/发布两种消息模式,更为简约、轻量,易于使用,特别适合于受限环境(带宽低、网络延迟高、网络通信不稳定)的消息分发,属于物联网(Internet of Thing)的一个标准传输协议.

互联网推送服务原理:长连接+心跳机制(MQTT协议)

- - 移动开发 - ITeye博客
互联网推送消息的方式很常见,特别是移动互联网上,手机每天都能收到好多推送消息,经过研究发现,这些推送服务的原理都是维护一个长连接(要不不可能达到实时效果),但普通的socket连接对服务器的消耗太大了,所以才会出现像MQTT这种轻量级低消耗的协议来维护长连接,那么要如何维护长连接呢:.        在写之前,我们首先了解一下为什么android维护长连接需要心跳机制,首先我们知道,维护任何一个长连接都需要心跳机制,客户端发送一个心跳给服务器,服务器给客户端一个心跳应答,这样就形成客户端服务器的一次完整的握手,这个握手是让双方都知道他们之间的连接是没有断开,客户端是在线的.

基于MQTT协议与ESP8266平台的家庭环境监控实现 | 千里

- -
在智能家居的应用场景中,传感器一直是非常重要的组成部分. 比如传感器报告的温度高了可以自动关窗开空调. 在一般的使用场景中,众多的传感器和开关会分散在房子中的不同位置,通过布线连接是不现实的,理想的情况当然是通过已有的无线网络进行连接,网络传输就需要特定的协议,MQTT协议就是一个不错的选择. 手头刚好有一些传感器和一个ESP8266芯片的单片机(WeMos D1),做了一个家庭温湿度、光照及空气质量监控器,并通过MQTT协议推送数据到Home Assistant平台.

activeMQ 推送之mqtt客户端

- - ITeye博客
使用activeMQ进行android推送. activeMQ下载地址:http://activemq.apache.org/download.html. 下载后是一个压缩包:apache-activemq-5.9.0-bin.zip. 解压缩,进入apache-activemq-5.9.0-bin\apache-activemq-5.9.0\bin,双击activemq.bat,即可启动activeMQ服务.

Android推送方案分析(MQTT/XMPP/GCM)

- - 移动开发 - ITeye博客
本文主旨在于,对目前Android平台上最主流的几种消息推送方案进行分析和对比,比较客观地反映出这些推送方案的优缺点,帮助大家选择最合适的实施方案. 方案1、使用GCM服务(Google Cloud Messaging). 简介:Google推出的云消息服务,即第二代的C2DM. 优点:Google提供的服务、原生、简单,无需实现和部署服务端.

Redis 负载监控——redis-monitor

- - ITeye资讯频道
redis-monitor是一个Web可视化的 redis 监控程序. 使用 Flask 来开发的,代码结构非常简单,适合移植到公司内网使用. redis 服务器信息,包括 redis 版本、上线时间、 os 系统信息等等. 实时的消息处理信息,例如处理 command 数量、连接总数量等. 内存占用、 cpu 消耗实时动态图表.