分布式日志收集系统Apache Flume的设计介绍

标签: 分布 日志 系统 | 发表时间:2014-01-13 06:30 | 作者:zbf8441372
出处:http://blog.csdn.net

概述

Flume是Cloudera公司的一款高性能、高可能的分布式日志收集系统。现在已经是Apache Top项目。 Github地址。同Flume相似的日志收集系统还有 Facebook ScribeApache ChuwkaApache Kafka(也是LinkedIn的)。Flume是后起之秀,本文尝试简要分析Flume数据流通过程中提供的组件、可靠性保证来介绍Flume的主要设计,不涉及Flume具体的安装使用,也不涉及代码层面的剖析。写博文来记录这个工具主要是觉得与最近开发的一个流式的数据搬运的工具在设计上有相似之处,想看看有没有可以参考的地方。在博文的基础上,还需要浏览一下源码。

数据流通

Flume传输的数据的基本单位是event,如果是文本文件,通常是一行记录,这也是事务的基本单位。flume运行的核心是agent。它是一个完整的数据收集工具,含有三个核心组件,分别是source、channel、sink。Event从Source,流向Channel,再到Sink,本身为一个byte数组,并可携带headers信息。Event代表着一个数据流的最小完整单元,从外部数据源来,向外部的目的地去。Source:完成对日志数据的收集,分成transtion 和 event 打入到channel之中。Channel:主要提供一个队列的功能,对source提供中的数据进行简单的缓存。Sink:取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。通过这些组件,event可以从一个地方流向另一个地方,如下图所示。

Source消费从外部流进的Events,如AvroSource接收外部客户端传来的或是从别的agent流出来的Avro Event。Source可以把event送往一个或多个channel。channel是一个队列,持有event等待sink来消费,一种Channel的实现:FileChannel使用本地文件系统来作为它的存储。Sink的作用是把Event从channel里移除,送往外部数据仓库或给下一站agent的Source,如HDFSEventSink送往HDFS。同个agent下的source和sink是异步的。下面再举几个数据流通的例子,说明不同的使用方式。







Source接入

Client端操作消费数据的来源,Flume支持Avro,log4j,syslog和http post(body为json格式)。可以让应用程序同已有的Source直接打交道,如AvroSource,SyslogTcpSource。也可以写一个Source,以IPC或RPC的方式接入自己的应用,Avro和Thrift都可以(分别有NettyAvroRpcClient和ThriftRpcClient实现了RpcClient接口),其中Avro是默认的RPC协议。具体代码级别的Client端数据接入,可以参考 官方手册
对现有程序改动最小的使用方式是使用是直接读取程序原来记录的日志文件,基本可以实现无缝接入,不需要对现有程序进行任何改动。 
对于直接读取文件Source,有两种方式: 
  1. ExecSource:以运行Linux命令的方式,持续的输出最新的数据,如tail -F 文件名指令,在这种方式下,取的文件名必须是指定的。 ExecSource可以实现对日志的实时收集,但是存在Flume不运行或者指令执行出错时,将无法收集到日志数据,无法保证日志数据的完整性。
  2. SpoolSource:监测配置的目录下新增的文件,并将文件中的数据读取出来。需要注意两点:拷贝到spool目录下的文件不可以再打开编辑;spool目录下不可包含相应的子目录。SpoolSource虽然无法实现实时的收集数据,但是可以使用以分钟的方式分割文件,趋近于实时。如果应用无法实现以分钟切割日志文件的话,可以两种收集方式结合使用。 在实际使用的过程中,可以结合log4j使用,使用log4j的时候,将log4j的文件分割机制设为1分钟一次,将文件拷贝到spool的监控目录。log4j有一个TimeRolling的插件,可以把log4j分割的文件到spool目录。基本实现了实时的监控。Flume在传完文件之后,将会修改文件的后缀,变为.COMPLETED(后缀也可以在配置文件中灵活指定)
