分布式日志

标签: 分布 日志 | 发表时间:2015-05-27 20:50 | 作者:muruiheng
出处:http://www.iteye.com

  

 

最近完成一个简单的日志管理系统,拿出来跟大家分享一下!


主要实现的功能:

1、支持动态修改配置

2、实现统一的配置管理

3、支持文件输出、habse输出、mongodb输出


基于以上三点功能,我们下面详细说明


1、支持动态修改配置

说道支持这个功能,有个同事认为没有这个必要,他的观点是log4j的配置不需要经常变动,不需要支持这样的功能;本人的观点是“配置可以进行统一管理、而且正式机跟测试机的log4j的配置肯定会有一些差异的”,因此这个功能是必须的。


下面说一下实现

通过log4j提供的PropertyConfigurator类可以非常轻松的实现这个功能

代码如下

 

 

public class Log4jConfigListener {

 private  String log4jPath;

 /**
 * @param log4jPath the log4jPath to set
 */
 public void setLog4jPath(String log4jPath) {
 this.log4jPath = log4jPath;
 }


 /**
 * 装载log4j配置文件
* 
 * @author mrh
 * @DATE 2011-5-28
 */
 public void load() {
 // String path="config/log4j.properties";
 System.out.println("log4j configfile path=" + log4j.toString());
 PropertyConfigurator.configureAndWatch(log4j.toString(), 1000);// 间隔特定时间,检测文件是否修改,自动重新读取配置
}
}

 

 Spring中的配置

 

<bean class="com.jl.net.log4j.config.Log4jConfigListener" init-method="load">
  <property name="log4jPath" value="WEB-INF/classes/log4j/" /> <!--通过zookeeper实现统一配置的dataId-->
 </bean>

 

2、统一配置管理

统一配置管理,是通过zookeeper实现的,配置形式与log4j.properties配置一致;

配置如下

 

log4j.rootLogger=INFO,console,HbaseAppender

#console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}\:%L - %m%n
#hbase
log4j.appender.HbaseAppender=com.jl.net.log4j.hbase.HbaseAppender
log4j.appender.HbaseAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.HbaseAppender.sysName=tuid
log4j.appender.HbaseAppender.serverIp=Y
log4j.appender.HbaseAppender.zookeeper_ip=10.1.18.100,10.1.18.103,10.1.18.102
log4j.appender.HbaseAppender.zookeeper_port=2181
log4j.appender.HbaseAppender.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss sss} %c.%t:%L %-5p %x  %m%n


Spring中的配置


<bean class="com.jl.net.log4j.config.Log4jConfigListener" init-method="load">
  <property name="log4jPath" value="cfg/jlcloud.uid.log4j" /> <!--通过zookeeper实现统一配置的dataId-->
 </bean>

3、支持文件、habse、mongodb的输出

log4j本身就支持文件输出,mongodb的输出需要借助log4mongo-Java这个项目,maven的坐标是

 

<dependency>
   <groupId>org.log4mongo</groupId>
   <artifactId>log4mongo-java</artifactId>
   <version>0.7.4</version>
   <optional>true</optional>
  </dependency>

 

 

下面介绍一下如何实现hbase的输出,通过继承log4j的AppenderSkeleton类,同时实现Runnable接口,来实现:

代码如下

 

package com.yck.worm.hbase;

import java.io.IOException;
import java.net.InetAddress;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.spi.LoggingEvent;

import com.yck.worm.logging.WLogger;



/**
 * 将日志 输出到HBase里
 * @author mrh
 *
 */
public class HbaseAppender extends AppenderSkeleton implements Runnable {
	
	private static final WLogger LOGGER = WLogger.getLogger(HbaseAppender.class);
	
	private int batchSize = -1;
	private int period = 1000;
	private String hbLogName = "jl_logs";
	private String hbLogFamily = "logs";
	private Queue<LoggingEvent> loggingEvents;
	private ScheduledExecutorService executor;
	private ScheduledFuture<?> task;
	private Configuration conf;
	private HTableInterface htable;
	private HBaseAdmin admin;

	private HConnection connection;
	
	private String zookeeper;
	
	private String sysName;
	
	/**
	 * if serverIp = Y then out put server's IP to hbase 
	 */
	private String serverIp;

	/**
	 * log4j初始设置,启动日志处理计划任务
	 */
	@Override
	public void activateOptions() {
		try {
			super.activateOptions();
			// 创建一个计划任务,并自定义线程名
			executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HbaseAppender"));
			// 日志队列
			loggingEvents = new ConcurrentLinkedQueue<LoggingEvent>();
			// 启动计划任务,如果run函数有异常任务将中断!
			task = executor.scheduleWithFixedDelay(this, period, period,TimeUnit.MILLISECONDS);
			if ("Y".equalsIgnoreCase(this.serverIp)) {
				InetAddress addr = InetAddress.getLocalHost();
				this.serverIp = addr.getHostAddress().toString();//获得本机IP 
			}
			LOGGER.info("ActivateOptions ok!");
		} catch (Exception e) {
			LOGGER.error("Error during activateOptions: " , e);
		}
	}

