websoket.io 高并发 实现
用websocket搭个聊天室实时互动小游戏,确实非常方便,但在线人数多了 就没那么简单了。一到300人在线就开始掉了。后来经过调整 终于好多了。以下是改进的方案记录:
调整websocket.io的传输方式
websoket.io Socket.IO 是一个功能非常强大的框架,能够帮助你构建基于 WebSocket 的跨浏览器的实时应用。支持主流浏览器,多种平台,多种传输模式。
关于传输模式主要有两种1.polly 轮询模式, 2.websocket 模式。
默认为先轮询握手连接后再升级为websocket。 轮询的效率肯定是低下的,修改配置参数直接用websokect 的方式传输。发现很效果差很多,满意!
var io = require('socket.io')({ "transports":['websocket', 'polling']})
判断传输方式的源码位置:
https://github.com/socketio/e...
使用命名空间的功能
可以区分不同的命名空间来特定针对性的发消息,对于一些不是全局需要接受的消息就加上命名空间,可以极大的节约资源的传输。
//创建 server命名空间
var ServerIo = io.of('/server').on('connection', function(socket){
socket.on('ready',function(roomId,data) {
pub.publish(roomId, JSON.stringify({
"event": 'ready',
"data": '',
"namespace" : '/user'
}))
})
socket.on('button-start',function(id){
pub.publish(id, JSON.stringify({
"event": 'button-start',
"data": '',
"namespace" : '/user'
}));
})
//针对namespace发送消息
io.of(namespace).emit('message', message)
nginx的负载均衡配置
通过nodeJS的process模块,可以实现多进程运行程序,最大限制的提高cpu的利用率。
多进程启动:
var fork = require('child_process').fork;
var cupNum = require('os').cpus().length,
workerArr = [],
connectNum = 0 ;
for (var i = 0; i < cupNum; i++) {
workerArr.push(fork('./shake_server.js', [8000 + i]));
process.on('uncaughtException', function(e) {
console.log('捕获到进程异常:',e);
});
}
然后通过nginx做负载
upstream io_nodes {
ip_hash;
server 127.0.0.1:8000;
server 127.0.0.1:8001;
server 127.0.0.1:8003;
server 127.0.0.1:8002;
}
server {
listen 8080;
server_name localhost;
#charset koi8-r;
#access_log logs/host.access.log main;
location /html {
alias /Users/snail/Documents/myworks/weihuodong/shake;
index index.html index.htm;
}
location / {
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Host $host;
proxy_http_version 1.1;
proxy_pass http://io_nodes;
proxy_redirect off;
keepalive_timeout 300S
}
一些系统参数的调整:
1.修改系统参数:
http.Agent
官网说明:
agent.maxSockets
By default set to 5. Determines how many concurrent sockets the agent can have open per host.
(为了http请求能复用connection连接,Nodejs在http.Agent创建了一个默认大小为5的连接池)
修改后如下:require("http").globalAgent.maxSockets = Infinity; 如果使用WebSocke.io
2.ulimit 调整
心跳检测 掉线检测
如果出现自动掉线可调整nginx的各种timeout参数,有太多配置参数,具体根据需要去查文档配置
如: keepalive_timeout 的调整,默认75秒
还可以通过程序代码判断,掉线会触发disconcent 事件,监听它 自动再创建socket连接即可。
心跳检查为定时发送一次消息,保持连接状态。
内存数据共享
nodeJs的进程间通信是没问题的,但要共享数据还是用redis来的简单有效。
直接创建一个redisClient实例来用即可。
var client = redisClient(35050, '127.0.0.1')
var redisObj = {
get:function(id,cb){
client.hgetall(id, function (err, obj) {
if(err){
console.log(err)
}else{
cb(obj)
}
})
},
set:function(id,o){
if(typeof o === 'undefined' || typeof Object.keys(o)[0] == 'undefined'){
return
}
var key = Object.keys(o)[0]
var value =JSON.stringify(o[key])
client.hset(id,key,value)
},
del:function(id,uid){
if(id && uid){
client.hdel(id,uid)
}
},
flushdb:function(roomId){
client.del(roomId)
}
}
socket在各个进程间也是不共享的,那么我们可以利用redis的订阅发布系统实现。
详细看代码注释:
var redis = require("redis");
var sub = redis.createClient(35050, '127.0.0.1'), pub = redis.createClient(35050, '127.0.0.1');
var msg_count = 0;
//订阅事件的会时候触发 subscribe ,回调包含两个参数,分别为订阅的频道和总订阅数量
sub.on("subscribe", function (channel, count) {
console.log('监听到订阅事件',channel, count)
});
//在pub的时候会触发 message事件,我们的所有业务处理基本就是靠监听它了,通知socket发布消息
sub.on("message", function (channel, message) {
console.log('监听到发布事件')
console.log("sub channel " + channel + ": " + message);
// socket.to(channel).emit('nice game', "let's play a game");
socket.of(channel).emit('message', message)
msg_count += 1;
if (msg_count === 3) {
sub.unsubscribe()
sub.quit()
pub.quit();
}
});
//添加三个订阅
sub.subscribe("channel0");
sub.subscribe("channel1");
sub.subscribe("channel2");
//触发频道1的订阅者
setInterval(function(){
pub.publish("channel1", "I am message to chanle1")
},3e3)
//触发频道2的订阅者
setInterval(function(){
pub.publish("channel2", "I am message to chanle2")
},3e3)
经测试一台服务器1、2千用户没什么压力
测试的话 用webSocket bench 不好用.
(完)