flume日志采集

标签: flume 日志 | 发表时间:2013-08-05 09:25 | 作者:sunmeng_Alex
出处:http://blog.csdn.net

1.  Log4j Appender

1.1.  使用说明

1.1.2.  Client端Log4j配置文件

(黄色文字为需要配置的内容)

log4j.rootLogger=INFO,A1,R

 

 

# ConsoleAppender out

log4j.appender.A1= org. apache.log4j.ConsoleAppender

log4j.appender.A1.layout= org. apache.log4j.PatternLayout

log4j.appender.A1.layout.ConversionPattern=%d{ yyyy/MM/ ddHH:mm:ss}%-5p%-10C {1} %m%n

# File out

//日志Appender修改为flume提供的Log4jAppender

log4j.appender.R= org. apache. flume.clients.log4jappender.Log4jAppender

log4j.appender.R.File=${ catalina.home}/logs/ ultraIDCPServer.log

//日志需要发送到的端口号,该端口要有ARVO类型的source在监听

log4j.appender.R.Port =44444

//日志需要发送到的主机ip,该主机运行着ARVO类型的source

log4j.appender.R.Hostname = localhost

log4j.appender.R.MaxFileSize=102400KB

# log4j.appender.R.MaxBackupIndex=5

log4j.appender.R.layout= org. apache.log4j.PatternLayout

log4j.appender.R.layout.ConversionPattern=%d{ yyyy/MM/ ddHH\: mm\: ss}%-5p%-10C {1} %m%n

log4j.appender.R.encoding=UTF-8

 

log4j.logger.com.ultrapower.ultracollector.webservice.MessageIntercommunionInterfaceImpl=INFO, webservice

log4j.appender.webservice= org. apache.log4j.FileAppender

log4j.appender.webservice.File=${ catalina.home}/logs/logsMsgIntercommunionInterface.log

log4j.appender.webservice.layout= org. apache.log4j.PatternLayout

log4j.appender.webservice.layout.ConversionPattern=%d{ yyyy/MM/ ddHH\: mm\: ss}%-5p[%t]%l%X-%m%n

log4j.appender.webservice.encoding=UTF-8

注:Log4jAppender继承自AppenderSkeleton,没有日志文件达到特定大小,转换到新的文件的功能

1.1.3.  flume agent配置

agent1.sources = source1

agent1.sinks = sink1

agent1.channels = channel1

# Describe/configure source1

agent1.sources.source1.type = avro

agent1.sources.source1.bind = 192.168.0.141

agent1.sources.source1.port = 44444

 

# Describe sink1

agent1.sinks.sink1.type = FILE_ROLL

agent1.sinks.sink1.sink.directory = /home/yubojie/flume/apache-flume-1.2.0/flume-out

 

# Use a channel which buffers events in memory

agent1.channels.channel1.type = memory

agent1.channels.channel1.capacity = 1000

agent1.channels.channel1.transactionCapactiy = 100

 

# Bind the source and sink to the channel

agent1.sources.source1.channels = channel1

agent1.sinks.sink1.channel = channel1

注:生成的文件的规则为每隔固定时间间隔生成一个新的文件,文件里面保存该时间段agent接收到的信息

1.2.  分析

1.       使用简便,工作量小。

2.       用户应用程序使用log4j作为日志记录jar包,而且项目中使用的jar包要在log4j-1.2.15版本以上,

3.       应用系统必须将flume所需jar包引入到项目中。如下所示为所有必须jar包:可能会存在jar冲突,影响应用运行

4.       能够提供可靠的数据传输,使用flume log4jAppender采集日志可以不在客户机上启动进程,而只通过修改logapppender直接把日志信息发送到采集机(参见图一),此种情况可以保证采集机接受到数据之后的数据可靠性,但是客户机与采集机连接失败时候数据会丢失。改进方案是在客户机上启动一个agent,这样可以保证客户机和采集机不能连通时,当能连通是日志也被采集上来,不会发送数据的丢失(参见图二),为了可靠性,需在客户机上启动进程

               

