Akka 和 Storm 的设计差异

标签: akka storm 设计 | 发表时间:2016-03-19 10:01 | 作者:
出处:http://m635674608.iteye.com

Akka 和 Storm 的设计差异

Akka 和 Storm 都是实现低延时, 高吞吐量计算的重要工具. 不过它们并非完全的竞品,
如果说 Akka 是 linux 内核的话, storm 更像是类似 Ubuntu 的发行版.然而 Storm
并非 Akka 的发行版, 或许说 Akka 比作 BSD, Storm 比作 Ubuntu 更合适.

实现的功能差异

Akka 包括了一套 API 和执行引擎.
Storm 除了 API 和执行引擎之外,还包括了监控数据,WEB界面,集群管理,消息传递保障机制.
此文讨论 Akka 和 Storm 重合的部分,也就是 API 和 执行引擎的异同.

API 差异

我们看下 Storm 两个主要的 API

    public interface ISpout extends Serializable {
    /**
     * Called when a task for this component is initialized within a worker on the cluster.
     * It provides the spout with the environment in which the spout executes.
     *
     * <p>This includes the:</p>
     *
     * @param conf The Storm configuration for this spout. This is the configuration provided to the topology merged in with cluster configuration on this machine.
     * @param context This object can be used to get information about this task's place within the topology, including the task id and component id of this task, input and output information, etc.
     * @param collector The collector is used to emit tuples from this spout. Tuples can be emitted at any time, including the open and close methods. The collector is thread-safe and should be saved as an instance variable of this spout object.
     */
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

    /**
     * Called when an ISpout is going to be shutdown. There is no guarentee that close
     * will be called, because the supervisor kill -9's worker processes on the cluster.
     *
     * <p>The one context where close is guaranteed to be called is a topology is
     * killed when running Storm in local mode.</p>
     */
    void close();

    /**
     * Called when a spout has been activated out of a deactivated mode.
     * nextTuple will be called on this spout soon. A spout can become activated
     * after having been deactivated when the topology is manipulated using the
     * `storm` client.
     */
    void activate();

    /**
     * Called when a spout has been deactivated. nextTuple will not be called while
     * a spout is deactivated. The spout may or may not be reactivated in the future.
     */
    void deactivate();

    /**
     * When this method is called, Storm is requesting that the Spout emit tuples to the
     * output collector. This method should be non-blocking, so if the Spout has no tuples
     * to emit, this method should return. nextTuple, ack, and fail are all called in a tight
     * loop in a single thread in the spout task. When there are no tuples to emit, it is courteous
     * to have nextTuple sleep for a short amount of time (like a single millisecond)
     * so as not to waste too much CPU.
     */
    void nextTuple();

    /**
     * Storm has determined that the tuple emitted by this spout with the msgId identifier
     * has been fully processed. Typically, an implementation of this method will take that
     * message off the queue and prevent it from being replayed.
     */
    void ack(Object msgId);

    /**
     * The tuple emitted by this spout with the msgId identifier has failed to be
     * fully processed. Typically, an implementation of this method will put that
     * message back on the queue to be replayed at a later time.
     */
    void fail(Object msgId);
}

以及

    public interface IBasicBolt extends IComponent {
    void prepare(Map stormConf, TopologyContext context);
    /**
     * Process the input tuple and optionally emit new tuples based on the input tuple.
     *
     * All acking is managed for you. Throw a FailedException if you want to fail the tuple.
     */
    void execute(Tuple input, BasicOutputCollector collector);
    void cleanup();
}