	/**
	 * 初始HBASE
	 *
	 * @return
	 */
	private boolean initHbase() {
        try {
            if (conf == null) {
            	LOGGER.info("initHbase ... zookeeper address is " + this.zookeeper);
            	if (this.zookeeper == null || "".equals(this.zookeeper)) {
            		throw new IllegalArgumentException("the zookeeper address for jlog4j's hadoop initial can not be null!");
            	}
            	String ip = this.zookeeper.split(":")[0];
            	String port = this.zookeeper.split(":")[1];
                conf = HBaseConfiguration.create();
                conf.set(HConstants.ZOOKEEPER_QUORUM, ip);
                conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.parseInt(port));
                
                this.admin  = new HBaseAdmin(conf);
                if (!this.admin.tableExists(Bytes.toBytes(hbLogName))) {
                	HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(hbLogName));
                	HColumnDescriptor columnDescriptor = new HColumnDescriptor(hbLogFamily.getBytes());
                	tableDescriptor.addFamily(columnDescriptor);
                	this.admin.createTable(tableDescriptor);
                }
                this.connection = HConnectionManager.createConnection(conf);
                this.htable = this.connection.getTable(hbLogName.getBytes());
            }
            LOGGER.error("Init Hbase success !");
            return true;
        } catch (Exception e) {
            task.cancel(false);
            executor.shutdown();
            LOGGER.error("Init Hbase fail !", e);
            return false;
        }
    }

	@Override
	public void run() {
		if (conf == null || htable == null) {
			initHbase();
		}
		try {
			if (this.batchSize < 0 && loggingEvents.size() > 0 ) {
				this.output();
			} else 
			 // 日志数据超出批量处理大小
			if (loggingEvents.size() > 0 && loggingEvents.size() > this.batchSize) {
				this.output();
			}
		} catch (Exception e) {
			LOGGER.error("HbaseAppender Error run ", e);
		}
	}
	
	/**
	 * 输出日志
	 * @throws IOException
	 */
	private void output() throws IOException {
		LoggingEvent event;
		List<Put> logs = new ArrayList<Put>();
		// 循环处理日志队列
		while ((event = loggingEvents.poll()) != null) {
			try {
				// 写日志内容
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");
				Date date = new Date(event.getTimeStamp());
				// 创建日志并指定ROW KEY
				StringBuffer key = new StringBuffer("");
				if (this.sysName != null)
					key.append(this.sysName).append("#");
				key.append(event.getLoggerName()).append("#");
				key.append(event.getLevel()).append("#");
				key.append(sdf.format(date)).append("#");
				key.append(UUID.randomUUID().toString().replace("-", ""));
				Put log = new Put(key.toString().getBytes());
				if (this.sysName != null)
					log.add(hbLogFamily.getBytes(), "sysName".getBytes(), this.sysName.getBytes());
				if (this.serverIp != null)
					log.add(hbLogFamily.getBytes(), "serverIp".getBytes(), this.serverIp.getBytes());
				log.add(hbLogFamily.getBytes(), "LoggerName".getBytes(), event.getLoggerName().getBytes());
				log.add(hbLogFamily.getBytes(), "level".getBytes(), event.getLevel().toString().getBytes());
				log.add(hbLogFamily.getBytes(), "datetime".getBytes(), sdf.format(date).getBytes());
				log.add(hbLogFamily.getBytes(), "message".getBytes(), event.getMessage().toString().getBytes());
				logs.add(log);
			} catch (Exception e) {
				LOGGER.error("Error logging put ", e);
			}
		}
		// 批量写入HBASE
		if (logs.size() > 0)
			htable.put(logs);
	}

	/**
	 * 日志事件
	 *
	 * @param loggingEvent
	 */
	@Override
	protected void append(LoggingEvent loggingEvent) {
		try {
			populateEvent(loggingEvent);
			// 添加到日志队列
			loggingEvents.add(loggingEvent);
		} catch (Exception e) {
			LOGGER.error("Error populating event and adding to queue", e);
		}
	}

	/**
	 * 事件测试
	 *
	 * @param event
	 */
	protected void populateEvent(LoggingEvent event) {
		event.getThreadName();
		event.getRenderedMessage();
		event.getNDC();
		event.getMDCCopy();
		event.getThrowableStrRep();
		event.getLocationInformation();
	}

	@Override
	public void close() {
		try {
			if (this.task != null)
				this.task.cancel(false);
			if (this.executor != null)
				this.executor.shutdown();
			if (this.connection != null)
				this.connection.close();
			if (this.admin != null)
				this.admin.close();
			if (this.htable != null)
				this.htable.close();
		} catch (IOException e) {
			LOGGER.error("Error close ", e);
		}
	}

	@Override
	public boolean requiresLayout() {
		return true;
	}

	// 设置每一批日志处理数量
	public void setBatchSize(int batchSize) {
		this.batchSize = batchSize;
	}

	/**
	 * 设置计划任务执行间隔
	 *
	 * @param period
	 */
	public void setPeriod(int period) {
		this.period = period;
	}

	/**
	 * 设置日志存储HBASE表名
	 *
	 * @param hbLogName
	 */
	public void setHbLogName(String hbLogName) {
		this.hbLogName = hbLogName;
	}

	/**
	 * 日志表的列族名字
	 * 
	 * @param hbLogFamily
	 */
	public void setHbLogFamily(String hbLogFamily) {
		this.hbLogFamily = hbLogFamily;
	}

	/**
	 * @param zookeeper the zookeeper to set
	 */
	public void setZookeeper(String zookeeper) {
		this.zookeeper = zookeeper;
	}

	/**
	 * @param sysName the sysName to set
	 */
	public void setSysName(String sysName) {
		this.sysName = sysName;
	}

	/**
	 * @param serverIp the serverIp to set
	 */
	public void setServerIp(String serverIp) {
		this.serverIp = serverIp;
	}
}

 

 

