Akka Cluster 2.3.1 bug分析和相关实现原理

标签: akka cluster bug | 发表时间:2014-06-14 19:25 | 作者:netcomm
出处:http://www.iteye.com

       Akka这样一个scala世界里的明星,给我们提供了各种各样吸引人的功能和特性,尤其在分布式、高并发领域。但就像任何其他优秀的框架,它的实现也必然会有其复杂性,在Roland Kuhn(Akka Tech Lead)的带领下,Akka的实现原理吸收了各个领域内成熟、领先的理论。尤其是Akka里cluster的实现,更是体现了非常多的优秀理论和实战经验。

        但由于它目前还处在实验阶段,在使用过程中还是会有可能碰到这样或那样的问题,下面就以Akka 2.3.1为例,详细分析我们碰到的一个bug。

1)场景描述

     集群里有两台机器SeedNode1(10.10.10.110) 和 SeedNode2(10.10.10.220),Akka的配置文件application.conf里相关配置如下:

   seed-nodes = [

        "akka.tcp://[email protected]:2551",

        "akka.tcp://[email protected]:2552"]

       我们先启动SeedNode1,等一会启动SeedNode2,发现SeedNode2和SeedNode1的TCP链路是连上了,但就是无法正常进行工作。但如果先让SeedNode2先启动,然后再启动SeedNode1,则

没有问题,集群可正常启动。

 

2)分析

        为了更好方便大家理解,下面先介绍一下cluster和remote的相关实现细节,这样才能前后串起来。

2.1)cluster的启动

        要使用一个cluster首先要启动它,所以我们先从启动这个步骤的实现开始进行分析。Akka集群的启动首先就是要启动一种叫做种子节点(SeedNode)的节点们。只有种子节点启动成功,其他节点才能选择任意一个种子节点加入集群。

        种子节点默认可配置多个,它们之间没有任何区别,种子节点的启动分以下几种情况:

1.某种子节点启动,它首先判断自己的ip是否在种子节点配置列表中,如果在并且是第一个,则它在一个规定时间内(默认是5秒),向其他种子节点发送‘InitJoin’消息,如果有确认消息返回,则加入第一个返回确认的种子节点所在的cluster中,否则,它自己将创建一个新的cluster。(这些任务由FirstSeedNodeProcess这个Actor完成,任务完成后它就销毁自己)

 

2.某种子节点启动,它首先判断自己的ip是否在种子节点配置中,但不是第一个,则它向其他种子节点发送消息,如果在一个规定时间内(默认是5秒)没有收到任何确认消息,则它将不断重试,直到有一个种子节点返回正确的确认消息,然后就加入这个种子节点所在的cluster中。(这里注意以下,它不会自己创建一个新cluster)。(这些任务由JoinSeedNodeProcess这个Actor完成,任务完成后它就销毁自己)

       从上面的分析,我们可以得出下面的一些结论:

#.一个集群第一次启动成功,那一定是种子节点配置列表中排在第一位的节点,由它来创建出集群。但是随着时间的推移,排在第一的种子节点有可能重启了,那这个时候,它将首选加入到其他种子节点去。

#一个种子节点可以加入任何一个其他节点,不用非得都加到排第一位的节点上。

 

下面我们举例说明,有种子节点1、2、3:

* 1. seed2启动, 但是没有收到seed1 或seed3的确认。

* 2. seed3启动,没有收到seed1 的确认消息(seed2处在’inactive’状态)。

* 3. seed1 启动,创建cluster并加入到自己中。

* 4. seed2 重试加入过程,收到seed1的确认, 加入到seed1。

* 5. seed3重试加入过程,先收到seed2的确认, 加入到seed2。

 

2.2)remote通讯链路的上行、下行实现  

2.2.1)上行 路径 (listen启动的全过程)

由于上行路径较复杂,所以画了几张图辅助说明:



 
                                         (
建立listen的步骤 )

 

                                              (接收一个新链路请求 )



                                                (接收一个新链路处于等待握手状态 )

 

1###可以把Remoting这个非常重要的类作为通讯模块的入口,它在启动的时候(start方法里)会向           EndpointManager这个Actor发送Listen消息,启动底层通讯NettyTransport的listen操作。

 

2###由AkkaProtocolTransport类来包一层NettyTransport,所以,先调用的是AkkaProtocolTransport的listen方法,这个方法里产生一个upstreamListenerPromise,这个promise最后会被成赋值为ActorAssociationEventListener(EndpointManager的实例),而这个promise的作用是为了设置AkkaProtocolManager的associationListener属性为EndpointManager的实例。

 

3###NettyTransport在linsten过程中,会返回一个associationListenerPromise,这个promise会通过调用interceptListen方法而被赋值ActorAssociationEventListener(AkkaProtocolManager的实例)。