和 akka 中 actor 的 api

    trait Actor {

  import Actor._

  // to make type Receive known in subclasses without import
  type Receive = Actor.Receive

  /**
   * Stores the context for this actor, including self, and sender.
   * It is implicit to support operations such as `forward`.
   *
   * WARNING: Only valid within the Actor itself, so do not close over it and
   * publish it to other threads!
   *
   * [[akka.actor.ActorContext]] is the Scala API. `getContext` returns a
   * [[akka.actor.UntypedActorContext]], which is the Java API of the actor
   * context.
   */
  implicit val context: ActorContext = {
    val contextStack = ActorCell.contextStack.get
    if ((contextStack.isEmpty) || (contextStack.head eq null))
      throw ActorInitializationException(
        s"You cannot create an instance of [${getClass.getName}] explicitly using the constructor (new). " +
          "You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.")
    val c = contextStack.head
    ActorCell.contextStack.set(null :: contextStack)
    c
  }

  /**
   * The 'self' field holds the ActorRef for this actor.
   * <p/>
   * Can be used to send messages to itself:
   * <pre>
   * self ! message
   * </pre>
   */
  implicit final val self = context.self //MUST BE A VAL, TRUST ME

  /**
   * The reference sender Actor of the last received message.
   * Is defined if the message was sent from another Actor,
   * else `deadLetters` in [[akka.actor.ActorSystem]].
   *
   * WARNING: Only valid within the Actor itself, so do not close over it and
   * publish it to other threads!
   */
  final def sender(): ActorRef = context.sender()

  /**
   * This defines the initial actor behavior, it must return a partial function
   * with the actor logic.
   */
  //#receive
  def receive: Actor.Receive
  //#receive

  /**
   * INTERNAL API.
   *
   * Can be overridden to intercept calls to this actor's current behavior.
   *
   * @param receive current behavior.
   * @param msg current message.
   */
  protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = receive.applyOrElse(msg, unhandled)

  /**
   * Can be overridden to intercept calls to `preStart`. Calls `preStart` by default.
   */
  protected[akka] def aroundPreStart(): Unit = preStart()

  /**
   * Can be overridden to intercept calls to `postStop`. Calls `postStop` by default.
   */
  protected[akka] def aroundPostStop(): Unit = postStop()

  /**
   * Can be overridden to intercept calls to `preRestart`. Calls `preRestart` by default.
   */
  protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = preRestart(reason, message)

  /**
   * Can be overridden to intercept calls to `postRestart`. Calls `postRestart` by default.
   */
  protected[akka] def aroundPostRestart(reason: Throwable): Unit = postRestart(reason)

  /**
   * User overridable definition the strategy to use for supervising
   * child actors.
   */
  def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy

  /**
   * User overridable callback.
   * <p/>
   * Is called when an Actor is started.
   * Actors are automatically started asynchronously when created.
   * Empty default implementation.
   */
  @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
  //#lifecycle-hooks
  def preStart(): Unit = ()

  //#lifecycle-hooks

  /**
   * User overridable callback.
   * <p/>
   * Is called asynchronously after 'actor.stop()' is invoked.
   * Empty default implementation.
   */
  @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
  //#lifecycle-hooks
  def postStop(): Unit = ()

  //#lifecycle-hooks

  /**
   * User overridable callback: '''By default it disposes of all children and then calls `postStop()`.'''
   * @param reason the Throwable that caused the restart to happen
   * @param message optionally the current message the actor processed when failing, if applicable
   * <p/>
   * Is called on a crashed Actor right BEFORE it is restarted to allow clean
   * up of resources before Actor is terminated.
   */
  @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
  //#lifecycle-hooks
  def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    context.children foreach { child ⇒
      context.unwatch(child)
      context.stop(child)
    }
    postStop()
  }

  //#lifecycle-hooks

  /**
   * User overridable callback: By default it calls `preStart()`.
   * @param reason the Throwable that caused the restart to happen
   * <p/>
   * Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash.
   */
  @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
  //#lifecycle-hooks
  def postRestart(reason: Throwable): Unit = {
    preStart()
  }
  //#lifecycle-hooks

  /**
   * User overridable callback.
   * <p/>
   * Is called when a message isn't handled by the current behavior of the actor
   * by default it fails with either a [[akka.actor.DeathPactException]] (in
   * case of an unhandled [[akka.actor.Terminated]] message) or publishes an [[akka.actor.UnhandledMessage]]
   * to the actor's system's [[akka.event.EventStream]]
   */
  def unhandled(message: Any): Unit = {
    message match {
      case Terminated(dead) ⇒ throw new DeathPactException(dead)
      case _                ⇒ context.system.eventStream.publish(UnhandledMessage(message, sender(), self))
    }
  }
}

可以说 Storm 主要的 API 和 Actor 非常相像, 不过从时间线上看 Storm 和 Akka
都是从差不多的时间开始开发的,因此很有可能 Storm 是作者受了 Erlang 的 Actor 实现启发而写的.
从目前的状况看来, 很有可能作者想用 Clojure 语言写一个"朴素"的 Actor 实现, 然而这个"朴素"实现已经满足了 Storm 的设计目标, 所以作者也没有继续把 Storm 变成一个 Actor 在 clojure 上的完整实现.

那么,仅仅是从 API 上看的话 Spout/Bolt 和 Actor 的差异有哪些呢?

Storm API 比 Actor 多的地方

Storm 在 API 上比 Actor 多了 ack 和 fail 两个接口. 有这两个接口主要是因为 Storm 比 Akka 的应用场景更加细分(基本上只是用于统计), 所以已经做好了容错机制,能让在这个细分领域的用户达到开箱可用.

另外,在 Storm 的 Tuple 类中存储着一些 context 信息,也是出于目标使用场景的需求封装的.