1.3.  日志代码

Log.info(“this message has DEBUG in it”);

1.4.  采集到的数据样例

this message has DEBUG in it

this message has DEBUG in it

 

2.  Exec source(放弃)

The problem with ExecSource and other asynchronous sources is that thesource can not guarantee that if there is a failure to put the event into theChannel the client knows about it. In such cases, the data will be lost. As afor instance, one of the most commonly requested features is thetail -F [file]-like use casewhere an application writes to a log file on disk and Flume tails the file,sending each line as an event. While this is possible, there’s an obviousproblem; what happens if the channel fills up and Flume can’t send an event?Flume has no way of indicating to the application writing the log file that itneeds to retain the log or that the event hasn’t been sent, for some reason. Ifthis doesn’t make sense, you need only know this: Your application can neverguarantee data has been received when using a unidirectional asynchronousinterface such as ExecSource! As an extension of this warning - and to becompletely clear - there is absolutely zero guarantee of event delivery whenusing this source. You have been warned.

注:即使是agent内部的可靠性都不能保证

2.1.  使用说明

2.1.1.  flume agent配置

 

# The configuration file needs to define the sources,

# the channels and the sinks.

# Sources, channels and sinks are defined per agent,

# in this case called 'agent'

# example.conf: A single-node Flume configuration

 

# Name the components on this agent

 

agent1.sources = source1

agent1.sinks = sink1

agent1.channels = channel1

# Describe/configure source1

#agent1.sources.source1.type = avro

agent1.sources.source1.type = exec

agent1.sources.source1.command = tail -f /home/yubojie/logs/ultraIDCPServer.log

#agent1.sources.source1.bind = 192.168.0.146

#agent1.sources.source1.port = 44444

 

agent1.sources.source1.interceptors = a

agent1.sources.source1.interceptors.a.type = org.apache.flume.interceptor.HostInterceptor$Builder

agent1.sources.source1.interceptors.a.preserveExisting = false

agent1.sources.source1.interceptors.a.hostHeader = hostname

 

 

# Describe sink1

#agent1.sinks.sink1.type = FILE_ROLL

#agent1.sinks.sink1.sink.directory = /home/yubojie/flume/apache-flume-1.2.0/flume-out

agent1.sinks.sink1.type = hdfs

agent1.sinks.sink1.hdfs.path = hdfs://localhost:9000/user/

agent1.sinks.sink1.hdfs.fileType = DataStream

 

# Use a channel which buffers events in memory

agent1.channels.channel1.type = memory

agent1.channels.channel1.capacity = 1000

agent1.channels.channel1.transactionCapactiy = 100

 

# Bind the source and sink to the channel

agent1.sources.source1.channels = channel1

agent1.sinks.sink1.channel = channel1

 

2.2.  分析

1.         tail方式采集日志需要宿主主机能够执行tail命令,应该是只有linux系统可以执行,不支持window系统日志采集

2.         EXEC采用异步方式采集,会发生日志丢失,即使在节点内的数据也不能保证数据的完整

3.         tail方式采集需要宿主操作系统支持tail命令,即原始的windows操作系统不支持tail命令采集

2.3.  采集到的数据样例

2012/10/26 02:36:34 INFO  LogTest     this message has DEBUG 中文 in it

2012/10/26 02:40:12 INFO  LogTest     this message has DEBUG 中文 in it

2.4.  日志代码

Log.info(“this message has DEBUG 中文 in it”);

3.  Syslog