而这个promise有两个作用:

***把建立起来的通讯Channel(监听端口的)置为可读状态(setReadable),以便接收后续进入的消息。

***作为TcpServerHandler的构造参数传入(_associationListenerFuture),TcpServerHandler实例(它其实是

netty里SimpleChannelUpstreamHandler的一个扩展)里最重要的方法是onConnect这个回调方法。当有外部链接建立成功,

onConnect方法会被调用,紧接着会调用initInbound方法,然后在该promise处等待,直到promise被成功赋值。

 

4###当上面initInbound方法里的promise被成功唤醒,它就会调用init方法。

 

5###init方法里首先会创建一个TcpAssociationHandle实例(包含一个readHandlerPromise),这个Promise在这里等待被唤醒(它被后面7处的操作唤醒而设置channel(新链接的)置为可读状态(setReadable),同时在netty中注册该channel的listen为ProtocolStateActor实例),然后会向AkkaProtocolManager实例发送InboundAssociation消息(这个消息里包含一个TcpAssociationHandle实例)。

 

6###AkkaProtocolManager实例收到InboundAssociation消息,创建一个ProtocolStateActor实例(调用inboundProps构造方法),这个实例的构造函数里包含两个重要的参数TcpAssociationHandle实例、EndpointManager的实例;

 

7###ProtocolStateActor实例的这种构造方法会把TcpAssociationHandle实例里的readHandlerPromise设置值而唤醒它。

 

8###ProtocolStateActor实例初始化后会等待在接受握手的状态中(WaitHandshake),这个时候如果接收到网络报文,decode后发现是Associate消息,则调用notifyInboundHandler方法。在这个方法中会向EndpointManager实例发送InboundAssociation(new AkkaProtocolHandle(...))消息,notifyInboundHandler方法也创建了一个readHandlerPromise,它作为参数放在发往EndpointManager实例的消息里,然后等待被赋值。

 

9###EndpointManager实例收到InboundAssociation消息后,根据addressToWritable(EndpointPolicy规则的集合)进行一些必要的判断,如果符合要求则调用createAndRegisterEndpoint方法,这个方法最主要是创建EndpointWriter实例并注册这个实例。不符合则进行相关动作,如保存这个InboundAssociation消息,等待后续条件合适再处理。

 

10###在创建EndpointWriter实例的preStart方法里,判断是否已经存在AkkaProtocolHandle实例,如果已经存在则创建一个EndpointReader实例,并把它作为值设置给步骤7里的readHandlerPromise,使readHandlerPromise这个Promise的future被唤醒。

 

11###ProtocolStateActor实例的readHandlerPromise被唤醒后,会向自己发送一条HandleListenerRegistered(EndpointReader实例)的消息,接收到这个消息后,它会修改自己状态机里的状态数据为ListenerReady。后续所有接受的网络数据包就会被正常的decode和分发了。

       

2.2.2)下行路径

作为发送端(client),当seed节点A向seed节点B发送InitJoin消息时,调用链如下:

1###向处在accepting状态中的EndpointManager实例发送'Send(message, senderOption, recipientRef, _)'

 

2###EndpointManager实例调用createAndRegisterWritingEndpoint方法,创建一个ReliableDeliverySupervisor实例(在EndpointWriter实例之上封了一层,以加强可靠性)。

并且向addressToWritable这个HashMap里添加一条记录。

 

3###ReliableDeliverySupervisor实例会创建一个EndpointWriter实例,在其preStart方法里,由于传入的AkkaProtocolHandle为None,所以会调用transport.associate(remoteAddress, ...),同时EndpointWriter实例进入Initializing状态。

 

4###上面的transport是AkkaProtocolTransport实例,它会向AkkaProtocolManager实例的发送一个AssociateUnderlyingRefuseUid消息

 

5###AkkaProtocolManager实例收到AssociateUnderlyingRefuseUid消息后,调用createOutboundStateActor方法,该方法调用ProtocolStateActor.outboundProps的构造方法。

 

6###ProtocolStateActor实例的outboundProps构造方法,会调用NettyTransport实例的associate方法,它会调用NettyFutureBridge(bootstrap.connect(socketAddress)进行真正的网络连接。

 

7.1###如果无法成功建立连接,则向外发送异常,这个异常会最终被EndpointManager实例捕获。

 

8###EndpointManager实例捕获异常后,根据异常情况进行处理,如果是链接失败异常则调用markAsFailed修改addressToWritable相关配置。

 

7.2###如果成功建立连接,则InitJoin消息会发送对对方机器。

 

