分布式选主 -- 利用Mysql ACID和Lease协议实现选主和高可用

标签: 分布 利用 mysql | 发表时间:2013-05-23 18:13 | 作者:GugeMichael
出处:http://blog.csdn.net

      在实际生产开发中,遇到一些多节点共存,需要选主,并且要实现HA自动容错的场景,思考了写方法拿出来和大家分享一下。

  1. Lease协议,Mysql ACID
  2. 高可用选主方案设计
  3. 适用场景
  4. Java语言实现描述
  5. 进一步优化

      系统中有很多应用场景要类似主从架构,主服务器(Master)对外提供服务,从服务器(Salve)热备份,不提供服务但随时活着,如果Master出现宕机或者网络问题,Slave即可接替Master对外服务,并由Slave提升为Master(新主)。典型的多节点共存, 但只能同时存在一个主,并且所有节点的状态能统一维护

      大家一定首先想到了著名的Paxos算法( http://baike.baidu.com/view/8438269.htm)。简单的说,Paxos通过每个节点的投票算法,来决议一个事情,当多余1/2个节点都投票通过时,Paxos产生一个唯一结果的决议,并通知各个节点维护这个信息。例如Paxos的选主,首先产生一个关于某个节点希望当Master的投票,然后各个节点给出反馈,最终Paxos集群维护唯一的Master的结论。Zookeeper就是Paxos的一种实现。这种场景最适合用zookeeper来选主, 但zookeeper有个明显的缺点,当存活的节点小于zookeeper集群的1/2时,就不能工作了。比如zk有10各节点,那么必须满足可用的节点大于5才可。

      在实际环境中,如果对Master要求不是那么严格的话,可以通过某些改进和取舍来达到目的。比如可能在秒级别允许Master暂时不能访问、选主时间内可能存在一定的冲突但通过再次选主即可。本人设计了一个简易的利用Mysql一致性和简易版Lease来workaround。

Mysql ACID保证了一条数据记录的一致性、完整性,不会出现多进程读写的一致性问题和唯一正确性。Lease协议(协议细节可以Google之)通过向Master发送一个lease(租期)包,Master在这个lease期之内充当主角色,如果lease期到了则再次去申请lease,如果lease期到了,但是网络除了问题,这时Master可以i主动下线,让其他节点去竞选Master。举个例子,三个节点A、B、C经过第一轮选主之后,A成为Master,它获得了10秒的lease,当前时间假设是00:00:00,那么它Master地位可以用到00:00:10,当时间到达00:00:10时,A、B、C会重新进行Master选举,每个节点都有可能成为Master(从工程的角度触发,A继续为Master的概率更大),如果这时候A的网络断了,不能联通B、C的集群了, 那么A会自动下线,不会去竞争,这样就不会出现“脑裂”的现象。

      

       ---------------------------------------------- 华丽的分割线 ----------------------------------------------

      

        设计方案如下:(server代表集群中的一台机器,也可看作一个进程,server之间是平等的)

  1. 各个server之间用ntpserver时间同步(保证服务器之间秒级同步即可)
  2. 各个server持有一个唯一ID号(ip+进程号),通过此id唯一标识一个server实例
  3. 各个server定义一个lease租期,单位为秒
  4. Mysql唯一表唯一一条记录维护全局Master的信息,ACID保证一致性
  5. Master Server每半个lease期向Mysql更新如上的唯一一条记录,并更新心跳,维护Master状态
  6. Slaver Server每半个lease周期从mysql获取Master Server信息,如果数据库中Master的Lease超过了当前时间(heartbeat_time+ lease > current_time),则申请当Master。

      这其中比较棘手的问题是:

        1、由于数据库访问和休眠的时间(lease的一半),有时延的存在,要处理Mysql异常、网络异常。

        2、可能存在同时抢占Master的server,这个时候就需要一个验证机制保证为抢到Master的server自动退位为Slaver


      下面给出图实例 :(10.0.0.1为Master)


     10.0.0.1 crash了。mysql中维护的10.0.0.1的主信息已过期,其他节点去抢占



      各个节点再次读取数据库,查看是否是自己抢占成功了:



之后,10.0.0.3作为Master对外服务。此时如果10.0.0.1重启,可作为Slaver。如果10.0.0.1因为网络分化或者网络异常而不能维护心跳,则在超过自身lease时自动停止服务,不会出现“双Master”的现象。


      每个Server遵循如下流程:



        数据库设计:


        某一时刻,数据库中Master的信息:

 

       当前时间: 45分15秒

       当前Master Lease :6秒

       当前Master Lease可用到: 45分21秒

  

       ---------------------------------------------- 华丽的分割线 ----------------------------------------------

       3、适用的场景

        一、生命周期内可使用Mysql、并且各个server之间时间同步。

        二、需要集群中选出唯一主对外提供服务,其他节点作为slaver做standby,主lease过期时竞争为Master

        三、对比zookeeper,可满足如果集群挂掉一半节点,也可正常工作的情况,比如只有一主一备。

        四、允许选主操作在秒级容错的系统,选主的时候可能有lease/2秒的时间窗口,此时服务可能不可用。

        五、允许lease/2秒内出现极限双Master情况,但是概率很小。


        ---------------------------------------------- 华丽的分割线 ----------------------------------------------

        4、Java语言实现描述

      一些配置信息和时间相关、休眠周期相关的时间变量
        final long interval = lease / intervalDivisor;
        long waitForLeaseChallenging = 0L; 
        lease = lease / 1000L;

        long challengeFailTimes = 0L; 
        long takeRest = 0L; 
        long dbExceptionTimes = 0L; 
        long offlineTime = 0L; 
        Random rand = new Random();
        Status stateMechine = Status.START;

        long activeNodeLease = 0L; 
        long activeNodeTimeStamp = 0L; 

        数据库异常的处理:

            KeepAlive keepaliveNode = null;
            try {
                /* first of all get it from mysql */
                keepaliveNode = dbService.accquireAliveNode();
                if (stateMechine != Status.START && keepaliveNode==null)
                    throw new Exception();
                // recount , avoid network shake
                dbExceptionTimes = 0L;
            } catch (Exception e) {
                log.fatal("[Scanner] Database Exception with times : " + dbExceptionTimes++);
                if (stateMechine == Status.OFFLINE) {
                    log.warn("[Scanner] Database Exception , OFFLINE ");
                } else if (dbExceptionTimes >= 3) {
                    log.fatal("[Scanner] Database Exception , Node Offline Mode Active , uniqueid : " + uniqueID);
                    stateMechine = Status.OFFLINE;
                    dbExceptionTimes = 0L;
                    offlineTime = System.currentTimeMillis();
                    online = false;
                } else
                    continue;
            }

        总的循环和状态机的变迁:

        while (true) {

            SqlSession session = dbConnecction.openSession();
            ActionScanMapper dbService = session.getMapper(ActionScanMapper.class);

            KeepAlive keepaliveNode = null;
            try {
                /* first of all get it from mysql */
                keepaliveNode = dbService.accquireAliveNode();
                if (stateMechine != Status.START && keepaliveNode==null)
                    throw new Exception();
                // recount , avoid network shake
                dbExceptionTimes = 0L;
            } catch (Exception e) {
                log.fatal("[Scanner] Database Exception with times : " + dbExceptionTimes++);
                if (stateMechine == Status.OFFLINE) {
                    log.warn("[Scanner] Database Exception , OFFLINE ");
                } else if (dbExceptionTimes >= 3) {
                    log.fatal("[Scanner] Database Exception , Node Offline Mode Active , uniqueid : " + uniqueID);
                    stateMechine = Status.OFFLINE;
                    dbExceptionTimes = 0L;
                    offlineTime = System.currentTimeMillis();
                    online = false;
                } else
                    continue;
            }

            try {

                activeNodeLease = keepaliveNode!=null ? keepaliveNode.getLease() : activeNodeLease;
                activeNodeTimeStamp = keepaliveNode!=null ? keepaliveNode.getTimestamp() : activeNodeTimeStamp;
                takeRest = interval;

                switch (stateMechine) {
                    case START:
                        if (keepaliveNode == null) {
                            log.fatal("[START] Accquire node is null , ignore ");
                            // if no node register here , we challenge it
                            stateMechine = Status.CHALLENGE_REGISTER;
                            takeRest = 0;
                        } else {
                            // check the lease , wether myself or others 
                            if (activeNodeLease < timestampGap(activeNodeTimeStamp)) {
                                log.warn("[START] Lease Timeout scanner for uniqueid : " + uniqueID + ", timeout : "
                                            + timestampGap(activeNodeTimeStamp));
                                if (keepaliveNode.getStatus().equals(STAT_CHALLENGE))
                                    stateMechine = Status.HEARTBEAT;
                                else {
                                    stateMechine = Status.CHALLENGE_MASTER;
                                    takeRest = 0;
                                }
                            } else if (keepaliveNode.getUniqueID().equals(uniqueID)) {
                                // I'am restart
                                log.info("[START] Restart Scanner for uniqueid : " + uniqueID
                                                + ", timeout : " + timestampGap(activeNodeTimeStamp));
 stateMechine = Status.HEARTBEAT;
                            } else {
                                log.info("[START] Already Exist Keepalive Node with uniqueid : " + uniqueID);
                                stateMechine = Status.HEARTBEAT;
                            }
                        }
                        break;
                    case HEARTBEAT:
                        /* uniqueID == keepaliveNode.uniqueID */
                        if (keepaliveNode.getUniqueID().equals(uniqueID)) {
                            if (activeNodeLease < timestampGap(activeNodeTimeStamp)) {
                                // we should challenge now , without nessesary to checkout Status[CHALLENGE]
                                log.warn("[HEARTBEAT] HEART BEAT Lease is timeout for uniqueid : " + uniqueID
                                                + ", time : " + timestampGap(activeNodeTimeStamp));
                                stateMechine = Status.CHALLENGE_MASTER;
                                takeRest = 0;
                                break;
                            } else {
                                // lease ok , just update mysql keepalive status
                                dbService.updateAliveNode(keepaliveNode.setLease(lease));
                                online = true;
                                log.info("[HEARTBEAT] update equaled keepalive node , uniqueid : " + uniqueID
                                        + ", lease : " + lease + "s, remain_usable : " +
                                        ((activeNodeTimeStamp * 1000L + lease * 1000L) - System.currentTimeMillis()) + " ms");
                            }
                        } else {
                            /* It's others , let's check lease */
                            if (activeNodeLease < timestampGap(activeNodeTimeStamp)) {
                                if (keepaliveNode.getStatus().equals(STAT_CHALLENGE)) {
                                    waitForLeaseChallenging = (long) (activeNodeLease * awaitFactor);
                                    if ((waitForLeaseChallenging) < timestampGap(activeNodeTimeStamp)) {
                                        log.info("[HEARTBEAT] Lease Expired , Diff[" + timestampGap(activeNodeTimeStamp) + "] , Lease[" + activeNodeLease + "]");
                                        stateMechine = Status.CHALLENGE_MASTER;
                                        takeRest = 0;
                                    } else {
                                        log.info("[HEARTBEAT] Other Node Challenging , We wait for a moment ...");
                                    }
                                } else {
                                    log.info("[HEARTBEAT] Lease Expired , Diff[" + timestampGap(activeNodeTimeStamp) + "] , lease[" + activeNodeLease + "]");
                                    stateMechine = Status.CHALLENGE_MASTER;
                                    takeRest = 0;
                                }
                            } else {
                                online = false;
                                log.info("[HEARTBEAT] Exist Active Node On The Way with uniqueid : "
                                                + keepaliveNode.getUniqueID() + ", lease : " + keepaliveNode.getLease());
                            }
                        }
                        break;
                    case CHALLENGE_MASTER:
                        dbService.challengeAliveNode(new KeepAlive().setUniqueID(uniqueID).setLease(lease));
                        online = false;
                        // wait for the expired node offline automatic
                        // and others also have changce to challenge
takeRest = activeNodeLease;
                        stateMechine = Status.CHALLENGE_COMPLETE;
                        log.info("[CHALLENGE_MASTER] Other Node is timeout["
                                        + timestampGap(activeNodeTimeStamp) + "s] , I challenge with uniqueid : " + uniqueID
                                        + ", lease : " + lease + ", wait : " + lease);
                        break;
                    case CHALLENGE_REGISTER:
                        dbService.registerNewNode(new KeepAlive().setUniqueID(uniqueID).setLease(lease));
                        online = false;
                        // wait for the expired node offline automatic 
                        // and others also have changce to challenge
                        takeRest = activeNodeLease;
                        stateMechine = Status.CHALLENGE_COMPLETE;
                        log.info("[CHALLENGE_REGISTER] Regiter Keepalive uniqueid : " + uniqueID + ", lease : " + lease);
                        break;
                    case CHALLENGE_COMPLETE :
                        if (keepaliveNode.getUniqueID().equals(uniqueID)) {
                            dbService.updateAliveNode(keepaliveNode.setLease(lease));
                            online = true;
                            log.info("[CHALLENGE_COMPLETE] I Will be the Master uniqueid : " + uniqueID);
                            // make the uptime correct
                            stateMechine = Status.HEARTBEAT;
                        } else {
                            online = false;
                            log.warn("[CHALLENGE_COMPLETE] So unlucky , Challenge Failed By Other Node with uniqueid : " + keepaliveNode.getUniqueID());
                            if (challengeFailTimes++ >= (rand.nextLong() % maxChallenge) + minChallenge) {
                                // need't challenge anymore in a long time
                                takeRest=maxChallengeAwaitInterval;
                                stateMechine = Status.HEARTBEAT;
                                challengeFailTimes = 0L;
                                log.info("[CHALLENGE_COMPLETE] Challenge Try Times Used Up , let's take a long rest !");
                            } else {
stateMechine = Status.HEARTBEAT;
                                log.info("[CHALLENGE_COMPLETE] Challenge Times : " + challengeFailTimes + ", Never Give Up , to[" + stateMechine + "]");
                            }
                        }
                        break;
                    case OFFLINE :
                        log.fatal("[Scanner] Offline Mode Node with uniqueid : " + uniqueID);
                        if (System.currentTimeMillis() - offlineTime >= maxOfflineFrozen) {
                            // I am relive forcely
                            log.info("[Scanner] I am relive to activie node  , uniqueid : " + uniqueID);
                            stateMechine = Status.HEARTBEAT;
                            offlineTime = 0L;
                        } else if (keepaliveNode != null) {
                            // db is reconnected
                            stateMechine = Status.HEARTBEAT;
                            offlineTime = 0L;
                            log.info("[Scanner] I am relive to activie node  , uniqueid : " + uniqueID);
                        }
                        break;

                    default :
                        System.exit(0);
                }

                session.commit();
                session.close();

                if (takeRest != 0)
                    Thread.sleep(takeRest);

                log.info("[Scanner] State Stage [" + stateMechine + "]");

            } catch (InterruptedException e) {
                log.fatal("[System] Thread InterruptedException : " + e.getMessage());
            } finally {
                log.info("[Scanner] UniqueID : " + uniqueID + ", Mode : " + (online?"online":"offline"));
            }
        }

    }

    enum Status {
        START, HEARTBEAT, CHALLENGE_MASTER, CHALLENGE_REGISTER, CHALLENGE_COMPLETE, OFFLINE
    }