public class MySource extends AbstractSource implements Configurable, PollableSource {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation, convert to another type, ...)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external client
  }

  @Override
  public void stop () {
    // Disconnect from external client and do any additional cleanup
    // (e.g. releasing resources or nulling-out field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    // Start transaction
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    try {
      // This try clause includes whatever Channel operations you want to do

      // Receive new data
      Event e = getSomeData();

      // Store the Event into this Source's associated Channel(s)
      getChannelProcessor().processEvent(e)

      txn.commit();
      status = Status.READY;
    } catch (Throwable t) {
      txn.rollback();

      // Log exception, handle individual exceptions as needed

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    } finally {
      txn.close();
    }
    return status;
  }}

Channel

Channel有多种方式:有MemoryChannel,JDBC Channel,MemoryRecoverChannel,FileChannel。
MemoryChannel可以实现高速的吞吐,但是无法保证数据的完整性。
MemoryRecoverChannel在官方文档的建议上已经建义使用FileChannel来替换。
FileChannel保证数据的完整性与一致性。在具体配置不现的FileChannel时,建议FileChannel设置的目录和程序日志文件保存的目录设成不同的磁盘,以便提高效率。 

Sink

Sink在设置存储数据时,可以向文件系统、数据库、hadoop存数据,在日志数据较少时,可以将数据存储在文件系中,并且设定一定的时间间隔保存数据。在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。
更多sink的内容可以参考 官方手册
public class MySink extends AbstractSink implements Configurable {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external repository (e.g. HDFS) that
    // this Sink will forward Events to ..
  }

  @Override
  public void stop () {
    // Disconnect from the external respository and do any
    // additional cleanup (e.g. releasing resources or nulling-out
    // field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    // Start transaction
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    try {
      // This try clause includes whatever Channel operations you want to do

      Event event = ch.take();

      // Send the Event to the external repository.
      // storeSomeData(e);

      txn.commit();
      status = Status.READY;
    } catch (Throwable t) {
      txn.rollback();

      // Log exception, handle individual exceptions as needed

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    } finally {
      txn.close();
    }
    return status;
  }}

可靠性

Flume的核心是把数据从数据源收集过来,再送到目的地。为了保证输送一定成功,在送到目的地之前,会先缓存数据,待数据真正到达目的地后,删除自己缓存的数据。
Flume使用事务性的方式保证传送Event整个过程的可靠性。Sink必须在Event被存入Channel后,或者,已经被传达到下一站agent里,又或者,已经被存入外部数据目的地之后,才能把Event从Channel中remove掉。这样数据流里的event无论是在一个agent里还是多个agent之间流转,都能保证可靠,因为以上的事务保证了event会被成功存储起来。而Channel的多种实现在可恢复性上有不同的保证。也保证了event不同程度的可靠性。比如Flume支持在本地保存一份文件channel作为备份,而memory channel将event存在内存queue里,速度快,但丢失的话无法恢复。
具体看一下Transaction。Source和Sink封装了Channel提供的对Event的事务存、取接口,下图为一个transaction过程:

一个Channel的实现里会包括一个transaction的实现,每个与channel打交道的source和sink都得带有一个transaction对象。下面的例子中可以看到一个Event的状态和变化会在一次transation中完成。transaction的状态也对应了时序图中的各个状态。
Channel ch = new MemoryChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
  // This try clause includes whatever Channel operations you want to do

  Event eventToStage = EventBuilder.withBody("Hello Flume!",
                       Charset.forName("UTF-8"));
  ch.put(eventToStage);
  // Event takenEvent = ch.take();
  // ...
  txn.commit();
} catch (Throwable t) {
  txn.rollback();

  // Log exception, handle individual exceptions as needed

  // re-throw all Errors
  if (t instanceof Error) {
    throw (Error)t;
  }
} finally {
  txn.close();
}

(全文完)
作者:zbf8441372 发表于2014-1-12 22:30:32 原文链接
阅读:107 评论:0 查看评论

相关 [分布 日志 系统] 推荐:

分布式日志收集收集系统:Flume

- - 标点符
Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统. 支持在系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力. Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera.

分布式日志收集系统Apache Flume的设计介绍

- - CSDN博客架构设计推荐文章
Flume是Cloudera公司的一款高性能、高可能的分布式日志收集系统. 现在已经是Apache Top项目. 同Flume相似的日志收集系统还有 Facebook Scribe, Apache Chuwka, Apache Kafka(也是LinkedIn的). Flume是后起之秀,本文尝试简要分析Flume数据流通过程中提供的组件、可靠性保证来介绍Flume的主要设计,不涉及Flume具体的安装使用,也不涉及代码层面的剖析.

