分布式应用框架Akka快速入门

标签: 分布 应用 框架 | 发表时间:2013-12-12 00:47 | 作者:jmppok
出处:http://blog.csdn.net

转载请注明出处: http://blog.csdn.net/jmppok/article/details/17264495

本文结合网上一些资料,对他们进行整理,摘选和翻译而成,对Akka进行简要的说明。引用资料在最后列出。

1.什么是Akka

Akka 是一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用。

官方网站 (http://akka.io/)的介绍是:

Akka is a toolkit and runtime for building highly concurrent, distributed, and fault tolerant event-driven applications on the JVM.

Build powerful concurrent & distributed applications more easily.

翻译成中文就是:Akka是一个开发库和运行环境,可以用于构建高并发、分布式、可容错、事件驱动的基于JVM的应用。使构建高并发的分布式应用更加容易。


Akka可以以两种不同的方式来使用

  • 以库的形式:在web应用中使用,放到 WEB-INF/lib 中或者作为一个普通的Jar包放进classpath。
  • 以微内核的形式:你可以将应用放进一个独立的内核。


2.Akka的五大特性

1)易于构建并行和分布式应用 (Simple Concurrency & Distribution)
      Akka在设计时采用了异步通讯和分布式架构,并对上层进行抽象,如Actors、Futures ,STM等。

2)可靠性(Resilient by Design)

     系统具备自愈能力,在本地/远程都有监护。

3)高性能(High Performance)

    在单机中每秒可发送50000000个消息。内存占用小,1GB内存中可保存2500000个actors。

4)弹性,无中心(Elastic — Decentralized)

   自适应的负责均衡,路由,分区,配置

5)可扩展(Extensible)

  可以使用Akka 扩展包进行扩展。


3. 什么场景下特别适合使用Akka?

我们看到Akka被成功运用在众多行业的众多大企业,从投资业到商业银行、从零售业到社会媒体、仿真、游戏和赌博、汽车和交通系统、数据分析等等等等。任何需要高吞吐率和低延迟的系统都是使用Akka的候选。

Actor使你能够进行服务失败管理(监管者),负载管理(缓和策略、超时和隔离),水平和垂直方向上的可扩展性(增加cpu核数和/或增加更多的机器)管理。

下面的链接中有一些Akka用户关于他们如何使用Akka的描述: http://stackoverflow.com/questions/4493001/good-use-case-for-akka

所有以上这些都在这个Apache2许可的开源软件中。


以下是Akka被部署到生产环境中的领域
事务处理 (在线游戏,金融/银行业,贸易,统计,赌博,社会媒体,电信):垂直扩展,水平扩展,容错/高可用性

服务后端 (任何行业,任何应用):提供REST, SOAP, Cometd, WebSockets 等服务 作为消息总线/集成层 垂直扩展,水平扩展,容错/高可用性

并发/并行 (任何应用):运行正确,方便使用,只需要将jar包添加到现有的JVM项目中(使用Scala,java, Groovy或jruby)

仿真:主/从,计算网格,MaReduce等等.

批处理 (任何行业):Camel集成来连接批处理数据源 Actor来分治地批处理工作负载

通信Hub (电信, Web媒体, 手机媒体):垂直扩展,水平扩展,容错/高可用性

游戏与赌博 (MOM, 在线游戏, 赌博):垂直扩展,水平扩展,容错/高可用性

商业智能/数据挖掘/通用数据处理:垂直扩展,水平扩展,容错/高可用性

复杂事件流处理:垂直扩展,水平扩展,容错/高可用性


4. Scala语言

Scala是一种多范式的编程语言,设计初衷是要集成面向对象编程和函数式编程的各种特性。

百度百科,Scala语言介绍:

http://baike.baidu.com/link?url=86mpCDT8PbqP29vR-ZDHQeMt1grr--g42DtVMkjvru4g57_TATYquSDnjLOJl-WovbPuywyikI7q1I0ZvWjuca

百度文库,Scala编程入门:

http://wenku.baidu.com/view/27cbf218964bcf84b9d57b83.html

5.Actors模型

