web应用集群Session保持(转)
关于使用 memcached 或redis 存储 session ,以及使用 terracotta 服务器共享。
建议使用 redis,不仅仅因为它可以将缓存的内容持久化,还因为它支持的单个对象比较大,而且数据类型丰富,
不只是缓存 session,还可以做其他用途,一举几得啊。
1、使用 filter 方法存储
这种方法比较推荐,因为它的服务器使用范围比较多,不仅限于tomcat ,而且实现的原理比较简单容易控制。
可以使用memcached-session-filter
官方网址:http://code.google.com/p/memcached-session-filter/
官方介绍:解决集群环境下javaweb容器session共享,使用filter拦截器和memcached实现。在tomcat 6和websphere8测试通过,现网并发2000,日PV量1100万。
暂不支持sessionevent包括create destory 和 attribute change
东西很不错,体积很小,不过这个东东要和spring 一起使用,而且要求存储到 memcached 的对象要实现 java 的序列化接口
大家也知道,java本身的序列化性能也很一般。
我将其简单扩展了一下,不再依赖spring ,并且利用 javolution 实现序列化,缓存的对象不再有限制。
暂时没有发现 redis的实现,后面将自己实现使用 redis 存储并且序列化使用 kyro ,详细情况有时间再单独写出来。
2、使用 tomcat sessionmanager 方法存储
这种方法服务器只能使用 tomcat,但网上有针对 memcached 和 redis 实现,直接配置就行了。
memcached 实现:
网址:http://code.google.com/p/memcached-session-manager/
修改 tomcat 的 conf 目录下的context.xml 文件:
<ManagerclassName="de.javakaffee.web.msm.MemcachedBackupSessionManager"
memcachedNodes="n1:localhost:11211n2:localhost:11212"
failoverNodes="n2"
requestUriIgnorePattern=".*\.(png|gif|jpg|css|js)$"
sessionBackupAsync="false"
sessionBackupTimeout="100"
transcoderFactoryClass="de.javakaffee.web.msm.serializer.javolution.JavolutionTranscoderFactory"
copyCollectionsForSerialization="false" />
以上是以 1.3 版为例子,需要用的jar 包:
memcached-session-manager-1.3.0.jar
msm-javolution-serializer-1.3.0.jar
javolution-5.4.3.1.jar
memcached-2.4.2.jar
redis 实现:
网址:https://github.com/jcoleman/tomcat-redis-session-manager
同样修改 tomcat 的 conf目录下的 context.xml 文件:
<ValveclassName="com.radiadesign.catalina.session.RedisSessionHandlerValve"/>
<ManagerclassName="com.radiadesign.catalina.session.RedisSessionManager"
host="localhost"
port="6379"
database="0"
maxInactiveInterval="60"/>
以上是以 1.2 版为例子,需要用的jar 包:
tomcat-redis-session-manager-1.2-tomcat-6.jar
jedis-2.1.0.jar
commons-pool-1.6.jar
3、使用 terracotta 服务器共享
这种方式配置有点复杂,大家到网上搜索一下吧。
以上配置成功后,前端使用 nginx进行负载均衡就行了,同时使用 Gzip 压缩 和 静态文件缓存。
以下是实例:
一、nginx+tomcat+memcached (依赖架包下载)
1.memcached配置:(v1.4.13)
节点1(192.168.159.131:11444)
节点2(192.168.159.131:11333)
2.tomcat配置
tomcat1(192.168.159.128:8081)
tomcat2(192.168.159.128:8082)
3.nginx安装在192.168.159.131。
首先,是配置tomcat,使其将session保存到memcached上。有两种方法:
方法一:在server.xml中配置。
找到host节点,加入
<ContextdocBase="/var/www/html" path="">
<ManagerclassName="de.javakaffee.web.msm.MemcachedBackupSessionManager"
memcachedNodes="n1:192.168.159.131:11444n2:192.168.159.131:11333"
requestUriIgnorePattern=".*\.(png|gif|jpg|css|js)$"
sessionBackupAsync="false"sessionBackupTimeout="3000"
transcoderFactoryClass="de.javakaffee.web.msm.serializer.javolution.JavolutionTranscoderFactory"
copyCollectionsForSerialization="false"/>
</Context>
方法二:在context.xml中配置。
找到Context节点,在context中加入
<ManagerclassName="de.javakaffee.web.msm.MemcachedBackupSessionManager"
memcachedNodes="n1:192.168.159.131:11444"
requestUriIgnorePattern=".*\.(png|gif|jpg|css|js)$"
sessionBackupAsync="false"sessionBackupTimeout="3000"
transcoderFactoryClass="de.javakaffee.web.msm.serializer.javolution.JavolutionTranscoderFactory"
copyCollectionsForSerialization="false" />
其次,配置nginx,用于测试session保持共享。
upstream xxy.com {
server 192.168.159.128:8081 ;
server 192.168.159.128:8082 ;
}
log_format www_xy_com '$remote_addr - $remote_user [$time_local] $request '
'"$status"$body_bytes_sent "$http_referer"'
'"$http_user_agent" "$http_x_forwarded_for"';
server
{
listen 80;
server_name xxy.com;
location/ {
proxy_pass http://xxy.com;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
access_log /data/base_files/logs/www.xy.log www_xy_com;
}
最后,将你的应用放到两个tomcat中,并依次启动memcached、tomcat、nginx。访问你的nginx,可以发现两个tomcat中的session可以保持共享了。
二、nginx+tomcat+redis (依赖架包下载)
1.redis配置(192.168.159.131:16300)(v2.8.3)
2.tomcat配置
tomcat1(192.168.159.130:8081)
tomcat2(192.168.159.130:8082)
3.nginx安装在192.168.159.131。
首先,是配置tomcat,使其将session保存到redis上。有两种方法,也是在server.xml或context.xml中配置,不同的是memcached只需要添加一个manager标签,而redis需要增加的内容如下:(注意:valve标签一定要在manager前面。)
配置和memcached 一样 找到Context节点,在context中加入
<ValveclassName="com.radiadesign.catalina.session.RedisSessionHandlerValve"/>
<ManagerclassName="com.radiadesign.catalina.session.RedisSessionManager"
host="192.168.159.131"
port="16300"
database="0"
maxInactiveInterval="60"/>
其次,配置nginx,用于测试session保持共享。
upstream redis.xxy.com {
server 192.168.159.130:8081;
server 192.168.159.130:8082;
}
log_format www_xy_com '$remote_addr - $remote_user [$time_local] $request '
'"$status"$body_bytes_sent "$http_referer"'
'"$http_user_agent" "$http_x_forwarded_for"';
server
{
listen 80;
server_name redis.xxy.com;
location/ {
proxy_pass http://redis.xxy.com;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
access_log /data/base_files/logs/redis.xxy.log www_xy_com;
}
最后,将你的应用放到两个tomcat中,并依次启动redis、tomcat、nginx。访问你的nginx,可以发现两个tomcat中的session可以保持共享了。
详情参看原文:http://blog.csdn.net/qh_java/article/details/45955923
http://www.cnblogs.com/zhangxsh/p/3494165.html
http://www.cnblogs.com/zhangxsh/p/3494235.html
BJJC网改版,
计划将应用部署在tomcat集群上,集群的部署方案为Apache+Tomcat6,连接件为mod_jk,其中开启了session复制和粘性session。计划节点数为3个。
到这,或许就可以中止了,tomcat集群谁不会建啊?实现了fail-over,当节点1处理会话时如果突然宕掉,那么其他节点会迅速接管而且不停顿的执行服务,对客户端完全透明,apache也很好的执行了lb,虽然还没有进行性能测试,但是起码横向扩展是没有问题的。但是,仔细想想,觉得还是有些问题。
为了实现fail-over,启用了session复制,这每个节点都会留一份session的副本,对于大规模的访问,tomcat能否撑住?假如集群面临1000个并发访问,虽然这1000个请求的压力会分散到3个节点上,但是实际上每个节点都有1000个session数,从资源的消耗上并没有节省多少,这样的话,再大的访问量并不一定撑得住。其实session复制的主要目的就是为了某节点宕掉后其他节点能迅速的接管请求,其实就是一个替补的作用。存在于其他节点中的session其实大部分都是空闲并且高度冗余。也就是说,session复制在节点数增多或者访问量激增时是很耗费资源占用中间件内存的。集群虽然提高了可用性,但是性能没有大的提升,尤其集群节点增多时。每当一个session创建后,节点都要向集群分发session,这样,节点都忙着传播session去了,集群的吞吐量会下降。
所以,一种做法就是将session外部存储或者cache,也就是说,将分散在各个节点中的会话信息拿出来,集中式存储,每个节点接收到客户端的会话请求时都去session池中查找session。这其实并不是什么很时髦的做法,大型的网站很多都采用这种办法,加上前端的页面缓存,提高网站的并发访问量和可用性,只是,在tomcat下如何做?
通过查看tomcat源码,发现可以重写tomcat的会话管理器,自定义类接管该服务,对应session的创建、管理任务进行接管,将session对象从中间件内存中剥离出来进行外部存储。同时对于静态页面或者个性化信息极少的页面,进行页面级cache。
因此,对于外部缓存,我选择的是MemCache,我首先在MemCache上进行了试验。MemCache将是Session和页面的缓存地,不过后来我放弃使用MemCache来缓存Session,原因后面会说明。
首先,我定义了以下类结构来完成这个工作,包括接管Tomcat的session管理以及缓存session对象等一系列操作:
类说明:
CachedSessionManager:该类继承自ManagerBase类,后者为Tomcat的会话管理器。
CachedSession:自定义的Session类,继承自StandardSession,为自定义的一个Tomcat的Session对象。
通过以上两个类,首先将Tomcat的会话管理器架空,其次,对Tomcat处理的Session对象进行了重写,这样,就完全将Session从Tomcat中剥离出来了,Session管理器和被管理的对象都是我自定义的了。
ISessionCaching:接口,抽象了session缓存的各种操作接口,该接口的实现类具体将决定如何对提供的Session进行缓存,我分别实现了四种缓存方案,Map、MemCache、Oracle、TimeSten。
SessionCacheDb:ISessionCaching接口的实现类,提供了数据库缓存session的解决方案,该类继承自DbCacheSession,后者具体决定如何缓存Session至db。
SessionCacheMap:ISessionCaching接口的实现类,提供了JVM内部Map缓存,该方法主要用来测试是否正确的接管了Tomcat的Session管理并能完全的拦截Session对象,无实际意义。
SessionCacheMemCache:ISessionCaching接口的实现类,提供了MemCache缓存Session的解决方案,其中该类依赖于MemCachedManager类,后者具体决定将如何缓存Session至MemCache.
TimeStenCacheSession:ISessionCaching接口的实现类,提供了TimeSten的存储方案,其实该类和SessionCacheDb没有什么区别,就是数据源来源不同。
核心的类:
CachedSessionManager:
复制代码
package com.thunisoft.session;
import java.io.IOException;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import org.apache.catalina.Session;
import org.apache.catalina.session.ManagerBase;
import org.apache.catalina.session.StandardSession;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.thunisoft.cache.ISessionCaching;
/**
* SessionCacheManager,Session自定义管理器
* @author zhangxsh
*
*/
public class CachedSessionManager extends ManagerBase {
private static String cachepath;
public String getCachepath() {
return cachepath;
}
public void setCachepath(String cachepath) {
this.cachepath = cachepath;
}
protected Log log = LogFactory.getLog(CachedSessionManager.class);
/**
* 定义如何将Session缓存
*/
private ISessionCaching sessionCache;
@Override
public void add(Session session) {
if (log.isDebugEnabled()) {
log.debug("===================" + this.getSessionMaxAliveTime());
}
initCache();
if (session != null) {
sessionCache.addSession(session.getId(), (CachedSession) session,
new Date(getExpireDate()));
}
}
/**
* 初始化Cache缓存,通过manager节点配置提供类名加载类
*/
private synchronized void initCache() {
if (sessionCache == null) {
try {
sessionCache = (ISessionCaching) Class.forName(cachepath)
.newInstance();
sessionCache.setManager(this);
} catch (InstantiationException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IllegalAccessException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
/**
* 获取session超时时间
*
* @return 时间毫秒数
*/
private long getExpireDate() {
return getCurrentTime() + 15 * 60 * 1000;
}
private long getCurrentTime() {
return System.currentTimeMillis();
}
@Override
public Session createEmptySession() {
System.out.println("createEmptySession");
return new CachedSession(this, sessionCache);
}
@Override
public Session createSession() {
if (log.isDebugEnabled()) {
log.debug("createEmptySession:null");
}
return createSession(null);
}
@Override
public Session createSession(String sessionId) {
if (log.isDebugEnabled()) {
log.debug(sessionId + "--Session create");
}
Session session = createEmptySession();
session.setNew(true);
session.setValid(true);
session.setCreationTime(System.currentTimeMillis());
session.setMaxInactiveInterval(this.maxInactiveInterval);
if (sessionId == null) {
sessionId = generateSessionId();
}
session.setId(sessionId);
return (session);
}
@Override
public void expireSession(String sessionId) {
initCache();
sessionCache.removeSession(sessionId);
}
@Override
public Session findSession(String sessionId) throws IOException {
initCache();
if (sessionId == null) {
return null;
}
return sessionCache.findSession(sessionId);
}
@Override
public Session[] findSessions() {
// TODO Auto-generated method stub
return super.findSessions();
}
@Override
protected synchronized String generateSessionId() {
String sid = super.generateSessionId();
if (log.isDebugEnabled()) {
log.debug("generateSessionId--" + sid);
}
// TODO Auto-generated method stub
return sid;
}
@Override
protected StandardSession getNewSession() {
if (log.isDebugEnabled()) {
log.debug("getNewSession");
}
// TODO Auto-generated method stub
return new CachedSession(this, sessionCache);
}
@Override
public HashMap getSession(String sessionId) {
Session s = (Session) sessionCache.getSession(sessionId);
if (s == null) {
if (log.isInfoEnabled()) {
log.info("Session not found " + sessionId);
}
return null;
}
Enumeration ee = s.getSession().getAttributeNames();
if (ee == null || !ee.hasMoreElements()) {
return null;
}
HashMap map = new HashMap();
while (ee.hasMoreElements()) {
String attrName = (String) ee.nextElement();
map.put(attrName, getSessionAttribute(sessionId, attrName));
}
return map;
}
@Override
public String getSessionAttribute(String sessionId, String key) {
initCache();
Session s = (Session) sessionCache.getSession(sessionId);
if (s == null) {
if (log.isInfoEnabled())
log.info("Session not found " + sessionId);
return null;
}
Object o = s.getSession().getAttribute(key);
if (o == null)
return null;
return o.toString();
}
@Override
public int getSessionMaxAliveTime() {
// TODO Auto-generated method stub
return super.getSessionMaxAliveTime();
}
private int sessionAliveTime;
public void setSessionAliveTime(int sessionAliveTime) {
// TODO Auto-generated method stub
if (log.isInfoEnabled())
log.info("sessionMaxAliveTime" + sessionMaxAliveTime);
super.setSessionMaxAliveTime(sessionAliveTime);
}
@Override
public void remove(Session session) {
if (log.isInfoEnabled())
log.info("removeSession" + session.getId());
sessionCache.removeSession(session.getId());
}
@Override
public void setSessionIdLength(int idLength) {
// TODO Auto-generated method stub
super.setSessionIdLength(idLength);
}
public int getRejectedSessions() {
// TODO Auto-generated method stub
return 0;
}
@Override
public void load() throws ClassNotFoundException, IOException {
// TODO Auto-generated method stub
}
@Override
public void setRejectedSessions(int arg0) {
// TODO Auto-generated method stub
}
@Override
public void unload() throws IOException {
// TODO Auto-generated method stub
}
}
复制代码
CachedSession:
复制代码
package com.thunisoft.session;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.http.HttpSession;
import org.apache.catalina.Manager;
import org.apache.catalina.SessionListener;
import org.apache.catalina.session.StandardSession;
import com.thunisoft.cache.ISessionCaching;
public class CachedSession extends StandardSession implements Serializable{
private static final long serialVersionUID = 1L;
/**
* session缓存方法接口
*/
private ISessionCaching sessionCache;
Map sessionMap=new ConcurrentHashMap();
/**
* 重写默认构造方法,提供session管理器和缓存接口
* @param manager session管理器
* @param sessionCache session缓存
*/
public CachedSession(Manager manager,ISessionCaching sessionCache){
super(manager);
this.sessionCache=sessionCache;
}
@Override
public void expire() {
// TODO Auto-generated method stub
super.expire();
}
@Override
public void setAttribute(String arg0, Object arg1, boolean arg2) {
// TODO Auto-generated method stub
/**
* 每当修改了一次对象类型,便重新往manager中set一次
*/
super.setAttribute(arg0, arg1, arg2);
getManager().add(this);
}
@Override
public void expire(boolean arg0) {
// TODO Auto-generated method stub
super.expire(arg0);
}
@Override
public long getCreationTime() {
// TODO Auto-generated method stub
return super.getCreationTime();
}
@Override
public String getId() {
// TODO Auto-generated method stub
return super.getId();
}
@Override
public Manager getManager() {
// TODO Auto-generated method stub
return super.getManager();
}
@Override
public HttpSession getSession() {
// TODO Auto-generated method stub
return super.getSession();
}
@Override
public Object getValue(String name) {
// TODO Auto-generated method stub
return super.getValue(name);
}
@Override
public String[] getValueNames() {
// TODO Auto-generated method stub
return super.getValueNames();
}
@Override
protected String[] keys() {
// TODO Auto-generated method stub
return super.keys();
}
@Override
public void putValue(String name, Object value) {
// TODO Auto-generated method stub
super.putValue(name, value);
}
@Override
public void removeAttribute(String name, boolean notify) {
// TODO Auto-generated method stub
super.removeAttribute(name, notify);
}
@Override
public void removeAttribute(String name) {
// TODO Auto-generated method stub
super.removeAttribute(name);
}
@Override
protected void removeAttributeInternal(String arg0, boolean arg1) {
// TODO Auto-generated method stub
super.removeAttributeInternal(arg0, arg1);
}
@Override
public void removeNote(String name) {
// TODO Auto-generated method stub
super.removeNote(name);
}
@Override
public void removeSessionListener(SessionListener listener) {
// TODO Auto-generated method stub
super.removeSessionListener(listener);
}
@Override
public void removeValue(String name) {
// TODO Auto-generated method stub
super.removeValue(name);
}
@Override
public void setAttribute(String arg0, Object arg1) {
// TODO Auto-generated method stub
super.setAttribute(arg0, arg1);
}
@Override
public void setId(String id) {
// TODO Auto-generated method stub
super.setId(id);
}
@Override
public void setManager(Manager manager) {
// TODO Auto-generated method stub
super.setManager(manager);
}
@Override
public String toString() {
// TODO Auto-generated method stub
return "session";
}
}
复制代码
DbCacheSession:
复制代码
1 package com.thunisoft.cache.impl.dbimpl;
2
3 import java.io.ByteArrayInputStream;
4 import java.io.ByteArrayOutputStream;
5 import java.io.IOException;
6 import java.io.ObjectInputStream;
7 import java.io.ObjectOutputStream;
8 import java.sql.Connection;
9 import java.sql.DriverManager;
10 import java.sql.PreparedStatement;
11 import java.sql.ResultSet;
12 import java.sql.SQLException;
13
14 import org.apache.commons.logging.Log;
15 import org.apache.commons.logging.LogFactory;
16
17 import com.thunisoft.session.CachedSession;
18
19 /**
20 * 数据库缓存Session
21 *
22 * @author zhangxsh
23 *
24 */
25 public class DbCacheSession {
26
27 private static final Log log = LogFactory.getLog(DbCacheSession.class);
28
29 private byte[] writeObject(CachedSession session) {
30 ByteArrayOutputStream byteout = new ByteArrayOutputStream();
31 ObjectOutputStream objout = null;
32 try {
33 objout = new ObjectOutputStream(byteout);
34 session.writeObjectData(objout);
35 } catch (IOException e) {
36 log.error("get bytes from session failed!", e);
37 }
38
39 return byteout.toByteArray();
40 }
41
42 /**
43 * 根据sessionId和Session空对象构造完整的session对象
44 * @param sessionId sessionid
45 * @param session 空session对象
46 * @return 反序列化后的session对象
47 */
48 public CachedSession getSessionObject(String sessionId,
49 CachedSession session) {
50
51 return readObjectFromDb(sessionId, session);
52 }
53
54 /**
55 * 根据sessionId和Session空对象构造完整的session对象
56 * @param sessionId sessionid
57 * @param session 空session对象
58 * @return 反序列化后的session对象
59 */
60 private CachedSession readObjectFromDb(String sessionId,
61 CachedSession session) {
62 PreparedStatement stat = null;
63 byte[] sessionBytes = null;
64 try {
65 stat = getConnection().prepareStatement(
66 "select c_session from t_session where c_sid=?");
67 stat.setString(1, sessionId);
68 ResultSet rus = stat.executeQuery();
69 while (rus.next()) {
70 sessionBytes = rus.getBytes(1);
71 }
72 } catch (SQLException e) {
73 // TODO Auto-generated catch block
74 e.printStackTrace();
75 }
76
77 return readObject(sessionBytes, session);
78 }
79
80 /**
81 * 将Session对象序列化为二进制数据然后保存入库
82 * @param session session对象
83 */
84 public void setSession(CachedSession session) {
85 byte[] sessionBytes = writeObject(session);
86 writeObjectIntoDb(session.getId(), sessionBytes);
87
88 }
89
90 public static void main(String[] args) {
91 // Session s=new Session();
92 // s.setId("AERDS122223");
93 DbCacheSession sess = new DbCacheSession();
94 // sess.setSession(s);
95 String s = "AERDS122223";
96 // System.out.println(sess.getSessionObject(s).getId());
97
98 }
99
100 /**
101 * 将session保存入库,先删再插
102 * @param sessionId sid
103 * @param sessionBytes session对象二进制数据
104 */
105 private void writeObjectIntoDb(String sessionId, byte[] sessionBytes) {
106 PreparedStatement stat = null;
107 Connection con = getConnection();
108 try {
109 stat = con.prepareStatement("delete from t_session where c_sid=?");
110 stat.setString(1, sessionId);
111 stat.execute();
112 stat = con.prepareStatement("insert into t_session values(?,?)");
113 stat.setString(1, sessionId);
114 stat.setBytes(2, sessionBytes);
115 stat.execute();
116 } catch (SQLException e) {
117 e.printStackTrace();
118 } finally {
119 try {
120 con.close();
121 stat.close();
122 } catch (SQLException e) {
123 // TODO Auto-generated catch block
124 e.printStackTrace();
125 }
126
127 }
128
129 }
130
131 private CachedSession readObject(byte[] sessionBytes, CachedSession session) {
132 if (sessionBytes == null) {
133 return session;
134 }
135 ByteArrayInputStream ins = null;
136 ObjectInputStream objipt = null;
137
138 try {
139 ins = new ByteArrayInputStream(sessionBytes);
140 objipt = new ObjectInputStream(ins);
141 session.readObjectData(objipt);
142 ins.close();
143 objipt.close();
144 } catch (IOException e) {
145 log.error("get session from bytes failed!", e);
146 } catch (ClassNotFoundException e) {
147 log.error("sesializable session failed!", e);
148 }
149 System.out.println(session.getId() + "-session is found");
150 return session;
151
152 }
153
154 protected Connection getConnection() {
155 Connection con = null;
156 try {
157 Class.forName("oracle.jdbc.driver.OracleDriver");
158 con = DriverManager.getConnection(
159 "jdbc:oracle:thin:@127.0.0.1:1521:ORCL", "zhangxsh",
160 "zhangxsh");
161
162 } catch (ClassNotFoundException e1) {
163 // TODO Auto-generated catch block
164 e1.printStackTrace();
165 }
166 // Context ctx;
167 // DataSource ds = null;
168 // Connection con = null;
169 // try {
170 // ctx = new InitialContext();
171 // ds = (DataSource) ctx.lookup("jdbc/oracle");
172 // con = ds.getConnection();
173 //
174 // } catch (NamingException e) {
175 // log.error("can not find jndi:" + "jdbc/oracle", e);
176 // } catch (SQLException e) {
177 // // TODO Auto-generated catch block
178 // e.printStackTrace();
179 // }
180 // return con;
181 catch (SQLException e) {
182 // TODO Auto-generated catch block
183 e.printStackTrace();
184 }
185 return con;
186 }
187 }
复制代码
将该类定义为一个manager加入context.xml中,启动tomcat,不用对应用做任何修改,因为修改的是tomcat。
以Oracle数据库缓存(暂时将session保存到数据库中)为例,部署到Tomcat集群下面(2个节点)测试效果:
1.首先打开测试页面:
可见请求被lb至s1节点服务,sessionid为:
E4ACDD8588CBCC0BD41B1789E23F1E5F.s1,
新建会话打开相同链接:
发现被lb至s2节点,sessionid为:
8D1037E94D95E162179921AB7D8CEA80.s2
查询数据库缓存表:
发现这两个会话均被保存至表中。
下面提交一些信息至session看效果:
分别提交了三次,发现都可以正常的读取并显示出来,说明会话可以正确的被修改,并且不会丢失更改。
同时在会话2也做几次修改session的操作:
发现session之间互不影响,是正常的隔离的。
s1页面的session来自于节点s1,如果关闭s1,会怎么样呢?下面关闭s1节点并刷新s1页面,此时只有节点2存活:
发现一样可以正常读取session,该请求被lb至节点2,节点2正常接管服务,并正常的拿到该会话的session信息。
如果把两个节点都重启呢?发现结果都一样,session信息一样可以读取,如果把数据库中的session删除,刷新页面,session立刻就变了,这就验证了session信息已经完全脱离了中间件了。
Session对象的持久化比较麻烦,虽然有序列化,但是并不确定Session对象中保存的其他信息是否可以序列化,这可能是网上很多解决方案摒弃此种做法的原因,网上的很多做法都是将Session中的attribute信息持久化并结构化存储,这显然很方便,但是session中的其他信息就丢了,否则仍然占据中间件内存,通过查看源码,惊喜的发现Tomcat对象提供了Session序列化的接口以及相关实现(Store),不过不是很满足需求,对其进行了一些改造就ok了,最终,Session对象作为一个整体,以二进制的形式保存在blob中,当反序列化时,还要装配Session和SessionManager之间的依赖关系。主要用到了session的以下方法,主要思想就是将session化整为零,传入一个输入流,将属性都写入该流中:
复制代码
1 protected void writeObject(ObjectOutputStream stream) throws IOException {
2
3 // Write the scalar instance variables (except Manager)
4 stream.writeObject(new Long(creationTime));
5 stream.writeObject(new Long(lastAccessedTime));
6 stream.writeObject(new Integer(maxInactiveInterval));
7 stream.writeObject(new Boolean(isNew));
8 stream.writeObject(new Boolean(isValid));
9 stream.writeObject(new Long(thisAccessedTime));
10 stream.writeObject(id);
11 if (manager.getContainer().getLogger().isDebugEnabled())
12 manager.getContainer().getLogger().debug
13 ("writeObject() storing session " + id);
14
15 // Accumulate the names of serializable and non-serializable attributes
16 String keys[] = keys();
17 ArrayList saveNames = new ArrayList();
18 ArrayList saveValues = new ArrayList();
19 for (int i = 0; i < keys.length; i++) {
20 Object value = attributes.get(keys[i]);
21 if (value == null)
22 continue;
23 else if ( (value instanceof Serializable)
24 && (!exclude(keys[i]) )) {
25 saveNames.add(keys[i]);
26 saveValues.add(value);
27 } else {
28 removeAttributeInternal(keys[i], true);
29 }
30 }
31
32 // Serialize the attribute count and the Serializable attributes
33 int n = saveNames.size();
34 stream.writeObject(new Integer(n));
35 for (int i = 0; i < n; i++) {
36 stream.writeObject((String) saveNames.get(i));
37 try {
38 stream.writeObject(saveValues.get(i));
39 if (manager.getContainer().getLogger().isDebugEnabled())
40 manager.getContainer().getLogger().debug
41 (" storing attribute '" + saveNames.get(i) +
42 "' with value '" + saveValues.get(i) + "'");
43 } catch (NotSerializableException e) {
44 manager.getContainer().getLogger().warn
45 (sm.getString("standardSession.notSerializable",
46 saveNames.get(i), id), e);
47 stream.writeObject(NOT_SERIALIZED);
48 if (manager.getContainer().getLogger().isDebugEnabled())
49 manager.getContainer().getLogger().debug
50 (" storing attribute '" + saveNames.get(i) +
51 "' with value NOT_SERIALIZED");
52 }
53 }
54
55 }
复制代码
这样将这个流的数据转换为一个字节流保存为blob即可。还原时,同样使用以下方法还原,将该流传入,方法会返回一个离线的Session对象,什么叫离散的?就是没有和具体的SessionMananger关联的:
复制代码
1 protected void readObject(ObjectInputStream stream)
2 throws ClassNotFoundException, IOException {
3
4 // Deserialize the scalar instance variables (except Manager)
5 authType = null; // Transient only
6 creationTime = ((Long) stream.readObject()).longValue();
7 lastAccessedTime = ((Long) stream.readObject()).longValue();
8 maxInactiveInterval = ((Integer) stream.readObject()).intValue();
9 isNew = ((Boolean) stream.readObject()).booleanValue();
10 isValid = ((Boolean) stream.readObject()).booleanValue();
11 thisAccessedTime = ((Long) stream.readObject()).longValue();
12 principal = null; // Transient only
13 // setId((String) stream.readObject());
14 id = (String) stream.readObject();
15 if (manager.getContainer().getLogger().isDebugEnabled())
16 manager.getContainer().getLogger().debug
17 ("readObject() loading session " + id);
18
19 // Deserialize the attribute count and attribute values
20 if (attributes == null)
21 attributes = new Hashtable();
22 int n = ((Integer) stream.readObject()).intValue();
23 boolean isValidSave = isValid;
24 isValid = true;
25 for (int i = 0; i < n; i++) {
26 String name = (String) stream.readObject();
27 Object value = (Object) stream.readObject();
28 if ((value instanceof String) && (value.equals(NOT_SERIALIZED)))
29 continue;
30 if (manager.getContainer().getLogger().isDebugEnabled())
31 manager.getContainer().getLogger().debug(" loading attribute '" + name +
32 "' with value '" + value + "'");
33 attributes.put(name, value);
34 }
35 isValid = isValidSave;
36
37 if (listeners == null) {
38 listeners = new ArrayList();
39 }
40
41 if (notes == null) {
42 notes = new Hashtable();
43 }
44 }
复制代码
说说我为什么实现以上三种缓存方案。
最开始我通过Map缓存(SessionCacheMap)测试通过之后,立刻将其迁移到MemCache中,因为Map缓存还是属于JVM进程内缓存,Session仍然在中间件内部,只是保存在我自定义的一块内存区域中而已,只是验证是否可以完全的拦截,因此没有实际意义,需要将其从JVM内部拿出来。因此我迁移至MemCache中去了。
而为什么最后没有使用MemCache,MemCache没有保障,这个缓存性能很高,功能强大,但是它并不对缓存信息持久化,一旦它宕掉,session信息就都丢了,如果一些页面缓存丢了倒也没什么,重启再建立就好了,但是如果session信息丢了,那就比较严重了,而且这个缓存没有持久化方案,所以,最后我没有采用它缓存session,而是使用它来缓存页面。
所以我选择了dbms来存储session信息,dbms强大、可靠的数据管理保证session不会丢失,每个session都被持久化进数据库中,上个例子我们可以看到,我们把所有节点都关闭再启动,客户端刷新界面,仍然可以正常显示,session不会丢失。
可靠性有了保证,获取session是一个非常频繁的操作,如何保证性能呢?每次都去查询数据库,性能还是没有保证,这地方很容易成为瓶颈,尤其大规模访问时,虽然oracle可以将记录保存在buffercache中,但是毕竟它不是专业的缓存, bufferCache大小有限,而且每次插入删除session信息会涉及到磁盘的读写,这都是瓶颈,所以,我又引入了timesten内存数据库,它在Oracle的前端作为Oracle的一个超大的bufferCache,session持久化信息首先进入timesten,而timesten后台异步将session持久化至oracle中, TimeSten其实就是一个大的前端缓存而已,进一步讲,就是一个外置的bufferCache,后端的Oracle保证TimeSten万一宕掉,持久化的session数据不会丢失,宕掉就宕掉了,重启就ok了。所以,我在TimeSten11g版本下搭建了环境,建立了一个AWT类型的CacheGroup这个Cache就缓存了一张表名为T_SESSION:这张表的定义为:
1 create table T_SESSION
2 (
3 C_SID VARCHAR2(200) primary key ,
4 C_SESSION BLOB
5 )
基于这张表的CacheGroup我定义如下(oracle的blob在timesten中影射为varbinary):
create dynamic asynchronous writethrough cache group g_awt from uss.t_session ( c_sid varchar(200) not null , c_session varbinary(262144),primary key(c_sid));
该Group采用异步方式写入oracle,数据的插入、删除都在timesten中完成,而timesten会异步的将数据刷新至oracle,看下例子:
首先在tt中插入1条数据:
再看下oracle中:
Timesten中删掉一条记录:
再看oracle中:
发现数据已经被异步更新。因此,采用timesten缓存session,oracle持久化session的基础环境已经OK了。下面其实就是将原本往oracle中保存的session往timesten中保存即可,其实就是切换一下Connection来源即可,此处省略。
那么如何处理session失效的问题?tomcat的session有一个lifescycle的概念,但是现在我把session对象从tomcat中完全剥离出来了,这样便不能由tomcat来维护了,怎么办呢?其实很简单,方案有两种:
1.我在t_session表的后面增加了d_create一列,类型为timestamp,该列代表这个session对象的创建时间戳,当session创建时记录时间戳,当session被修改时,更新这个时间戳,我定义一个job,这个job每5秒钟扫描一遍t_session记录,将截至到现在d_create超过某值的记录删除:
Delete from t_session where sysdate-c_create>n;这个n就是session的失效时间。
2.如果使用内存数据库,那更简单了,在文档中发现timesten有aging的概念,什么叫aging?看下面的CacheGroup定义:
create dynamic asynchronous writethrough cache group g_awt from uss.t_session ( c_sid varchar(200) not null , c_session varbinary(262144),d_create timestamp,primary key(c_sid)) AGING USE d_create LIFETIME 15 minutes CYCLE 5 seconds ON;
aging子句代表每隔6秒钟检测一次记录,使d_create超过15分钟的记录失效,是不是很方便?交给timesten去维护就ok了。
再说说我是如何实现页面缓存的,页面缓存使用Filter来实现,包装了ResponseWrapper类,该类拦截response的信息,将中间件的响应数据拦截并保存至MemCache中,当下次同样的URL到来,只需要去MemCache中取就可以了。
PageCacheFilter用于拦截url,将指定规则的url拦截响应内容并缓存至MemCache,其中具体的是由MonitorResponseWrapper类来完成的,中间件的响应数据存储在它的ByteArray输出流中,它继承自HttpServletResponseWrapper类,典型的装饰模式的应用。
复制代码
package com.thunisoft.filter;
import java.io.IOException;
import java.util.Date;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
public class PageCacheFilter implements Filter {
public void destroy() {
// TODO Auto-generated method stub
}
public void doFilter(ServletRequest req, ServletResponse resp,
FilterChain chain) throws IOException, ServletException {
MonitorResponseWrapper response = new MonitorResponseWrapper(
(HttpServletResponse) resp);
String url = ((HttpServletRequest) req).getRequestURI();
// MemCachedManager.getInstance().delete(url);
Object o = MemCachedManager.getInstance().get(url);
Date d = new Date(System.currentTimeMillis() + 10000);
if (o == null) {
chain.doFilter(req, response);
byte b[] = response.getResponseData();
resp.getOutputStream().write(b);
MemCachedManager.getInstance().set(url, b, d);
} else {
byte[] b = (byte[]) o;
resp.getOutputStream().write(b);
}
}
public static void main(String[] args) {
Date d = new Date(500);
}
public void init(FilterConfig arg0) throws ServletException {
// TODO Auto-generated method stub
}
}
复制代码
复制代码
package com.thunisoft.filter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpServletResponseWrapper;
public class MonitorResponseWrapper extends HttpServletResponseWrapper {
private ByteArrayOutputStream buffer = null;
private ServletOutputStream out = null;
private PrintWriter writer = null;
public MonitorResponseWrapper(HttpServletResponse resp) throws IOException {
super(resp);
buffer = new ByteArrayOutputStream();// 真正存储数据的流
out = new WapperedOutputStream(buffer);
writer = new PrintWriter(new OutputStreamWriter(buffer, this
.getCharacterEncoding()));
}
// 重载父类获取outputstream的方法
@Override
public ServletOutputStream getOutputStream() throws IOException {
return out;
}
// 重载父类获取writer的方法
@Override
public PrintWriter getWriter() throws UnsupportedEncodingException {
return writer;
}
// 重载父类获取flushBuffer的方法
@Override
public void flushBuffer() throws IOException {
if (out != null) {
out.flush();
}
if (writer != null) {
writer.flush();
}
}
static int i;
public static void main(String[] args) {
System.out.println(i);
}
@Override
public void reset() {
buffer.reset();
}
public byte[] getResponseData() throws IOException {
// 将out、writer中的数据强制输出到WapperedResponse的buffer里面,否则取不到数据
flushBuffer();
return buffer.toByteArray();
}
// 内部类,对ServletOutputStream进行包装
private class WapperedOutputStream extends ServletOutputStream {
private ByteArrayOutputStream bos = null;
public WapperedOutputStream(ByteArrayOutputStream stream)
throws IOException {
bos = stream;
}
@Override
public void write(int b) throws IOException {
bos.write(b);
}
}
}
复制代码
复制代码
package com.thunisoft.filter;
import java.util.Date;
import org.apache.catalina.Session;
import com.danga.MemCached.MemCachedClient;
import com.danga.MemCached.SockIOPool;
import com.thunisoft.session.CachedSession;
/**
*
* Memcached 缓存管理
*
* @author Administrator
*
*
*/
public class MemCachedManager {
// 创建全局的唯一实例
protected static MemCachedClient mcc = new MemCachedClient();
protected static MemCachedManager memCachedManager = new MemCachedManager();
// 设置与缓存服务器的连接池
static {
// 服务器列表和其权重
String[] servers = { "172.20.70.251:12000" };
Integer[] weights = { 3 };
// 获取socke连接池的实例对象
SockIOPool pool = SockIOPool.getInstance();
// 设置服务器信息
pool.setServers(servers);
pool.setWeights(weights);
// 设置初始连接数、最小和最大连接数以及最大处理时间
pool.setInitConn(5);
pool.setMinConn(5);
pool.setMaxConn(250);
pool.setMaxIdle(1000 * 60 * 60 * 6);
// 设置主线程的睡眠时间
pool.setMaintSleep(30);
// 设置TCP的参数,连接超时等
pool.setNagle(false);
pool.setSocketTO(3000);
pool.setSocketConnectTO(0);
// 初始化连接池
pool.initialize();
// 压缩设置,超过指定大小(单位为K)的数据都会被压缩
mcc.setCompressEnable(true);
mcc.setCompressThreshold(64 * 1024);
}
/**
*
* 保护型构造方法,不允许实例化!
*
*
*/
protected MemCachedManager() {
}
/**
*
* 获取唯一实例.
*
*
*
* @return
*/
public synchronized static MemCachedManager getInstance() {
return memCachedManager;
}
/**
*
* 添加一个指定的值到缓存中.
*
*
*
* @param key
*
* @param value
*
* @return
*/
public boolean add(String key, Object value) {
return mcc.add(key, value);
}
public boolean add(String key, Object value, Date expiry) {
return mcc.add(key, value, expiry);
}
public boolean set(String key, Object value, Date expiry) {
return mcc.set(key, value, expiry);
}
public boolean set(String key, Object value) {
return mcc.set(key, value);
}
/**
*
* 更新缓存对象
*
* @param key
*
* @param value
*
* @return
*/
public boolean replace(String key, Object value) {
return mcc.replace(key, value);
}
public boolean replace(String key, Object value, Date expiry) {
return mcc.replace(key, value, expiry);
}
/**
*
* 根据指定的关键字获取对象.
*
*
*
* @param key
*
* @return
*/
public Object get(String key) {
return mcc.get(key);
}
public boolean delete(String key){
return mcc.delete(key);
}
public static void main(String[] args) {
MemCachedManager cache = MemCachedManager.getInstance();
Session s=new CachedSession(null,null);
cache.set("111",s, new Date(System.currentTimeMillis()+10*1000));
// Object o=cache.get("E4A8127405876F0F94627BB0F440CCDC");
System.out.println("get value : " + cache.get("111"));
}
}
复制代码
复制代码
package com.thunisoft.filter;
import java.util.Date;
import org.apache.catalina.Session;
import com.danga.MemCached.MemCachedClient;
import com.danga.MemCached.SockIOPool;
import com.thunisoft.session.CachedSession;
/**
*
* Memcached 缓存管理
*
* @author Administrator
*
*
*/
public class MemCachedManager {
// 创建全局的唯一实例
protected static MemCachedClient mcc = new MemCachedClient();
protected static MemCachedManager memCachedManager = new MemCachedManager();
// 设置与缓存服务器的连接池
static {
// 服务器列表和其权重
String[] servers = { "172.20.70.251:12000" };
Integer[] weights = { 3 };
// 获取socke连接池的实例对象
SockIOPool pool = SockIOPool.getInstance();
// 设置服务器信息
pool.setServers(servers);
pool.setWeights(weights);
// 设置初始连接数、最小和最大连接数以及最大处理时间
pool.setInitConn(5);
pool.setMinConn(5);
pool.setMaxConn(250);
pool.setMaxIdle(1000 * 60 * 60 * 6);
// 设置主线程的睡眠时间
pool.setMaintSleep(30);
// 设置TCP的参数,连接超时等
pool.setNagle(false);
pool.setSocketTO(3000);
pool.setSocketConnectTO(0);
// 初始化连接池
pool.initialize();
// 压缩设置,超过指定大小(单位为K)的数据都会被压缩
mcc.setCompressEnable(true);
mcc.setCompressThreshold(64 * 1024);
}
/**
*
* 保护型构造方法,不允许实例化!
*
*
*/
protected MemCachedManager() {
}
/**
*
* 获取唯一实例.
*
*
*
* @return
*/
public synchronized static MemCachedManager getInstance() {
return memCachedManager;
}
/**
*
* 添加一个指定的值到缓存中.
*
*
*
* @param key
*
* @param value
*
* @return
*/
public boolean add(String key, Object value) {
return mcc.add(key, value);
}
public boolean add(String key, Object value, Date expiry) {
return mcc.add(key, value, expiry);
}
public boolean set(String key, Object value, Date expiry) {
return mcc.set(key, value, expiry);
}
public boolean set(String key, Object value) {
return mcc.set(key, value);
}
/**
*
* 更新缓存对象
*
* @param key
*
* @param value
*
* @return
*/
public boolean replace(String key, Object value) {
return mcc.replace(key, value);
}
public boolean replace(String key, Object value, Date expiry) {
return mcc.replace(key, value, expiry);
}
/**
*
* 根据指定的关键字获取对象.
*
*
*
* @param key
*
* @return
*/
public Object get(String key) {
return mcc.get(key);
}
public boolean delete(String key){
return mcc.delete(key);
}
public static void main(String[] args) {
MemCachedManager cache = MemCachedManager.getInstance();
Session s=new CachedSession(null,null);
cache.set("111",s, new Date(System.currentTimeMillis()+10*1000));
// Object o=cache.get("E4A8127405876F0F94627BB0F440CCDC");
System.out.println("get value : " + cache.get("111"));
}
}
复制代码
所以,最终,这个方案的整体架构如下:
这个架构的优点有以下:
使用廉价的Tomcat集群,摒弃了效率低下的session复制,可以实现大规模的横向扩展,增加网站的吞吐量和并发访问量,如果网站的并发访问超大,那么可以对前端的Apache进行分层的扩展,也很简单,未来Session如果更多的话,那Timsten的查询可能会成为瓶颈,那可以继续将TimeSten拆分,在tomcat的会话管理器中对Session查询进行路由,比如对sessionid进行hash散列,将session分散缓存至分布式的TimeSten中,客户端请求时,根据hash映射确定sessionid存在于哪个TimeSten节点中,达到分散压力的目的,这只是对于一般网站,如果对于mis系统,可能最终的压力都跑到数据库上而非中间件了,那又是另外一回事了。
已有 0 人发表留言,猛击->> 这里<<-参与讨论
ITeye推荐