5 进一步的优化
        一、在各个系统竞争Master时,可能因为节点太多,冲突概率较大,可以通过在数据库中增加字段Status状态字段,标识是否有其他节点正在争抢Master,如果是,则可以暂停等一下,然后在尝试,如果那个节点成功抢到了Master,则会省去很多节点冲突的概率。
        
        二、由于出现很极端的情况,因为竞争Master的时间和lease时间都是固定的,则可能出现”时间轴共振“的现象,最典型的如一直在竞争Master但是一直失败,然后一直重试。所有的server在同一时刻都在赶同样的事情。可以通过增加时间随机性解决问题,如尝试抢占Master连续失败,则通过random产生随机数然后sleep,抵消共振。



作者:GugeMichael 发表于2013-5-23 18:13:00 原文链接
阅读:91 评论:0 查看评论

相关 [分布 利用 mysql] 推荐:

分布式选主 -- 利用Mysql ACID和Lease协议实现选主和高可用

- - CSDN博客架构设计推荐文章
      在实际生产开发中,遇到一些多节点共存,需要选主,并且要实现HA自动容错的场景,思考了写方法拿出来和大家分享一下. Lease协议,Mysql ACID.       系统中有很多应用场景要类似主从架构,主服务器(Master)对外提供服务,从服务器(Salve)热备份,不提供服务但随时活着,如果Master出现宕机或者网络问题,Slave即可接替Master对外服务,并由Slave提升为Master(新主).