3)bug具体原因分析

      通过上面的cluster集群启动过程的分析和remoting的实现过程,可以用来具体分析一下我们的问题场景。 我们是先启动SeedNode1,它启动后会调用remoting的下行路径向SeedNode2发送 ’InitJoin‘消息,它在发送几次后,还没收到响应则自己创建了集群。等我们再启动SeedNode2的时候,SeedNode2会向SeedNode1发起链接,走的是SeedNode1的上行路径,于是bug发生了。

        它具体原因就在下行链路的处理环节8###中没有捕获ConnectException异常,也就没有对addressToWritable相关配置进行调整。这就使得上行链路的处理环节9###EndpointManager,无法正常往下进行。

      该bug在今年4月份被修复,2.3.2及其之后的版本都没有问题,具体修复请查看 https://github.com/akka/akka/commit/672e7f947c9d4e3499bb3667a7230685546b7f7b

      虽然就是新增了一个对ConnectException异常的捕获,但分析这个bug的原因过程,还是有收获的,应该能对使用Akka的remoting、cluster模块的相关朋友有帮助。



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [akka cluster bug] 推荐:

Akka Cluster 2.3.1 bug分析和相关实现原理

- - 开源软件 - ITeye博客
       Akka这样一个scala世界里的明星,给我们提供了各种各样吸引人的功能和特性,尤其在分布式、高并发领域. 但就像任何其他优秀的框架,它的实现也必然会有其复杂性,在Roland Kuhn(Akka Tech Lead)的带领下,Akka的实现原理吸收了各个领域内成熟、领先的理论. 尤其是Akka里cluster的实现,更是体现了非常多的优秀理论和实战经验.

java 协程 实现 Akka

- - zzm
Akka是开源的,可以通过Apache 2许可获得. 可以从 http://akka.io/downloads/ 下载.         对并发/并行程序的简单的、高级别的抽象.         异步、非阻塞、高性能的事件驱动编程模型.         非常轻量的事件驱动处理(1G内存可容纳约270万个actors).

Akka简单性能分析

- - 并发编程网 - ifeve.com
因为最近工作的关系,要把异步任务从应用服务器中拆分到专门的异步处理服务器中. 是采用MQ的方式将任务消息发出,在服务端进行处理,如下图所示:. 这种方案是采用MQ作为中间的媒介,在服务端采用线程池异步处理任务,处理完成之后将结果发送到MQ中,客户端采用侦听的方式得到结果继续进行处理. 这种方案的不足是,可能在某些需求的情况下,需要将结果存放到共享的HashMap或者Threadlocal中进行存放结果,客户端会一直阻塞,直到得到结果,从多线程的角度来说,还是用了共享变量,虽然共享变量可能是线程安全的,但是从并发模型的角度来讲,并不是一个最好的方式.

小试 MariaDB Galera Cluster

- - raynix 筆記
前些时, 难得老板关注新技术, 哈哈, 我于是有机会尝试一下数据库服务器集群. 什么是 Galera Cluster. 简单的说就是3个或以上的 MariaDB 服务器相互作为镜像. 我按照 Digital Ocean 的指点, 用 AWS 上 3 个虚拟机做了个最小的集群, 下面是我的一些心得(针对 MariaDB 10.0.17):.

找bug记(1)

- BTK 4eVeR - BlogJava-庄周梦蝶
    转载请注明出处 http://www.blogjava.net/killme2008/archive/2011/07/10/354062.html.     上周在线上系统发现了两个bug,值得记录下查找的过程和原因. 以后如果还有查找bug比较有价值的经历,我也会继续分享.     第一个bug的起始,是在线上日志发现一个频繁打印的异常——java.lang.ArrayIndexOutOfBoundsException.

找bug记(2)

- gengmao - BlogJava-庄周梦蝶
    这篇blog迟到了很久,本来是想写另一个跟网络相关bug的查找过程,偷偷懒,写下最近印象比较深刻的bug. 这个bug是我的同事水寒最终定位到的.     前几个月同事报告称有一个线上MQ集群会同一时间抛出ArrayIndexOutOfBoundsException这个异常,也就是数组越界.

Akka 和 Storm 的设计差异

- - zzm
Akka 和 Storm 的设计差异. Akka 和 Storm 都是实现低延时, 高吞吐量计算的重要工具. 如果说 Akka 是 linux 内核的话, storm 更像是类似 Ubuntu 的发行版.然而 Storm. 并非 Akka 的发行版, 或许说 Akka 比作 BSD, Storm 比作 Ubuntu 更合适..

Percona XtraDB Cluster 搭配 HAProxy

- - 小惡魔 - 電腦技術 - 工作筆記 - AppleBOY
本篇文章紀錄安裝 Percona XtraDB Cluster (簡稱 PXC) 及搭配 HAProxy 做分散流量系統,其實在業界已經很常看到 HAProxy + MySQL Cluster Database 解決方案,HAProxy 幫您解決負載平衡,並且偵測系統是否存活,管理者也就不用擔心 MySQL 服務是否會掛掉.