Passing messages using syslogprotocol doesn't work well for longer messages.  The syslog appender forLog4j is hardcoded to linewrap around 1024 characters in order to comply withthe RFC.  I got a sample program logging to syslog, picking it up with asyslogUdp source, with a JSON layout (to avoid new-lines in stack traces) onlyto find that anything but the smallest stack trace line-wrapped anyway.  Ican't see a way to reliably reconstruct the stack trace once it is wrapped andsent through the flume chain.(注:内容不确定是否1.2版本)

   Syslog TCP需要指定eventsize,默认为2500

   Syslog UDP为不可靠传输,数据传输过程中可能出现丢失数据的情况。

3.1.  使用说明

3.1.1.  Client端示例代码

import java.io.IOException;

importjava.io.OutputStream;

import java.net.Socket;

import java.net.UnknownHostException;

 

 

publicclass SyslogTcp {

  publicstaticvoid main(String args[]){

     Socket client =  null;

     OutputStream out = null;

     try {

       client =  new Socket("127.0.0.1", 5140);

       out= client.getOutputStream(); 

        String event = "<4>hello\n"; 

        out.write(event.getBytes()); 

        out.flush(); 

        System. out.println("发送成功 ");

    }  catch (UnknownHostException e) {

       // TODO Auto-generated catch block

       e.printStackTrace();

    }  catch (IOException e) {

       // TODO Auto-generated catch block

       e.printStackTrace();

    }  finally{

        try {

           out.close();

       }  catch (IOException e) {

           // TODO Auto-generated catch block

           e.printStackTrace();

       } 

         try {

           client.close();

       }  catch (IOException e) {

           // TODO Auto-generated catch block

           e.printStackTrace();

       }

    }

      

 }

}

 

 

3.1.2.  日志接收的flume agent配置

agent1.sources = source1

agent1.sinks = sink1

agent1.channels = channel1

 

# Describe/configure source1

agent1.sources.source1.type =  syslogtcp

agent1.sources.source1.bind = 127.0.0.1

agent1.sources.source1.port = 5140

 

# Describe sink1

#agent1.sinks.sink1.type =  avro

#agent1.sinks.sink1.channels = channel1

#agent1.sinks.sink1. hostname = 192.168.0.144

#agent1.sinks.sink1.port = 44444

 

agent1.sinks.sink1.type = FILE_ROLL

agent1.sinks.sink1.sink.directory = E:\\file-out

 

 

# Use a channel which buffers events in memory

agent1.channels.channel1.type = memory

agent1.channels.channel1.capacity = 1000

agent1.channels.channel1.transactionCapactiy = 100

 

# Bind the source and sink to the channel

agent1.sources.source1.channels = channel1

agent1.sinks.sink1.channel = channel1

3.2.     分析

需要编写Client采集代码,增量采集日志信息通过socket发送到flume agent;对于长数据处理不是很理想。可靠性可以参考log4j appender的方式来保证。

 

 

4.  日志过滤Interceptor(FLUME-1358)

Flume支持依据正则表达式过滤event,但是在1.2.0的源代码中没有发现具体实现的代码,根据FLUME-1358的说明信息,可以将RegexFilteringInterceptor类加入到代码中使用。

需要的操作为:

添加类RegexFilteringInterceptor

修改InterceptorType,添加type与类的映射关系:

REGEX_FILTER(org.apache.flume.interceptor.RegexFilteringInterceptor.Builder.class)

4.1.  Regex FilteringInterceptor说明

This interceptor filters events selectively by interpreting the eventbody as text and matching the text against a configured regular expression. Thesupplied regular expression can be used to include events or exclude events.

Property Name

Default

Description

type

The component type name has to be REGEX_FILTER

regex

”.*”

Regular expression for matching against events

excludeRegex

false

If true, regex determines events to exclude, otherwise regex determines events to include.

4.2.  使用说明(测试配置)

4.2.1.  日志接收的Flume agent配置

agent1.sources = source1

agent1.sinks = sink1

agent1.channels = channel1

 

# Describe/configure source1

agent1.sources.source1.type =  avro

agent1.sources.source1.bind =  localhost

agent1.sources.source1.port = 5140

 

 

agent1.sources.source1. interceptors = inter1