实时分布式日志系统plumelog落地

- - 掘金 架构
这是我参与「掘金日新计划 · 4 月更文挑战」的第25天, 点击查看活动详情. 无代码入侵的分布式日志系统,基于log4j、log4j2、logback搜集日志,设置链路ID,方便查询关联日志. 基于elasticsearch作为查询引擎. 全程不占应用程序本地磁盘空间,免维护;对于项目透明,不影响项目本身运行.

分布式日志

- - Java - 编程语言 - ITeye博客
最近完成一个简单的日志管理系统,拿出来跟大家分享一下. 3、支持文件输出、habse输出、mongodb输出. 基于以上三点功能,我们下面详细说明. 说道支持这个功能,有个同事认为没有这个必要,他的观点是log4j的配置不需要经常变动,不需要支持这样的功能;本人的观点是“配置可以进行统一管理、而且正式机跟测试机的log4j的配置肯定会有一些差异的”,因此这个功能是必须的.

【Monogdb】MongoDB的日志系统

- - CSDN博客数据库推荐文章
记得前几天有个小伙伴要查看mongodb的日志,从而排查问题,可能总找不到日志放在何处,今天就系统说一下mongodb的日志系统. mongodb中主要有四种日志. 分别是系统日志、Journal日志、oplog主从日志、慢查询日志等. 这些 日志记录着Mongodb数据库不同方便的踪迹. 系统日志在Mongdb数据中很中重要,它记录mongodb启动和停止的操作,以及服务器在运行过程中发生的任何异常信息;配置系统日志也非常简单,在运行mongod时候增加一个参数logpath,就可以设置;.

开源日志系统比较

- Sepher - 董的博客
(注:时间有限,整篇文章写得比较粗糙,如果想进一步探讨,可给我发email/msn:[email protected],或者加入QQ讨论群:41172282). 许多公司的平台每天会产生大量的日志(一般为流式数据,如,搜索引擎的pv,查询等),处理这些日志需要特定的日志系统,一般而言,这些系统需要具有以下特征:.

使用elasticsearch+simple_flow搭建实时日志搜索系统

- - ITeye博客
    在实际的系统中,我们经常会进行分布式的系统部署,但是这样会导致一个问题,系统日志也被分散开了,导致根据日志进行错误定位不太方便,所以,利用simple_flow实时流的特点,再配合elasticsearch建立索引,搭配构建一个实时日志搜索系统.具体流程图如下:. 1.启动elasticsearch, 这个参考官方文档  http://www.elasticsearch.org/.

MYSQL优化之关闭文件系统日志

- - Linux - 操作系统 - ITeye博客
通常数据库系统在数据大量操作时,会产生很大的磁盘io,关闭linux各文件系统的日志功能可以提高数据库吞吐量. 1)查看文件系统是否开启日志功能了. tune2fs -l /dev/sda2 | grep 'has_journal' 如果返回结果中出现has_journal,则说明该文件系统开启了日志(jbd,也即 journaling block device).

开源日志系统简介——Scribe,flume,kafka,Chukwa

- - 互联网 - ITeye博客
许多公司的平台每天会产生大量的日志(一般为流式数据,如,搜索引擎的pv,查询等),处理这些日志需要特定的日志系统,一般而言,这些系统需要具有以下特征:. (1) 构建应用系统和分析系统的桥梁,并将它们之间的关联解耦;. (2) 支持近实时的在线分析系统和类似于Hadoop之类的离线分析系统;. 即:当数据量增加时,可以通过增加节点进行水平扩展.

Flume + kafka + HDFS构建日志采集系统

- - 企业架构 - ITeye博客
    Flume是一个非常优秀日志采集组件,类似于logstash,我们通常将Flume作为agent部署在application server上,用于收集本地的日志文件,并将日志转存到HDFS、kafka等数据平台中;关于Flume的原理和特性,我们稍后详解,本文只简述如何构建使用Flume + kafka + HDFS构建一套日志采集系统.