Actor API 比 Storm 多的地方

context: Spout 的 open 方法里也有 context, 然而 context 在 actor 中是随时可以调用的,表明 Actor 比 Spout 更加鼓励用户使用 context, context 中的数据也会动态更新.

self: Actor对自身的引用,可以理解为 Actor 模型更加支持下游收到数据的组件往上游回发数据的行为,甚至自己对自己发数据也可以.在 Storm 中,我们默认数据发送是单向的,下游接收的组件不会对上游有反馈(除了系统定义的ack,和fail)
postRestart: 区分 Actor 的第一次启动和重启, 还是蛮有用的,Storm 没有应该是最初懒得写或者没想到,后来又不想改核心 API.

unhandled: 对没有预期到会发送给自身的消息做处理,默认是传到一个系统 stream,因为 Actor 本身是开放的,外部应用只要知道这个 Actor 的地址就能发消息给它.Storm 本身只接收你为它设计好的消息,所以没有这个需求.

运行时差异

Actor 和 Task 的比较, 线程调度模型的不同, 以及代码热部署,Storm 的 ack 机制对异步代码的限制等.

Actor 和 Component 的比较

Component 是 Spout 和 Bolt 的总称,是 Storm 中执行用户代码的基本组件. 共同点是都根据消息做出响应,也能够存储内容,一次只有一个线程进入,除非你手动另外开启线程.主要的区别在于 Actor 是非常轻量的组件,你可以在一个程序里创建几万个 Actor, 或者每十行代码都在一个 Actor 里, 这都没有问题. 然而换成 Storm 的Component, 情况都不一样了,你最好只用若干个 Component 来描述顶层抽象.

线程调度模型

API 很相似,为什么 Actor 可以随便开新的, Component 就要尽量少开呢? 秘密都在 Akka 的 调度器(Dispatchers)里. Akka 程序的所有异步代码,包括 Actor, Future, Runnable 甚至 ParIterable, 可以说除了你要用主线程启动ActorSystem外,其他所有线程都可以交给Dispatcher管理.Dispatcher 可以自定义,默认的情况下采用了 "fork-join-executor",相对于一般的线程池,fork-join-executor 特别适合 Actor模型,可以提供相当优异的性能.

相比较的, Storm 的线程调度模型就要"朴素"很多,就是每个 Component 一个线程,或者若干个Component轮流共用一个线程,这也就是为什么Component不能开太多的原因.

代码热部署

实时计算方面,热部署的需求主要是诸如修改排序算法之类的,替换某个算法模块,其他东西不变.

因为 Storm 是可以通过 Thrift 支持任何语言编程的,所以你如果是用python之类的脚本语言写的算法,想要换掉算法而不重启,那只要把每台机器上相应位置的py文件替换掉就好了.不过这样就会让程序限定在用此类语言实现.

Akka 方面, 因为 Actor 模型对进程内和进程间的通信接口都是统一的, 可以负责算法的一类 Actor 作为单独的进程启动,代码更新了就重启这个进程. 虽然系统中有一个进程重启了,但是整个系统还是可以一刻不停地运转.

Storm 中的 Ack 机制

Storm 的 消息保障机制是 具有独创性的, 利用位亦或能够用非常小的内存,高性能地掌握数据处理过程中的成功或失败情况. 默认的情况下,在用户的代码中只需要指定一个MessageId, Ack 机制就能愉快地跑起来了. 所以通常用户不用关心这块内容, 但是默认接口的问题就是, 一旦使用了异步程序, ack 机制就会失效,包括 schedule 和 submit runnable 等行为,都不会被 Ack 机制关心,也就是说异步逻辑执行失败了,acker也不知道. 如何能让 Storm 的 Ack 机制与异步代码和谐相处,还是一个待探讨的问题.

总结

我认为 Storm 的 API 是优秀的, 可靠性也是在若干年的实践中得到证实的, 然而其核心运转机制过于朴素又给人一种烈士暮年的感觉. Storm 最初的使用者 Twitter 也在不久前公布了他们兼容 Storm 接口的新的解决方案 Heron, 不过并没有开源. 如果有开源方案能够基于 Akka "重新实现" 一个 Storm,那将是非常令人期待的事情. 我目前发现 gearpump是其中一个.

参考资料

 

https://segmentfault.com/a/1190000003886656

http://blog.csdn.net/jmppok/article/details/17267585

 http://www.linuxidc.com/Linux/2015-12/126439.htm



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [akka storm 设计] 推荐:

Akka 和 Storm 的设计差异