Actor模型并非什么新鲜事物,它由Carl Hewitt于上世纪70年代早期提出,目的是为了解决分布式编程中一系列的编程问题。其特点如下:
系统中的所有事物都可以扮演一个Actor
Actor之间完全独立
在收到消息时Actor所采取的所有动作都是并行的,在一个方法中的动作没有明确的顺序
Actor由标识和当前行为描述
Actor可能被分成原始(primitive)和非原始(non primitive)类别

很多开发语言都提供了原生的Actor模型。例如erlang,scala等




Actor,可以看作是一个个独立的实体,他们之间是毫无关联的。但是,他们可以通过消息来通信。

一个Actor收到其他Actor的信息后,它可以根据需要作出各种相应。消息的类型可以是任意的,消息的内容也可以是任意的。这点有点像webservice了。只提供接口服务,你不必了解我是如何实现的。

 

一个Actor如何处理多个Actor的请求呢?它先建立一个消息队列,每次收到消息后,就放入队列,而它每次也从队列中取出消息体来处理。通常我们都使得这个过程是循环的。让Actor可以时刻处理发送来的消息。


6.示例(http://www.th7.cn/Program/java/2012/03/29/67015.shtml)

应用场景:服务端要处理大量的客户端的请求,并且处理请求耗费较长的时间。这时就需要使用并发处理。多线程是一种方法,这里使用Akka框架处理并发。(以下代码在Groovy1.7.5、akka-actors-1.2下运行成功)

这里有三个角色:Client、Master、Worker
Client傻乎乎地发同步请求给Master,一直等到结果返回客户端才离开。
Master接收客户端发来的请求,然后将请求交给Worker处理,处理完成之后将结果返回给Client。
Worker负责具体的业务处理,它耗费的事件比较长。

所以这里的关键在于Master,如果Master线性地“接收请求――调用Worker处理得到返回结果――将结果返回”,这样的系统必将歇菜。
使用Akka可以方便地将它变成并行地。


先看看Client,模拟同时多个客户端给Master发请求
import akka.actor.ActorRef
import static akka.actor.Actors.remote


class HelloClient implements Runnable {
    int seq
    String serviceName

    HelloClient(int seq, String serviceName) {
        this.seq = seq
        this.serviceName = serviceName
    }

    void run() {
        ActorRef actor = remote().actorFor(serviceName, "10.68.15.113", 9999);
        String str = "Hello--" + seq
        println "请求-----${str}"
        Object res = actor.sendRequestReply(str)
        println "返回-----${res}"
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(new HelloClient(i, "hello-service"))
            thread.start()        //同时启动5个客户端请求Master
        }
    }
}


真正干活的Worker:
import akka.actor.UntypedActor


class HelloWorker extends UntypedActor {    //Worker是一个Actor,需要实现onReceive方法
    @Override
    void onReceive(Object o) {
        println "Worker 收到消息----" + o
        if (o instanceof String) {
            String result = doWork(o)        //调用真实的处理方法
            getContext().replyUnsafe(result)//将结果返回给Master
        }
    }
    //Worker处理其实很简单,仅仅将参数字符串改造一下而已。只不过使其sleep了20秒,让它变得“耗时较长”
    String doWork(String str) {
        Thread.sleep(1000 * 20)
        return "result----" + str + " 。"
    }
}


负责并发调度的Master:
import akka.actor.ActorRef
import akka.actor.Actors
import akka.actor.UntypedActor
import akka.actor.UntypedActorFactory
import akka.dispatch.Future
import akka.dispatch.Futures
import java.util.concurrent.Callable


class HelloMaster extends UntypedActor {
    @Override
    void onReceive(Object o) {
        println "Master接收到Work消息:" + o
        def clientChannel = getContext().channel()    //客户端链接Channel
        //启动worker actor
        ActorRef worker = Actors.actorOf(new UntypedActorFactory() {
            public UntypedActor create() {
                return new HelloWorker();
            }
        }).start();

        //这里实现真正的并发
        Future f1 = Futures.future(new Callable() {
            Object call() {
                def result = worker.sendRequestReply(o)            //将消息发给worker actor,让Worker处理业务,同时得到返回结果
                worker.stop()
                println "Worker Return----" + result
                clientChannel.sendOneWay(result)                //将结果返回给客户端
                return result
            }
        })

        println "Future call over"
    }