agent1.sources.source1. interceptors.inter1.type = REGEX_FILTER

agent1.sources.source1. interceptors.inter1. regex = .*DEBUG.*

agent1.sources.source1. interceptors.inter1.excludeRegex = false

 

 

 

 

# Describe sink1

#agent1.sinks.sink1.type =  avro

#agent1.sinks.sink1.channels = channel1

#agent1.sinks.sink1. hostname = 192.168.0.144

#agent1.sinks.sink1.port = 44444

 

agent1.sinks.sink1.type = FILE_ROLL

agent1.sinks.sink1.sink.directory = E:\\file-out

 

 

# Use a channel which buffers events in memory

agent1.channels.channel1.type = memory

agent1.channels.channel1.capacity = 1000

agent1.channels.channel1.transactionCapactiy = 100

 

# Bind the source and sink to the channel

agent1.sources.source1.channels = channel1

agent1.sinks.sink1.channel = channel1

5.  HDFS SINK

5.1.  使用说明

输出到hdfs的数据,首先在hdfs上创建文件.tmp,然后文件关闭时,将tmp后缀去掉,存储方案与file输出类似,可以设定时间间隔、文件大小、接受事件条数作为滚动生成新文件的依据,默认30s

 

5.2.  可配置项

Name

Default

Description

channel

 

type

The component type name, needs to be hdfs

hdfs.path

HDFS directory path (eg hdfs://namenode/flume/webdata/)

hdfs.filePrefix

FlumeData

Name prefixed to files created by Flume in hdfs directory

hdfs.rollInterval

30

Number of seconds to wait before rolling current file (0 = never roll based on time interval)

hdfs.rollSize

1024

File size to trigger roll, in bytes (0: never roll based on file size)

hdfs.rollCount

10

Number of events written to file before it rolled (0 = never roll based on number of events)

hdfs.batchSize

1

number of events written to file before it flushed to HDFS

hdfs.txnEventMax

100

 

hdfs.codeC

Compression codec. one of following : gzip, bzip2, lzo, snappy

hdfs.fileType

SequenceFile

File format: currently SequenceFile,DataStream orCompressedStream(1)DataStream will not compress output file and please don’t set codeC (2)CompressedStream requires set hdfs.codeC with an available codeC

hdfs.maxOpenFiles

5000

 

hdfs.writeFormat

“Text” or “Writable”

hdfs.appendTimeout

1000

 

hdfs.callTimeout

10000

 

hdfs.threadsPoolSize

10

Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)

hdfs.rollTimerPoolSize

1

Number of threads per HDFS sink for scheduling timed file rolling

hdfs.kerberosPrincipal

Kerberos user principal for accessing secure HDFS

hdfs.kerberosKeytab

Kerberos keytab for accessing secure HDFS

hdfs.round

false

Should the timestamp be rounded down (if true, affects all time based escape sequences except %t)

hdfs.roundValue

1

Rounded down to the highest multiple of this (in the unit configured usinghdfs.roundUnit), less than current time.

hdfs.roundUnit

second

The unit of the round down value - second,minute orhour.

serializer

TEXT

Other possible options include AVRO_EVENT or the fully-qualified class name of an implementation of theEventSerializer.Builder interface.

serializer.*

 

 

5.3.  Agent配置样例

 

# The configuration file needs to define the sources,

# the channels and the sinks.

# Sources, channels and sinks are defined per agent,

# in this case called 'agent'

# example.conf: A single-node Flume configuration

 

# Name the components on this agent

 

agent1.sources = source1

agent1.sinks = sink1

agent1.channels = channel1

# Describe/configure source1

#agent1.sources.source1.type = avro

agent1.sources.source1.type = exec

agent1.sources.source1.command = tail -f /home/yubojie/logs/ultraIDCPServer.log

#agent1.sources.source1.bind = 192.168.0.146

#agent1.sources.source1.port = 44444

 