利用tcpdump抓取mysql sql语句

- - 学习笔记
这个脚本是我之前在网上无意间找个一个利用tcpdump 抓包工具获取mysql流量,并通过过滤把sql 语句输入. 脚本不是很长,但是效果很好. #!/bin/bash #this script used montor mysql network traffic.echo sql tcpdump -i eth0 -s 0 -l -w - dst port 3306 | strings | perl -e ' while(<>) { chomp; next if /^[^ ]+[ ]*$/;.

MySQL分布式中间件:MyCAT

- - 标点符
随着传统的数据库技术日趋成熟、计算机网络技术的飞速发展和应用范围的扩充,数据库应用已经普遍建立于计算机网络之上. 这时集中式数据库系统表现出它的不足:. 集中式处理,势必造成性能瓶颈;. 应用程序集中在一台计算机上运行,一旦该计算机发生故障,则整个系统受到影响,可靠性不高;. 集中式处理引起系统的规模和配置都不够灵活,系统的可扩充性差.

利用 index、explain和profile优化mysql数据库查询小结

- - 博客园_首页
想必大家对index,explain和profile的利用也很多,这是我最近两天优化mysql语句查询资料整理的一些内容,希望大家可以一起来补充一下. 1.最好是在相同类型的字段间进行比较的操作. 在MySQL 3.23版之前,这甚至是一个必须的条件. 例如不能将一个建有索引的INT字段和BIGINT字段进行比较;但是作为特殊的情况,在CHAR类型的字段和VARCHAR类型字段的字段大小相同的时候,可以将它们进行比较.