    public static void main(String[] args) {    //启动Master进程,绑定IP、端口和服务
        Actors.remote().start("10.68.15.113", 9999).register(
                "hello-service",
                Actors.actorOf(HelloMaster.class));
    }
}


看看客户端的调用日志
请求-----Hello--4
请求-----Hello--1
请求-----Hello--3
请求-----Hello--0
请求-----Hello--2
[GENERIC] [11-10-6 下午9:49] [RemoteClientConnected(akka.remote.netty.NettyRemoteSupport@195b6aad,/10.68.15.113:9999)]
[GENERIC] [11-10-6 下午9:49] [RemoteClientStarted(akka.remote.netty.NettyRemoteSupport@195b6aad,/10.68.15.113:9999)]
返回-----result----Hello--0 。
返回-----result----Hello--1 。
返回-----result----Hello--2 。
返回-----result----Hello--4 。
返回-----result----Hello--3 。

 

服务端的日志:
[GENERIC] [11-10-6 下午9:49] [RemoteServerClientConnected(akka.remote.netty.NettyRemoteSupport@5a4fdf11,Some(/10.68.15.113:53462))]
Master接收到Work消息:Hello--1
Future call over
Master接收到Work消息:Hello--2
Future call over
Worker 收到消息----Hello--1
[DEBUG]   [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-13] [HelloWorker] started
[DEBUG]   [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-13] [HelloWorker] started
Worker 收到消息----Hello--2
Master接收到Work消息:Hello--0
Future call over
Master接收到Work消息:Hello--3
Worker 收到消息----Hello--0
[DEBUG]   [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-7] [HelloWorker] started
Future call over
Master接收到Work消息:Hello--4
[DEBUG]   [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-7] [HelloWorker] started
Worker 收到消息----Hello--3
Future call over
Worker 收到消息----Hello--4
[DEBUG]   [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-7] [HelloWorker] started
Worker 将消息Hello--1处理完成
Worker 将消息Hello--2处理完成
Worker Return----result----Hello--2 。
Worker Return----result----Hello--1 。
[DEBUG]   [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-13] [HelloWorker] stopping
Worker 将消息Hello--0处理完成
[DEBUG]   [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-3] [HelloWorker] stopping
Worker Return----result----Hello--0 。
[DEBUG]   [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-23] [HelloWorker] stopping
Worker 将消息Hello--4处理完成
Worker 将消息Hello--3处理完成
Worker Return----result----Hello--4 。
Worker Return----result----Hello--3 。
[DEBUG]   [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-11] [HelloWorker] stopping
[DEBUG]   [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-10] [HelloWorker] stopping


可以从服务端日志看到,Master接收到Work消息后onReceive就结束了(函数最后打印Future call over),一连接收了5个消息,然后Worker才收到消息并处理。最后消息处理完成好后f1的call才收到Worker Return的消息。
这里使用Future实现并发。


如果不使用Future:
def result = worker.sendRequestReply(o)       //将消息发给worker actor
println "Worker Return----" + result
getContext().replyUnsafe(result)      // 将worker返回的消息回复给客户端
这就成了同步处理(第一个消息处理完后才接收并处理第二个消息)。

 

如果在Future后调用了f1.await()或f1.get(),也成同步的了,因为await将等待worker返回后再继续往下执行。       
Future f1 = Futures.future(new Callable() {
    Object call() {
        def result = worker.sendRequestReply(o)       //将消息发给worker actor
        worker.stop()
        println "Worker Return----" + result
        clientChannel.sendOneWay(result)
        return result
    }
})

println "Future call over" + f1.get()