agent1.sources.source1.interceptors = a

agent1.sources.source1.interceptors.a.type = org.apache.flume.interceptor.HostInterceptor$Builder

agent1.sources.source1.interceptors.a.preserveExisting = false

agent1.sources.source1.interceptors.a.hostHeader = hostname

 

 

# Describe sink1

#agent1.sinks.sink1.type = FILE_ROLL

#agent1.sinks.sink1.sink.directory = /home/yubojie/flume/apache-flume-1.2.0/flume-out

agent1.sinks.sink1.type = hdfs

agent1.sinks.sink1.hdfs.path = hdfs://192.168.98.20:9000/user/hadoop/yubojietest

agent1.sinks.sink1.hdfs.fileType = DataStream

 

# Use a channel which buffers events in memory

agent1.channels.channel1.type = memory

agent1.channels.channel1.capacity = 1000

agent1.channels.channel1.transactionCapactiy = 100

 

# Bind the source and sink to the channel

agent1.sources.source1.channels = channel1

agent1.sinks.sink1.channel = channel1

 

6.  多agent采集文件到hdfs

6.1.  准备工作

1.         文件采集类打包成jar放到flume/apache-flume-1.2.0/lib目录下

2.         创建fileSourceRecorder.properties空文件放到flume/apache-flume-1.2.0/conf下(将要修改为如果文件不存在则创建该文件,后续将不用再创建这个文件)

 

6.2.  agent配置文件

6.2.1.  agent1

# example.conf: A single-node Flume configuration     

                                                                                                                                                         

# Name the components on this agent                

                                                    

agent1.sources = source1                                                                              

agent1.sinks = sink1                                                                          

agent1.channels = channel1                        

                                                   

# Describe/configure source1                       

agent1.sources.source1.type = com.ultrapower.ultracollector.flume.source.file.FileSource                                                

 

agent1.sources.source1.path = /home/yubojie/logs/ultraIDCPServer.log

#gbk,utf-8 

agent1.sources.source1.encoding = utf-8

agent1.sources.source1.onceMaxReadByte = 999

agent1.sources.source1.cacheQueueSize = 10

agent1.sources.source1.noChangeSleepTime = 1000

agent1.sources.source1.batchCommitSize = 5

agent1.sources.source1.batchWaitTime = 500        

       

#agent1.sources.source1.type = avro

#agent1.sources.source1.bind = localhost

#agent1.sources.source1.port = 44444

                                                                                      

# Describe sink1                                                                                       

#agent1.sinks.sink1.type = logger          

#agent1.sinks.sink1.type = FILE_ROLL

#agent1.sinks.sink1.sink.directory = E:/file-out

#agent1.sinks.sink1.sink.fileName = a.log

agent1.sinks.sink1.type = hdfs

#agent1.sinks.sink1.hdfs.path = hdfs://192.168.98.20:9000/user/hadoop/yubojietest

agent1.sinks.sink1.hdfs.path = hdfs://192.168.0.153:9000/user/file

agent1.sinks.sink1.hdfs.callTimeout = 20000

agent1.sinks.sink1.hdfs.fileType = DataStream

#agent1.sinks.sink1.sink.rollInterval = 30        

               

                                          

# Use a channel which buffers events in memory        

agent1.channels.channel1.type = memory             

agent1.channels.channel1.capacity = 1000           

agent1.channels.channel1.transactionCapactiy = 100 

                                          

                                                  

# Bind the source and sink to the channel                                                              

agent1.sources.source1.channels = channel1

agent1.sinks.sink1.channel = channel1       

    

########################## test method ########################################

 

#########start flume agent ######### 

#agent -n agent1 -f .\conf\flume-conf.properties.template.file.signle

 

######### client send message #########

 

# $ bin/flume-ng avro-client -H localhost -p 44444 -F 'F:/1/log.log'

6.2.2.  agent2

# example.conf: A single-node Flume configuration     

                                                                                                                                                         

