Redis集群功能说明
- gOODiDEA - NoSQLFan虽然目前可以通过在客户端做hash的方法来构建Redis集群,但Redis原生的集群支持还是颇受期待. 本文是对Redis集群功能官方描述文档的一个翻译,译者是@PPS萝卜同学,也感谢他的投稿分享. 这篇文档主要是为了说明正在进展中的Redis集群功能. 文档主要分为两个部分,前一部分主要介绍我在非稳定分支已完成的代码,后一部分主要介绍还有哪些功能待实现.
之前做了一个Redis的集群方案,跑了小半年,线上运行的很稳定
差不多可以跟大家分享下经验,前面写了一篇文章 数据在线服务的一些探索经验,可以做为背景阅读
public class RedisKey implements Serializable{ private static final long serialVersionUID = 1L; //每个业务不同的family private String family; private String key; ...... //物理保存在Redis上的key为经过MurmurHash之后的值 private String makeRedisHashKey(){ return String.valueOf(MurmurHash.hash64(makeRedisKeyString())); } //ReidsKey由family.key组成 private String makeRedisKeyString(){ return family +":"+ key; } //返回用户的经过Hash之后RedisKey public String getRedisKey(){ return makeRedisHashKey(); } ..... }
Family的存在时为了避免多个业务key冲突,给每个业务定义自己独立的Faimily
出于性能考虑,参考Redis存储设计,实际保存在Redis上的key为经过hash之后的值
public interface RedisUseInterface{ /** * 通过RedisKey获取value * * @param redisKey * redis中的key * @return * 成功返回value,查询不到返回NULL */ public String get(final RedisKey redisKey) throws Exception; /** * 插入<k,v>数据到Redis * * @param redisKey * the redis key * @param value * the redis value * @return * 成功返回"OK",插入失败返回NULL */ public String set(final RedisKey redisKey, final String value) throws Exception; /** * 批量写入数据到Redis * * @param redisKeys * the redis key list * @param values * the redis value list * @return * 成功返回"OK",插入失败返回NULL */ public String mset(final ArrayList<RedisKey> redisKeys, final ArrayList<String> values) throws Exception; /** * 从Redis中删除一条数据 * * @param redisKey * the redis key * @return * an integer greater than 0 if one or more keys were removed 0 if none of the specified key existed */ public Long del(RedisKey redisKey) throws Exception; /** * 从Redis中批量删除数据 * * @param redisKey * the redis key * @return * 返回成功删除的数据条数 */ public Long del(ArrayList<RedisKey> redisKeys) throws Exception; /** * 插入<k,v>数据到Redis * * @param redisKey * the redis key * @param value * the redis value * @return * 成功返回"OK",插入失败返回NULL */ public String setByte(final RedisKey redisKey, final byte[] value) throws Exception; /** * 插入<k,v>数据到Redis * * @param redisKey * the redis key * @param value * the redis value * @return * 成功返回"OK",插入失败返回NULL */ public String setByte(final String redisKey, final byte[] value) throws Exception; /** * 通过RedisKey获取value * * @param redisKey * redis中的key * @return * 成功返回value,查询不到返回NULL */ public byte[] getByte(final RedisKey redisKey) throws Exception; /** * 在指定key上设置超时时间 * * @param redisKey * the redis key * @param seconds * the expire seconds * @return * 1:success, 0:failed */ public Long expire(RedisKey redisKey, int seconds) throws Exception; }
//获取写哪个Redis Node int slot = getSlot(keyHash); RedisDataNode redisNode = rdList.get(slot); //写Master JedisSentinelPool jp = redisNode.getSentinelPool(); Jedis je = null; boolean success = true; try { je = jp.getResource(); return je.set(key, value); } catch (Exception e) { log.error("Maybe master is down", e); e.printStackTrace(); success = false; if (je != null) jp.returnBrokenResource(je); throw e; } finally { if (success && je != null) { jp.returnResource(je); } }
//获取读哪个Redis Node int slot = getSlot(keyHash); RedisDataNode redisNode = rdList.get(slot); //根据权重选取一个工作Instatnce int rn = redisNode.getWorkInstance(); //轮询 int cursor = rn; do { try { JedisPool jp = redisNode.getInstance(cursor).getJp(); return getImpl(jp, key); } catch (Exception e) { log.error("Maybe a redis instance is down, slot : [" + slot + "]" + e); e.printStackTrace(); cursor = (cursor + 1) % redisNode.getInstanceCount(); if(cursor == rn){ throw e; } } } while (cursor != rn);
public int getWorkInstance() { //没有定义weight,则完全随机选取一个redis instance if(maxWeight == 0){ return (int) (Math.random() * RANDOM_SIZE % redisInstanceList.size()); } //获取随机数 int rand = (int) (Math.random() * RANDOM_SIZE % maxWeight); int sum = 0; //选取Redis Instance for (int i = 0; i < redisInstanceList.size(); i++) { sum += redisInstanceList.get(i).getWeight(); if (rand < sum) { return i; } } return 0; }