服务器日志如下:
[GENERIC] [11-10-6 下午10:06] [RemoteServerStarted(akka.remote.netty.NettyRemoteSupport@7e566633)]
[DEBUG]   [11-10-6 下午10:06] [main] [HelloMaster] started
[GENERIC] [11-10-6 下午10:07] [RemoteServerClientConnected(akka.remote.netty.NettyRemoteSupport@7e566633,Some(/10.68.15.113:53571))]
Master接收到Work消息:Hello--0
[DEBUG]   [11-10-6 下午10:07] [akka:event-driven:dispatcher:global-3] [HelloWorker] started
Worker 收到消息----Hello--0
Worker 将消息Hello--0处理完成
[DEBUG]   [11-10-6 下午10:07] [akka:event-driven:dispatcher:global-5] [HelloWorker] stopping
Worker Return----result----Hello--0 。
Future call overresult----Hello--0 。
Master接收到Work消息:Hello--2
Worker 收到消息----Hello--2
[DEBUG]   [11-10-6 下午10:07] [akka:event-driven:dispatcher:global-3] [HelloWorker] started
Worker 将消息Hello--2处理完成
Worker Return----result----Hello--2 。
Future call overresult----Hello--2 。
Master接收到Work消息:Hello--3
[DEBUG]   [11-10-6 下午10:07] [akka:event-driven:dispatcher:global-10] [HelloWorker] stopping
[DEBUG]   [11-10-6 下午10:07] [akka:event-driven:dispatcher:global-3] [HelloWorker] started
Worker 收到消息----Hello--3
Worker 将消息Hello--3处理完成
Worker Return----result----Hello--3 。
Future call overresult----Hello--3 。
Master接收到Work消息:Hello--4
[DEBUG]   [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-14] [HelloWorker] stopping
[DEBUG]   [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-3] [HelloWorker] started
Worker 收到消息----Hello--4
Worker 将消息Hello--4处理完成
Worker Return----result----Hello--4 。
Future call overresult----Hello--4 。
Master接收到Work消息:Hello--1
[DEBUG]   [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-18] [HelloWorker] stopping
[DEBUG]   [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-3] [HelloWorker] started
Worker 收到消息----Hello--1
Worker 将消息Hello--1处理完成
Worker Return----result----Hello--1 。
Future call overresult----Hello--1 。
[DEBUG]   [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-21] [HelloWorker] stopping
Master接收到Work消息:Hello--6
[DEBUG]   [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-24] [HelloWorker] started
Worker 收到消息----Hello--6
Worker 将消息Hello--6处理完成
Worker Return----result----Hello--6 。
Future call overresult----Hello--6 。
Master接收到Work消息:Hello--5
[DEBUG]   [11-10-6 下午10:09] [akka:event-driven:dispatcher:global-26] [HelloWorker] stopping
Worker 收到消息----Hello--5
[DEBUG]   [11-10-6 下午10:09] [akka:event-driven:dispatcher:global-24] [HelloWorker] started

 


需要注意的是,Akka默认使用环境变量%AKKA_HOME%/config/akka.conf配置,默认配置是client的read-timeout = 10(客户端连接10秒后将自动断开,这时服务端再给客户端发消息就发布了了。报RemoteServerWriteFailed异常),可以将值设为0,将一直连着不断开。
actor的timeout默认为5秒,也太短了,延长(不能设为0,0为总是超时).


7.参考文档

1) akka官网    http://akka.io/        

2)akka简介        http://blog.chinaunix.net/uid-25885064-id-3400549.html

3)Scala语言:集成面向对象和函数式编程的特性

4)actors模型           http://janeky.iteye.com/blog/1504125

5)示例          http://www.th7.cn/Program/java/2012/03/29/67015.shtml

6)akka资料  http://www.jdon.com/tags/10531

作者:jmppok 发表于2013-12-11 16:47:54 原文链接
阅读:106 评论:0 查看评论

相关 [分布 应用 框架] 推荐:

分布式应用框架 Dapr

- - IT瘾-dev
微服务架构已成为构建云原生应用程序的标准,微服务架构提供了令人信服的好处,包括可伸缩性,松散的服务耦合和独立部署,但是这种方法的成本很高,需要了解和熟练掌握分布式系统. 为了使用所有开发人员能够使用任何语言和任何框架轻松地构建便携式微服务应用程序,无论是开发新项目还是迁移现有代码. Dapr是一种可移植的,事件驱动的,无服务器运行时,用于构建跨云和边缘的分布式应用程序.

分布式应用框架Akka快速入门