利用keepalived构建高可用MySQL-HA(转)

- - 数据库 - ITeye博客
关于MySQL-HA,目前有多种解决方案,比如heartbeat、drbd、mmm、共享存储,但是它们各有优缺点. heartbeat、drbd配置较为复杂,需要自己写脚本才能实现MySQL自动切换,对于不会脚本语言的人来说,这无疑是一种脑裂问题;对于mmm,生产环境中很少有人用,且mmm 管理端需要单独运行一台服务器上,要是想实现高可用,就得对mmm管理端做HA,这样无疑又增加了硬件开支;对于共享存储,个人觉得MySQL数据还是放在本地较为安全,存储设备毕竟存在单点隐患.

利用一致性哈希水平拆分MySql单表

- - snoopyxdy的博客
Sharding(切片) 不是一门新技术,而是一个相对简朴的软件理念,就是当我们的数据库单机无法承受高强度的i/o时,我们就考虑利用 sharding 来把这种读写压力分散到各个主机上去. 所以Sharding 不是一个某个特定数据库软件附属的功能,而是在具体技术细节之上的抽象处理,是Horizontal Partitioning 水平扩展(或横向扩展)的解决方案,其主要目的是为突破单节点数据库服务器的 I/O 能力限制,注意这里是突破单点数据库服务器的“I/O”能力.