# Name the components on this agent                 

                                                   

agent2.sources = source1                                                                              

agent2.sinks = sink1                                                                          

agent2.channels = channel1                        

                                                   

# Describe/configure source1                       

agent2.sources.source1.type = com.ultrapower.ultracollector.flume.source.file.FileSource                                                

 

agent2.sources.source1.path = /home/yubojie/logtest/logs/ultraIDCPServer.log

#gbk,utf-8 

agent2.sources.source1.encoding = utf-8

agent2.sources.source1.onceMaxReadByte = 999

agent2.sources.source1.cacheQueueSize = 10

agent2.sources.source1.noChangeSleepTime = 1000

agent2.sources.source1.batchCommitSize = 5

agent2.sources.source1.batchWaitTime = 500        

       

#agent1.sources.source1.type = avro

#agent1.sources.source1.bind = localhost

#agent1.sources.source1.port = 44444

                                                                                     

# Describe sink1                                                                                        

#agent1.sinks.sink1.type = logger          

#agent1.sinks.sink1.type = FILE_ROLL

#agent1.sinks.sink1.sink.directory = E:/file-out

#agent1.sinks.sink1.sink.fileName = a.log

agent2.sinks.sink1.type = hdfs

#agent1.sinks.sink1.hdfs.path = hdfs://192.168.98.20:9000/user/hadoop/yubojietest

agent2.sinks.sink1.hdfs.path = hdfs://192.168.0.153:9000/user/file

agent2.sinks.sink1.hdfs.callTimeout = 20000

agent2.sinks.sink1.hdfs.fileType = DataStream

#agent1.sinks.sink1.sink.rollInterval = 30        

               

                                          

# Use a channel which buffers events in memory        

agent2.channels.channel1.type = memory             

agent2.channels.channel1.capacity = 1000           

agent2.channels.channel1.transactionCapactiy = 100 

                                          

                                                  

# Bind the source and sink to the channel                                                               

agent2.sources.source1.channels = channel1

agent2.sinks.sink1.channel = channel1       

    

########################## test method ########################################

 

#########start flume agent ######### 

#agent -n agent1 -f .\conf\flume-conf.properties.template.file.signle

 

######### client send message #########

 

# $ bin/flume-ng avro-client -H localhost -p 44444 -F 'F:/1/log.log'

6.3.  启动命令

flume-ng agent -name agent1 -c conf -f ../conf/flume-conf.properties  

//agent1监控/home/yubojie/logs/ultraIDCPServer.log

flume-ng agent -name agent2 -c conf -f ../conf/flume-conf2.properties

//agent2监控/home/yubojie/logtest/logs/ultraIDCPServer.log

6.4.  测试结果

1.         agent1和agent2各自监控相应文件,互不干涉

2.         文件各自输出到hdfs生成各自的文件

6.  参考资料:

资料

日志采集

https://issues.cloudera.org//browse/FLUME-27

http://archive.cloudera.com/cdh/3/flume-ng-1.2.0-cdh3u5/FlumeUserGuide.html#exec-source

http://www.quora.com/Flume/What-Flume-sources-do-people-use-in-production

http://blog.csdn.net/rzhzhz/article/details/7610252

过滤: https://issues.apache.org/jira/secure/attachment/12537520/FLUME-1358.patch.v4.txt

https://issues.apache.org/jira/browse/FLUME-1358

 

RegexFilteringInterceptor源代码

 

 

packageorg.apache.flume.interceptor;

 

importstaticorg.apache.flume.interceptor.RegexFilteringInterceptor.Constants. DEFAULT_EXCLUDE_EVENTS;

importstatic org.apache.flume.interceptor.RegexFilteringInterceptor.Constants. DEFAULT_REGEX;

importstatic org.apache.flume.interceptor.RegexFilteringInterceptor.Constants. EXCLUDE_EVENTS;