收工,有些粗糙,将就吧!

 

附带源码,见附件



    本文附件下载:
  • worm.rar (18.9 KB)


已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [分布 日志] 推荐:

分布式日志

- - Java - 编程语言 - ITeye博客
最近完成一个简单的日志管理系统,拿出来跟大家分享一下. 3、支持文件输出、habse输出、mongodb输出. 基于以上三点功能,我们下面详细说明. 说道支持这个功能,有个同事认为没有这个必要,他的观点是log4j的配置不需要经常变动,不需要支持这样的功能;本人的观点是“配置可以进行统一管理、而且正式机跟测试机的log4j的配置肯定会有一些差异的”,因此这个功能是必须的.

分布式日志收集收集系统:Flume

- - 标点符
Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统. 支持在系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力. Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera.

分布式日志收集系统Apache Flume的设计介绍

- - CSDN博客架构设计推荐文章
Flume是Cloudera公司的一款高性能、高可能的分布式日志收集系统. 现在已经是Apache Top项目. 同Flume相似的日志收集系统还有 Facebook Scribe, Apache Chuwka, Apache Kafka(也是LinkedIn的). Flume是后起之秀,本文尝试简要分析Flume数据流通过程中提供的组件、可靠性保证来介绍Flume的主要设计,不涉及Flume具体的安装使用,也不涉及代码层面的剖析.

开源分布式搜索平台ELK(Elasticsearch+Logstash+Kibana)+Redis+Syslog-ng实现日志实时搜索

- - C1G军火库
ElasticSearch是一个基于Lucene构建的开源,分布式,RESTful搜索引擎. 设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便. 支持通过HTTP使用JSON进行数据索引. logstash是一个应用程序日志、事件的传输、处理、管理和搜索的平台. 你可以用它来统一对应用程序日志进行收集管理,提供 Web 接口用于查询和统计.

日志管理

- - CSDN博客系统运维推荐文章
#很关键 [root@client01 ~]# ls /var/log/ anaconda.ifcfg.log. tallylog #关键日志,大部分记录在里面 [root@client01 ~]# ls /var/log/messages /var/log/messages. [root@client01 ~]# ps -ef|grep log #系统日志服务 root.

日志优化

- - 互联网 - ITeye博客
在任何系统中,日志都是非常重要的组成部分,它是反映系统运行情况的重要依据,也是排查问题时的必要线索. 绝大多数人都认可日志的重要性,但是又有多少人仔细想过该怎么打日志,日志对性能的影响究竟有多大呢. 今天就让我们来聊聊Java日志性能那些事. 说到Java日志,大家肯定都会说要选择合理的日志级别、合理控制日志内容,但是这仅是万里长征第一步……哪怕一些 DEBUG级别的日志在生产环境中不会输出到文件中,也可能带来不小的开销.

nginx日志切割

- - haohtml's blog
nginx的日志文件没有rotate功能. 如果你不处理,日志文件将变得越来越大,还好我们可以写一个nginx日志切割脚本来自动切割日志文件. 第一步就是重命名日志文件,不用担心重命名后nginx找不到日志文件而丢失日志. 在你未重新打开原名字的日志文件前,nginx还是会向你重命名的文件写日志,linux是靠文件描述符而不是文件名定位文件.

flume日志采集

- - CSDN博客推荐文章
1.1.2.  Client端Log4j配置文件. (黄色文字为需要配置的内容). //日志Appender修改为flume提供的Log4jAppender. //日志需要发送到的端口号,该端口要有ARVO类型的source在监听. //日志需要发送到的主机ip,该主机运行着ARVO类型的source.

Flume日志收集

- - 企业架构 - ITeye博客
转: http://www.cnblogs.com/oubo/archive/2012/05/25/2517751.html. Flume是一个分布式、可靠、和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力.

GC 日志分析

- - 码蜂笔记
不同的JVM及其选项会输出不同的日志. 生成下面日志使用的选项: -XX:+PrintGCTimeStamps -XX:+PrintGCDetails -Xloggc:d:/GClogs/tomcat6-gc.log. 最前面的数字 4.231 和 4.445 代表虚拟机启动以来的秒数.