- - zzm
Akka 和 Storm 的设计差异. Akka 和 Storm 都是实现低延时, 高吞吐量计算的重要工具. 如果说 Akka 是 linux 内核的话, storm 更像是类似 Ubuntu 的发行版.然而 Storm. 并非 Akka 的发行版, 或许说 Akka 比作 BSD, Storm 比作 Ubuntu 更合适..

Storm Akka Finagle对比及使用场景分析

- - CSDN博客云计算推荐文章
本文翻译自: http://blog.samibadawi.com/2013/04/akka-vs-finagle-vs-storm.html. Storm Akka Finagle对比及使用场景分析. Storm、Akka、Finagle是三个开源的分布式并行处理框架,都基于JVM运行. 他们在解决下面这些问题上十分有用:.

java 协程 实现 Akka

- - zzm
Akka是开源的,可以通过Apache 2许可获得. 可以从 http://akka.io/downloads/ 下载.         对并发/并行程序的简单的、高级别的抽象.         异步、非阻塞、高性能的事件驱动编程模型.         非常轻量的事件驱动处理(1G内存可容纳约270万个actors).

Akka简单性能分析

- - 并发编程网 - ifeve.com
因为最近工作的关系,要把异步任务从应用服务器中拆分到专门的异步处理服务器中. 是采用MQ的方式将任务消息发出,在服务端进行处理,如下图所示:. 这种方案是采用MQ作为中间的媒介,在服务端采用线程池异步处理任务,处理完成之后将结果发送到MQ中,客户端采用侦听的方式得到结果继续进行处理. 这种方案的不足是,可能在某些需求的情况下,需要将结果存放到共享的HashMap或者Threadlocal中进行存放结果,客户端会一直阻塞,直到得到结果,从多线程的角度来说,还是用了共享变量,虽然共享变量可能是线程安全的,但是从并发模型的角度来讲,并不是一个最好的方式.

storm简介

- - 搜索技术博客-淘宝
伴随着信息科技日新月异的发展,信息呈现出爆发式的膨胀,人们获取信息的途径也更加多样、更加便捷,同时对于信息的时效性要求也越来越高. 举个搜索场景中的例子,当一个卖家发布了一条宝贝信息时,他希望的当然是这个宝贝马上就可以被卖家搜索出来、点击、购买啦,相反,如果这个宝贝要等到第二天或者更久才可以被搜出来,估计这个大哥就要骂娘了.

Storm Trident 学习

- - 小火箭
Storm支持的三种语义:. 至少一次语义的Topology写法. 参考资料: Storm消息的可靠性保障 Storm提供了Acker的机制来保证数据至少被处理一次,是由编程人员决定是否使用这一特性,要使用这一特性需要:. 在Spout emit时添加一个MsgID,那么ack和fail方法将会被调用当Tuple被正确地处理了或发生了错误.

Storm实战之WordCount

- - 编程语言 - ITeye博客
 在全面介绍Storm之前,我们先通过一个简单的Demo让大家整体感受一下什么是Storm. 本地模式(Local Mode): 即Topology(相当于一个任务,后续会详细讲解)  运行在本地机器的单一JVM上,这个模式主要用来开发、调试. 远程模式(Remote Mode):在这个模式,我们把我们的Topology提交到集群,在这个模式中,Storm的所有组件都是线程安全的,因为它们都会运行在不同的Jvm或物理机器上,这个模式就是正式的生产模式.

storm常见问题解答

- - BlogJava-庄周梦蝶
    最近有朋友给我邮件问一些storm的问题,集中解答在这里. 一、我有一个数据文件,或者我有一个系统里面有数据,怎么导入storm做计算. 你需要实现一个Spout,Spout负责将数据emit到storm系统里,交给bolts计算. 怎么实现spout可以参考官方的kestrel spout实现:.

Storm 实时性分析

- - CSDN博客架构设计推荐文章
都说Storm是一个实时流处理系统,但Storm的实时性体现在什么方面呢. 首先有一个前提:这里的实时性和我们通常所说的实时系统(芯片+汇编或C编写的实时处理软件)的实时性肯定是没法比的,也不是同一个概念. 这里的实时性应该是一个相对的实时性(相对于Hadoop之类 ). 总结一下,Storm的实时性可能主要体现在:.

那些storm的坑坑

- - 开源软件 - ITeye博客
转载请声明出处:http://blackwing.iteye.com/blog/2147633. 在使用storm的过程中,感觉它还是不如hadoop那么成熟. 当然,它的流式处理能力挺让人眼前一亮,以前做的个性化推荐都是离线计算,现在总算把实时部分也加上了. 总结一下storm使用的些心得:. 1.尽量把大量数据处理行为分拆成多个处理component.