importstatic org.apache.flume.interceptor.RegexFilteringInterceptor.Constants. REGEX;

 

import java.util.List;

import java.util.regex.Pattern;

 

import org.apache.flume.Context;

import org.apache.flume.Event;

import org.slf4j.Logger;

importorg.slf4j.LoggerFactory;

 

import com.google.common.collect.Lists;

 

publicclass RegexFilteringInterceptor implements Interceptor {

 

   privatestaticfinal Logger logger =LoggerFactory

      . getLogger(RegexFilteringInterceptor. class);

 

   privatefinal Patternregex;

   privatefinalbooleanexcludeEvents;

 

  /**

   *Only{@link RegexFilteringInterceptor.Builder}canbuildme

   */

   private RegexFilteringInterceptor(Pattern regex, boolean excludeEvents) {

     this.regex = regex;

     this.excludeEvents = excludeEvents;

  }

 

  @Override

   publicvoid initialize() {

    // no- op

  }

 

 

  @Override

  /**

   *Returnstheeventifitpassestheregularexpressionfilterandnull

   *otherwise.

   */

   public Event intercept(Event event) {

    // We've already ensured here that at most one of includeRegex and

    // excludeRegex are defined.

 

     if (!excludeEvents) {

       if (regex.matcher( new String(event.getBody())).find()) {

         return event;

      }

       else {

         returnnull;

      }

    }

     else {

       if (regex.matcher( new String(event.getBody())).find()) {

         returnnull;

      }

       else {

         return event;

      }

    }

  }

 

  /**

   *Returnsthesetofeventswhichpassfilters,accordingto

   *{@link #intercept(Event)}.

   * @paramevents

   *@return

   */

  @Override

   public List<Event> intercept(List<Event> events) {

    List<Event> out = Lists. newArrayList();

     for (Event event : events) {

      Event outEvent = intercept(event);

       if (outEvent != null) { out.add(outEvent); }

    }

     return out;

  }

  @Override

   publicvoid close() {

    // no- op

  }

  /**

   *BuilderwhichbuildsnewinstanceoftheStaticInterceptor.

   */

   publicstaticclass Builder implements Interceptor.Builder {

 

     private Patternregex;

     privatebooleanexcludeEvents;

    @Override

     publicvoid configure(Context context) {

      String regexString = context.getString( REGEX, DEFAULT_REGEX);

      regex = Pattern. compile(regexString);

      excludeEvents = context.getBoolean( EXCLUDE_EVENTS,

           DEFAULT_EXCLUDE_EVENTS);

    }

    @Override

     public Interceptor build() {

       logger.info(String. format(

          "Creating RegexFilteringInterceptor: regex=%s,excludeEvents=%s",

          regex,excludeEvents));

       returnnew RegexFilteringInterceptor(regex,excludeEvents);

    }

  }

   publicstaticclass Constants {

     publicstaticfinal String REGEX ="regex";

     publicstaticfinal String DEFAULT_REGEX =".*";

     publicstaticfinal String EXCLUDE_EVENTS ="excludeEvents";

     publicstaticfinalboolean DEFAULT_EXCLUDE_EVENTS =  false;

  }

}

InterceptorType源代码

黄色为添加内容

package org.apache.flume.interceptor;

 

public enum InterceptorType {

 

  TIMESTAMP(org.apache.flume.interceptor.TimestampInterceptor.Builder.class),

  HOST(org.apache.flume.interceptor.HostInterceptor.Builder.class),

  REGEX_FILTER(org.apache.flume.interceptor.RegexFilteringInterceptor.Builder.class),

  ;

 

  private final Class<? extends Interceptor.Builder> builderClass;

 

  private InterceptorType(Class<? extends Interceptor.Builder> builderClass) {

    this.builderClass = builderClass;

  }

 

  public Class<? extends Interceptor.Builder> getBuilderClass() {

    return builderClass;

  }

 

}