阿里开源Mysql分布式中间件:Cobar

- - 数据库 - ITeye博客
Cobar是阿里巴巴研发的关系型数据的分布式处理系统(Amoeba的升级版,该产品成功替代了原先基于Oracle的数据存储方案,目前已经接管了3000+个MySQL数据库的schema,平均每天处理近50亿次的SQL执行请求. )(github上面的是源码,大家下来需要自己用maven2编译后运行、者放Eclipse里面运行,一开始我用maven3没有执行成功.

分布式MySQL数据库TDSQL架构分析(转)

- - 数据库 - ITeye博客
腾讯计费平台部为了解决基于内存的NoSQL解决方案HOLD平台在应对多种业务接入时的不足,结合团队在MySQL领域多年应用和优化经验,最终在MySQL存储引擎基础上,打造一套分布式SQL系统TDSQL. 腾讯计费平台部托管着公司90%以上的虚拟账户,如QB、Q点、包月服务、游戏的二级账户等,为了保证能顺畅支撑公司各大业务的实时在线交易,并且在各种灾难场景下数据是一致并且可用的,对系统的可用性、一致性切换要求非常高,因此计费团队历来都非常重视高一致性存储系统的建设.

浅谈 Redis 与 MySQL 的耦合性以及利用管道完成 MySQL 到 Redis 的高效迁移

- - CSDN博客数据库推荐文章
    ㈠ Redis 与 MySQL 的耦合性.     在业务架构早期、我们便该"吃着碗里的看着锅里的"、切莫让MySQL 有梦、而Redis 无心.     毕竟、有些关系型的结构不适合放到Redis跑、"男女搭配、干活不累"嘛、推荐让MySQL与Redis喜结连理.     其次、这 2 人、一般是在不同场景做选择、而不会在性能上选择、.

利用RabbitMQ实现分布式事务

- -
  实现要点:1、构建本地消息表及定时任务,确保消息可靠发送;2、RabbitMQ可靠消费;3、redis保证幂等.   两个服务:订单服务和消息服务.   使用springboot构建项目,相关代码如下. //设置消息发送确认回调,发送成功后更新消息表状态. //定时扫描记录表,将发送状态为0的消息再次发送,甚至可以记录重发次数,必要时人工干预,生产环境中需要单独部署定时任务.