- - CSDN博客架构设计推荐文章
转载请注明出处: http://blog.csdn.net/jmppok/article/details/17264495. 本文结合网上一些资料,对他们进行整理,摘选和翻译而成,对Akka进行简要的说明. Akka 是一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用.

android应用框架

- - CSDN博客移动开发推荐文章
原文地址:http://developer.android.com/guide/components/fundamentals.html. android应用程序一旦装进设备,每个程序会在它自己安全的沙盒里运行. 1.android操作系统是一个多用户linux系统,每一个应用程序是一个用户. 2.默认情况下,系统会为每个app分配唯一的linux用户id(这个id只会被系统使用,并且只会被这个app知道),系统为每个app的所有文件都设置了权限,只有被分配了这个app用户ID的程序可以访问它.

分布式框架Dubbo

- - Linux - 操作系统 - ITeye博客
互联网的发展,网站应用的规模不断扩大,常规的垂直应用架构已无法应对,分布式服务架构以及流动计算架构势在必行,Dubbo是一个分布式服务框架,在这种情况下诞生的. 现在核心业务抽取出来,作为独立的服务,使前端应用能更快速和稳定的响应. 大规模服务化之前,应用可能只是通过RMI或Hessian等工具,简单的暴露和引用远程服务,通过配置服务的URL地址进行调用,通过F5等硬件进行负载均衡.

分布式流式处理框架:Storm

- - 标点符
Storm是一个免费开源、分布式、高容错的实时计算系统. 它与其他大数据解决方案的不同之处在于它的处理方式. Hadoop 在本质上是一个批处理系统,数据被引入 Hadoop 文件系统 (HDFS) 并分发到各个节点进行处理. 当处理完成时,结果数据返回到 HDFS 供始发者使用. Hadoop的高吞吐,海量数据处理的能力使得人们可以方便地处理海量数据.

分布式服务框架:Zookeeper

- - 标点符
Zookeeper是一个高性能,分布式的,开源分布式应用协调服务. 它提供了简单原始的功能,分布式应用可以基于它实现更高级的服务,比如同步,配置管理,集群管理,名空间. 它被设计为易于编程,使用文件系统目录树作为数据模型. 服务端跑在java上,提供java和C的客户端API. Zookeeper是Google的Chubby一个开源的实现,是高有效和可靠的协同工作系统,Zookeeper能够用来leader选举,配置信息维护等,在一个分布式的环境中,需要一个Master实例或存储一些配置信息,确保文件写入的一致性等.

[Ext JS 4] MVC 应用程序框架

- - CSDN博客Web前端推荐文章
大型客户端应用程序总是很难编写,很难组织和很难维护. 随着功能的增加和更多的开发人员加入项目,对项目的控制也越来越困难了. Ext JS 4 提供了一个新的应用程序框架帮助组织代码. 模型 - 一组栏位和数据的集合. Model (在Ext JS 3中使用Record class). 视图 - 组件类型, grids,trees 和 panels 都是属于试图.

HTML5应用框架-Lavaca介绍

- - CSDN博客推荐文章
HTML5应用框架-Lavaca介绍. 作者:chszs,转载需注明. 博客主页:http://blog.csdn.net/chszs. Lavaca是一个全功能的HTML5应用框架,专注于快速、简便的构建Web应用. Lavaca是一个完整的构建系统,内建了配置以适应于不同的环境. 随着构建系统打包HTML、CSS和JavaScript,还有一个JavaScript文档生成根据,一个单元测试框架和数个通用的JavaScript库.

阿里巴巴Dubbo分布式服务框架已开源

- tangfl - ITeye论坛最新精华讨论帖
Serving services with invocations everyday, Dubbo becomes the key part of Alibaba's SOA solution and has been deployed to the whole alibaba.com family:.

分布式计算开源框架Hadoop入门实践

- - ITeye博客
一、分布式计算开源框架Hadoop实践. 在 SIP项目设计的过程中,对于它庞大的日志在开始时就考虑使用任务分解的多线程处理模式来分析统计,在我从前写的文章《Tiger Concurrent Practice --日志分析并行分解设计与实现》中有所提到. 但是由于统计的内容暂时还是十分简单,所以就采用Memcache作为计数器,结合MySQL就完成了访问 控制以及统计的工作.