作者:sunmeng_Alex 发表于2013-8-5 9:25:59 原文链接
阅读:0 评论:0 查看评论

相关 [flume 日志] 推荐:

flume日志采集

- - CSDN博客推荐文章
1.1.2.  Client端Log4j配置文件. (黄色文字为需要配置的内容). //日志Appender修改为flume提供的Log4jAppender. //日志需要发送到的端口号,该端口要有ARVO类型的source在监听. //日志需要发送到的主机ip,该主机运行着ARVO类型的source.

Flume日志收集

- - 企业架构 - ITeye博客
转: http://www.cnblogs.com/oubo/archive/2012/05/25/2517751.html. Flume是一个分布式、可靠、和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力.

分布式日志收集收集系统:Flume

- - 标点符
Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统. 支持在系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力. Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera.

分布式日志收集系统Apache Flume的设计介绍

- - CSDN博客架构设计推荐文章
Flume是Cloudera公司的一款高性能、高可能的分布式日志收集系统. 现在已经是Apache Top项目. 同Flume相似的日志收集系统还有 Facebook Scribe, Apache Chuwka, Apache Kafka(也是LinkedIn的). Flume是后起之秀,本文尝试简要分析Flume数据流通过程中提供的组件、可靠性保证来介绍Flume的主要设计,不涉及Flume具体的安装使用,也不涉及代码层面的剖析.

开源日志系统简介——Scribe,flume,kafka,Chukwa

- - 互联网 - ITeye博客
许多公司的平台每天会产生大量的日志(一般为流式数据,如,搜索引擎的pv,查询等),处理这些日志需要特定的日志系统,一般而言,这些系统需要具有以下特征:. (1) 构建应用系统和分析系统的桥梁,并将它们之间的关联解耦;. (2) 支持近实时的在线分析系统和类似于Hadoop之类的离线分析系统;. 即:当数据量增加时,可以通过增加节点进行水平扩展.

Flume + kafka + HDFS构建日志采集系统

- - 企业架构 - ITeye博客
    Flume是一个非常优秀日志采集组件,类似于logstash,我们通常将Flume作为agent部署在application server上,用于收集本地的日志文件,并将日志转存到HDFS、kafka等数据平台中;关于Flume的原理和特性,我们稍后详解,本文只简述如何构建使用Flume + kafka + HDFS构建一套日志采集系统.

使用Flume+Kafka+SparkStreaming进行实时日志分析

- - CSDN博客推荐文章
每个公司想要进行数据分析或数据挖掘,收集日志、ETL都是第一步的,今天就讲一下如何实时地(准实时,每分钟分析一次)收集日志,处理日志,把处理后的记录存入Hive中,并附上完整实战代码. 思考一下,正常情况下我们会如何收集并分析日志呢. 首先,业务日志会通过Nginx(或者其他方式,我们是使用Nginx写入日志)每分钟写入到磁盘中,现在我们想要使用Spark分析日志,就需要先将磁盘中的文件上传到HDFS上,然后Spark处理,最后存入Hive表中,如图所示:.

FLUME监控每天按日期滚动的日志文件

- - 开源软件 - ITeye博客
       原来的flume的配置如下:.        更改后的配置为:.        其中 locktail_rotate.sh 参见 https://github.com/ypenglyn/locktail/blob/master/locktail_rotate.sh. 已有 0 人发表留言,猛击->> 这里<<-参与讨论.

使用Flume+Kafka+SparkStreaming进行实时日志分析 - Trigl的博客 - CSDN博客

- -

Flume OG 与 Flume NG 的对比

- - 开源软件 - ITeye博客
很久没接触flume了,刚掀开官网一看,发现flume已然不是以前的那个flume了,其实早在flume技术群就听到NG这个字眼,以前没特注意,今天做了些对比,发现flume确实有了投胎换骨般的改变. 首先介绍下Flume OG & Flume NG这两个概念. Flume OG:Flume original generation 即Flume 0.9.x版本.